Mybatis- 数据源模块

Mybatis- 数据源模块

本篇学习Mybatis的数据源模块

org.apache.ibatis.datasource


数据源模块要做什么

数据源模块要实现的功能:


首先我们创建 dataSource的过程是非常复杂的。而且我们有需要方便扩展.因此我们可以使用工厂模式(创建型的模式)

工厂模式可以查看我之前写的一篇 https://mp.toutiao.com/profile_v4/graphic/preview?pgc_id=7008076815566717447


整个包结构

Mybatis- 数据源模块

从这个包结构也可以看出来 数据源模块整体的设计是一个工厂模式.

源码解析

先来看下抽象工厂类:

public interface DataSourceFactory {
    
  // 设置dataSource属性
  void setProperties(Properties props);
  // 获取dataSource
  DataSource getDataSource();
    
}

非连接池包

非连接池包里面一共就2个类 UnpooledDataSourceFactory UnpooledDataSource

UnpooledDataSourceFactory 是实现了 抽象工厂DataSourceFactory的类.

UnpooledDataSource则是上面工厂类实际的调用的类

我们先看下UnpooledDataSource

public class UnpooledDataSource implements DataSource {

  // 数据库驱动
  private ClassLoader driverClassLoader;
  // 数据库连接相关配置信息
  private Properties driverProperties;
  // 驱动的注册中心
  private static Map registeredDrivers = new ConcurrentHashMap<>();

  // 数据库连接的一些属性
  private String driver;
  private String url;
  private String username;
  private String password;

  // 是否自动提交事务
  private Boolean autoCommit;
  // 默认事务级别
  private Integer defaultTransactionIsolationLevel;
  // 默认超时时间
  private Integer defaultNetworkTimeout;

  // 使用静态代码快将驱动注册到DriverManage
  // 这个地方和Mysql Driver里面的原理大致一致
  static {
    Enumeration drivers = DriverManager.getDrivers();
    while (drivers.hasMoreElements()) {
      Driver driver = drivers.nextElement();
      registeredDrivers.put(driver.getClass().getName(), driver);
    }
  }

  // 一大波构造函数和setter
  ...

  // 获取conn通过用户名和密码
  private Connection doGetConnection(String username, String password) throws SQLException {
    Properties props = new Properties();
    if (driverProperties != null) {
      props.putAll(driverProperties);
    }
    if (username != null) {
      props.setProperty("user", username);
    }
    if (password != null) {
      props.setProperty("password", password);
    }
    return doGetConnection(props);
  }

  // 获取conn通过配置文件
  private Connection doGetConnection(Properties properties) throws SQLException {
    initializeDriver();
    Connection connection = DriverManager.getConnection(url, properties);
    // 设置事务是否自动提交,事务的隔离级别
    configureConnection(connection);
    return connection;
  }

  private synchronized void initializeDriver() throws SQLException {
    if (!registeredDrivers.containsKey(driver)) {
      Class<?> driverType;
      try {
        if (driverClassLoader != null) {
          driverType = Class.forName(driver, true, driverClassLoader);
        } else {
          driverType = Resources.classForName(driver);
        }
        // DriverManager requires the driver to be loaded via the system ClassLoader.
        // http://www.kfu.com/~nsayer/Java/dyn-jdbc.html
        Driver driverInstance = (Driver) driverType.getDeclaredConstructor().newInstance();
        DriverManager.registerDriver(new DriverProxy(driverInstance));
        registeredDrivers.put(driver, driverInstance);
      } catch (Exception e) {
        throw new SQLException("Error setting driver on UnpooledDataSource. Cause: " + e);
      }
    }
  }

  // 配置超时时间、自动提交、事务
  private void configureConnection(Connection conn) throws SQLException {
    if (defaultNetworkTimeout != null) {
      conn.setNetworkTimeout(Executors.newSingleThreadExecutor(), defaultNetworkTimeout);
    }
    if (autoCommit != null && autoCommit != conn.getAutoCommit()) {
      conn.setAutoCommit(autoCommit);
    }
    if (defaultTransactionIsolationLevel != null) {
      conn.setTransactionIsolation(defaultTransactionIsolationLevel);
    }
  }

  ...
}

我们在继续看下工厂类UnpooledDataSourceFactory

/**
 * @author Clinton Begin
 *  子类工厂对应的产品为  UnpooledDataSource
 */
public class UnpooledDataSourceFactory implements DataSourceFactory {

  private static final String DRIVER_PROPERTY_PREFIX = "driver.";
  private static final int DRIVER_PROPERTY_PREFIX_LENGTH = DRIVER_PROPERTY_PREFIX.length();

  protected DataSource dataSource;

  public UnpooledDataSourceFactory() {
    this.dataSource = new UnpooledDataSource();
  }

  @Override
  public void setProperties(Properties properties) {
    Properties driverProperties = new Properties();
    // 创建DataSource相应的metaObject,方便赋值
    MetaObject metaDataSource = SystemMetaObject.forObject(dataSource);
    // 遍历properties,将属性设置到DataSource中
    for (Object key : properties.keySet()) {
      String propertyName = (String) key;
      if (propertyName.startsWith(DRIVER_PROPERTY_PREFIX)) {
        String value = properties.getProperty(propertyName);
        driverProperties.setProperty(propertyName.substring(DRIVER_PROPERTY_PREFIX_LENGTH), value);
      } else if (metaDataSource.hasSetter(propertyName)) {
        String value = (String) properties.get(propertyName);
        Object convertedValue = convertValue(metaDataSource, propertyName, value);
        metaDataSource.setValue(propertyName, convertedValue);
      } else {
        throw new DataSourceException("Unknown DataSource property: " + propertyName);
      }
    }
    // 设置DataSource.driverProperties属性
    if (driverProperties.size() > 0) {
      metaDataSource.setValue("driverProperties", driverProperties);
    }
  }

  @Override
  public DataSource getDataSource() {
    return dataSource;
  }
    ...
}    

连接池包

连接池包中类多了几个

直接贴源码分析:

PooledDataSourceFactory

// 继承了 UnpooledDataSourceFactory  UnpooledDataSourceFactory实现了最基本的抽象工厂接口
public class PooledDataSourceFactory extends UnpooledDataSourceFactory {
  public PooledDataSourceFactory() {
    this.dataSource = new PooledDataSource();
  }
}

PoolState

/**
 * @author Clinton Begin
 *   PoolState:用于管理PooledConnection对象状态的组件,
 *   通过两个list分别,管理空闲状态的连接资源和活跃状态的连接资源
 *   并且提供了一些属性来记录时间次数等
 */
public class PoolState {

  protected PooledDataSource dataSource;

  // 空闲的连接池资源集合
  protected final List idleConnections = new ArrayList<>();
  // 活跃的连接池资源集合
  protected final List activeConnections = new ArrayList<>();
  // 请求的次数
  protected long requestCount = 0;
  // 累计的获得连接的时间
  protected long accumulatedRequestTime = 0;
  // 累计的使用连接的时间。从连接取出到归还,算一次使用的时间;
  protected long accumulatedCheckoutTime = 0;
  // 连接超时的次数
  protected long claimedOverdueConnectionCount = 0;
  // 累计超时时间
  protected long accumulatedCheckoutTimeOfOverdueConnections = 0;
  // 累计等待时间
  protected long accumulatedWaitTime = 0;
  // 等待次数
  protected long hadToWaitCount = 0;
  // 连接失败次数
  protected long badConnectionCount = 0;

  ...

}

PooledConnection

/**
 * @author Clinton Begin
 *   数据库连接对象 - 采用了动态代理的方式
 */
class PooledConnection implements InvocationHandler {

  private static final String CLOSE = "close";
  private static final Class<?>[] IFACES = new Class<?>[] { Connection.class };

  private final int hashCode;
  /*
    记录当前连接所在的数据源对象,
    本次连接是由这个数据源创建的,关闭后也是回到这个数据源;
    可以理解为这个对象是提供给客户端使用的
   */
  private final PooledDataSource dataSource;
  // 真实的连接对象
  private final Connection realConnection;
  // 代理连接对象
  private final Connection proxyConnection;
  // 连接时间戳 -  连接上数据源的时间
  private long checkoutTimestamp;
  // 创建时间戳 - 创建连接的时间
  private long createdTimestamp;
  // 最后一次使用时间戳
  private long lastUsedTimestamp;
  // 这个属性标志唯一连接池, 由数据库url、用户名、密码生成一个hash值
  private int connectionTypeCode;
  // 连接是否还有效
  private boolean valid;

  /**
   * Constructor for SimplePooledConnection that uses the Connection and PooledDataSource passed in.
   *
   * @param connection
   *          - the connection that is to be presented as a pooled connection
   * @param dataSource
   *          - the dataSource that the connection is from
   */
  public PooledConnection(Connection connection, PooledDataSource dataSource) {
    this.hashCode = connection.hashCode();
    this.realConnection = connection;
    this.dataSource = dataSource;
    this.createdTimestamp = System.currentTimeMillis();
    this.lastUsedTimestamp = System.currentTimeMillis();
    this.valid = true;
    // 这里使用了动态代理来设置代理对象,这个代理对象最后在获取连接的时候会返回给调用Mybatis的一方
    this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this);
  }
    ...
}

PooledDataSource 这个类是核心类了,获取和归还连接都是由这个类来完成的,我这边也仅对几个核心的方法进行分析

public class PooledDataSource implements DataSource {

  private static final Log log = LogFactory.getLog(PooledDataSource.class);

  private final PoolState state = new PoolState(this);

  // 数据源
  private final UnpooledDataSource dataSource;

  // OPTIONAL CONFIGURATION FIELDS
  // 最大连接数
  protected int poolMaximumActiveConnections = 10;
  // 最大闲置连接数
  protected int poolMaximumIdleConnections = 5;
  // 最大的校验时间
  protected int poolMaximumCheckoutTime = 20000;
  // 最长等待时间
  protected int poolTimeToWait = 20000;
  // 最多几次允许几次无效连接
  protected int poolMaximumLocalBadConnectionTolerance = 3;
  // 测试连接是否有效的sql语句
  protected String poolPingQuery = "NO PING QUERY SET";
  // 是否允许测试连接
  protected boolean poolPingEnabled;
  // 连接在这个配置时间内没有被使用,才允许测试连接是否有效
  protected int poolPingConnectionsNotUsedFor;
  // 连接池的唯一标志: 根据数据库url、用户名、密码生成一个hash值
  private int expectedConnectionTypeCode;

  /**
   * 归还连接
   * @param conn
   * @throws SQLException
   */
  protected void pushConnection(PooledConnection conn) throws SQLException {
    // 同样归还连接的时候也要加锁
    synchronized (state) {
      // 先从活跃线程队列中移除
      state.activeConnections.remove(conn);
      if (conn.isValid()) {
        // 判断闲置连接池资源是否已经达到上限
        if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          if (!conn.getRealConnection().getAutoCommit()) {
            conn.getRealConnection().rollback();
          }
          // 没有达到上线就创建新的连接(这个新连接实际上就是上面的归还的连接)放入空闲线程中
          PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
          state.idleConnections.add(newConn);
          newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
          newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
          // 将连接对象设置为无效
          conn.invalidate();
          if (log.isDebugEnabled()) {
            log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
          }

          // 唤醒其他等待线程 - 这个很重要,
          // 在我们获取连接的时候如果实在是没有连接就会让线程处于阻塞状态
          state.notifyAll();
        } else {

          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          if (!conn.getRealConnection().getAutoCommit()) {
            conn.getRealConnection().rollback();
          }

          // 如果闲置连接池已经达到上限了,将连接真实关闭
          conn.getRealConnection().close();
          if (log.isDebugEnabled()) {
            log.debug("Closed connection " + conn.getRealHashCode() + ".");
          }
          // 将连接对象设置为无效
          conn.invalidate();
        }
      } else {
        if (log.isDebugEnabled()) {
          log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
        }
        state.badConnectionCount++;
      }
    }
  }

  /**
   *  获取数据库连接
   * @param username
   * @param password
   * @return
   * @throws SQLException
   *   整个流程分为两部分
   *    1. 获取连接
   *    2. 对获取的连接做池化数据的信息处理
   *
   *    第一部分: 循环判断获取连接是否为空
   *        优先从空闲连接中获取连接
   *            有:空闲连接就进入第二部分
   *            没有: 判断是否可以新增连接(活跃连接数是否已达到最大连接数)
   *                   若没到最大线程数则直接创建俩姐进入第二部分
   *                   若无法新增连接,则检查是否存在超时的连接,如果有就使用这个连接,没有则进入阻塞状态
   *    第二部分:
   *         修改线程的活跃线程数,请求次数等.  (PoolState)
   */
  private PooledConnection popConnection(String username, String password) throws SQLException {
    boolean countedWait = false;
    PooledConnection conn = null;
    long t = System.currentTimeMillis();
    int localBadConnectionCount = 0;

    // 这里是一个循环,直到拿到 conn,不然就会一致循环下去
    // 循环体中有超时和超次机制
    while (conn == null) {
      // 使用 synchronized 来处理多线程间的同步
      synchronized (state) {
        // 检查是否还有空闲线程,不为空表示有
        if (!state.idleConnections.isEmpty()) {
          // Pool has available connection
          // 有空闲线程则将线程从线程池的空闲队列中拿出第一个线程(这里会减少空闲线程数量)
          conn = state.idleConnections.remove(0);
          if (log.isDebugEnabled()) {
            log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
          }
        } else {
          // 当没有空闲线程时,判断活跃线程数是否已经达到最大线程数
          // Pool does not have available connection
          if (state.activeConnections.size() < poolMaximumActiveConnections) {
            // Can create new connection 创建新的连接
            conn = new PooledConnection(dataSource.getConnection(), this);
            if (log.isDebugEnabled()) {
              log.debug("Created connection " + conn.getRealHashCode() + ".");
            }
          } else {
            // Cannot create new connection
            // 无法创建心对连接,则检查线程池中最早的线程是否超时.
            // 若超时则将连接释放出来
            PooledConnection oldestActiveConnection = state.activeConnections.get(0);
            long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
            if (longestCheckoutTime > poolMaximumCheckoutTime) {
              // Can claim overdue connection
              state.claimedOverdueConnectionCount++;
              state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
              state.accumulatedCheckoutTime += longestCheckoutTime;
              state.activeConnections.remove(oldestActiveConnection);
              if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
                try {
                  oldestActiveConnection.getRealConnection().rollback();
                } catch (SQLException e) {
                  /*
                     Just log a message for debug and continue to execute the following
                     statement like nothing happened.
                     Wrap the bad connection with a new PooledConnection, this will help
                     to not interrupt current executing thread and give current thread a
                     chance to join the next competition for another valid/good database
                     connection. At the end of this loop, bad {@link @conn} will be set as null.
                   */
                  log.debug("Bad connection. Could not roll back");
                }
              }
              // 在连接池中创建新的连接,注意对于数据库来说,并没有创建新连接;
              conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
              conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
              conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
              oldestActiveConnection.invalidate();
              if (log.isDebugEnabled()) {
                log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
              }
            } else {
              // Must wait
              // 实在是没有连接可以用了,就阻塞等到
              try {
                if (!countedWait) {
                  state.hadToWaitCount++;
                  countedWait = true;
                }
                if (log.isDebugEnabled()) {
                  log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
                }
                long wt = System.currentTimeMillis();
                state.wait(poolTimeToWait);
                state.accumulatedWaitTime += System.currentTimeMillis() - wt;
              } catch (InterruptedException e) {
                break;
              }
            }
          }
        }

        // 上面的循环已经拿到了 conn 这里做一个二次校验
        if (conn != null) {
          // ping to server and check the connection is valid or not
          // 检测连接是否有效
          if (conn.isValid()) {
            // 这里获取的是真实连接的自动提交属性,如果不是自动提交,就将事务回滚
            // 这么做是因为获取的连接有可能是超时连接,但是本身连接中还有事务在运行
            if (!conn.getRealConnection().getAutoCommit()) {
              conn.getRealConnection().rollback();
            }
            // 设置连接池相关统计信息更新
            conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
            conn.setCheckoutTimestamp(System.currentTimeMillis());
            conn.setLastUsedTimestamp(System.currentTimeMillis());
            state.activeConnections.add(conn);
            state.requestCount++;
            state.accumulatedRequestTime += System.currentTimeMillis() - t;
          } else {
            // 如果连接无效
            if (log.isDebugEnabled()) {
              log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
            }
            // 累计的获取无效连接次数+1
            state.badConnectionCount++;
            // 当前获取无效连接次数+1
            localBadConnectionCount++;
            conn = null;
            // 拿到无效连接,但如果没有超过重试的次数,则重新走上面的 while 循环
            // 允许再次尝试获取连接,否则抛出异常
            if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) {
              if (log.isDebugEnabled()) {
                log.debug("PooledDataSource: Could not get a good connection to the database.");
              }
              throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
            }
          }
        }
      }

    }

    if (conn == null) {
      if (log.isDebugEnabled()) {
        log.debug("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
      }
      throw new SQLException("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
    }

    return conn;
  }
    
}

整体流程图:


Mybatis- 数据源模块

壁纸图,侵权删


Mybatis- 数据源模块

Mybatis- 数据源模块

Mybatis- 数据源模块



展开阅读全文

页面更新:2024-05-17

标签:数据源   模块   抽象   组件   数据库连接   属性   次数   对象   工厂   状态   事务   模式   数据库   时间   资源   科技

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top