常州免费网站建设/怎么建网址
因开发需要,我们大数据组,基于Apache Common-Pool,对 InfluxDB的封装了一个连接池,现就主要逻辑涉及到的源代码(与Apache Common-Pool交叉)进行梳理。
我们将从以下四个方面进行梳理:
- 1、连接池初始化的操作:
- 2、出借连接对象时的逻辑。
- 3、归还连接对象时的逻辑。
- 4、主要的异常信息 列举(与Apache Common-Pool交叉),
1、连接池初始化的操作。
创建连接池对象时,由代码可见:
就是把各种配置信息初始化到连接池对象中,但是并不创建任何连接对象。
只是在获取连接的时候,才会做判断。
/*** Create a new <code>GenericObjectPool</code> using a specific* configuration.** @param factory The object factory to be used to create object instances* used by this pool* @param config The configuration to use for this pool instance. The* configuration is used by value. Subsequent changes to* the configuration object will not be reflected in the* pool.*/public GenericObjectPool(final PooledObjectFactory<T> factory,final GenericObjectPoolConfig config) {super(config, ONAME_BASE, config.getJmxNamePrefix());if (factory == null) {jmxUnregister(); // tidy upthrow new IllegalArgumentException("factory may not be null");}this.factory = factory;idleObjects = new LinkedBlockingDeque<PooledObject<T>>(config.getFairness());setConfig(config);startEvictor(getTimeBetweenEvictionRunsMillis());}
2、出借连接对象时的逻辑。
2.1 出借连接对象,主要是 下面 getResource 方法。
public T getResource() {try {return internalPool.borrowObject(); //获取连接对象,} catch (NoSuchElementException nse) {if (null == nse.getCause()) { // The exception was caused by an exhausted poolthrow new PydInfluxDBExhaustedPoolException("Could not get a resource since the pool is exhausted", nse);}// Otherwise, the exception was caused by the implemented activateObject() or ValidateObject()throw new PydInfluxDBException("Could not get a resource from the pool", nse);} catch (Exception e) {throw new PydInfluxDBConnectionException("Could not get a resource from the pool", e);}}
2.2 然后再调用borrowObject这个方法
出借连接对象的逻辑,主要都在下面GenericObjectPool 类的这个方法中(见代码行注释):
public T borrowObject(final long borrowMaxWaitMillis) throws Exception {assertOpen();final AbandonedConfig ac = this.abandonedConfig;if (ac != null && ac.getRemoveAbandonedOnBorrow() &&(getNumIdle() < 2) &&(getNumActive() > getMaxTotal() - 3) ) {removeAbandoned(ac);}PooledObject<T> p = null;// Get local copy of current config so it is consistent for entire// method executionfinal boolean blockWhenExhausted = getBlockWhenExhausted();boolean create;final long waitTime = System.currentTimeMillis();while (p == null) {create = false;p = idleObjects.pollFirst(); // 1、先 从 队列中获取连接对象if (p == null) { p = create(); // 2、如果没有从队列中获取到,则创建对象if (p != null) {create = true;}}if (blockWhenExhausted) { // 3、如果无法创建(可能连接被耗尽),则要进入耗尽等待阻塞状态。if (p == null) {if (borrowMaxWaitMillis < 0) {p = idleObjects.takeFirst();} else {p = idleObjects.pollFirst(borrowMaxWaitMillis,TimeUnit.MILLISECONDS); // 4、基于等待时间 等待。}}if (p == null) {throw new NoSuchElementException("Timeout waiting for idle object");}} else {if (p == null) {throw new NoSuchElementException("Pool exhausted");}}if (!p.allocate()) { //5、 获取成功后,将连接对象 从 Idle 状态,修改为 allocate 状态。p = null;}if (p != null) {try {factory.activateObject(p); // 6、获取成功后,可以 开始做激活操作} catch (final Exception e) {try {destroy(p);} catch (final Exception e1) {// Ignore - activation failure is more important}p = null;if (create) {final NoSuchElementException nsee = new NoSuchElementException("Unable to activate object");nsee.initCause(e);throw nsee;}}if (p != null && (getTestOnBorrow() || create && getTestOnCreate())) {boolean validate = false;Throwable validationThrowable = null;try {validate = factory.validateObject(p); // 7、获取成功后,可以 开始做 测试校验 操作} catch (final Throwable t) {PoolUtils.checkRethrow(t);validationThrowable = t;}if (!validate) {try {destroy(p);destroyedByBorrowValidationCount.incrementAndGet();} catch (final Exception e) {// Ignore - validation failure is more important}p = null;if (create) {final NoSuchElementException nsee = new NoSuchElementException("Unable to validate object");nsee.initCause(validationThrowable);throw nsee;}}}}}updateStatsBorrow(p, System.currentTimeMillis() - waitTime);return p.getObject(); // 8、最终 返回 要 借出的 连接对象。}
3、有关借出连接的几个细节
3.1首先会调用工厂类,创建连接对象。
org.apache.commons.pool2.impl.GenericObjectPooltry {p = factory.makeObject();} catch (final Exception e) {createCount.decrementAndGet();throw e;
} finally {
3.2 自定义实现了接口的工厂类,创建了连接对象,会被装包到默认池对象类中。
PydInfluxDBFactory@Overridepublic PooledObject<PydInfluxDB> makeObject() throws Exception {PydInfluxDB pydInfluxDB =new PydInfluxDBImpl(this.url, this.username, this.password, new OkHttpClient.Builder());//可以在此处做一些判断、校验类的操作。return new DefaultPooledObject<>(pydInfluxDB);}
3.3 默认的池对象,会缺省设置几个描述连接对象状态的属性值。
public class DefaultPooledObject<T> implements PooledObject<T> {private final T object;private PooledObjectState state = PooledObjectState.IDLE; // @GuardedBy("this") to ensure transitions are validprivate final long createTime = System.currentTimeMillis();private volatile long lastBorrowTime = createTime;private volatile long lastUseTime = createTime;private volatile long lastReturnTime = createTime;private volatile boolean logAbandoned = false;private final CallStack borrowedBy = CallStackUtils.newCallStack("'Pooled object created' " +"yyyy-MM-dd HH:mm:ss Z 'by the following code has not been returned to the pool:'", true);private final CallStack usedBy = CallStackUtils.newCallStack("The last code to use this object was:",false);private volatile long borrowedCount = 0;/*** Create a new instance that wraps the provided object so that the pool can* track the state of the pooled object.** @param object The object to wrap*/public DefaultPooledObject(final T object) {this.object = object;}
重点 如 private PooledObjectState state = PooledObjectState.IDLE;
这一句,池对象的状态。是一个枚举类,有多个对象的状态取值。重点的是下面这两个。
public enum PooledObjectState {/*** In the queue, not in use.*/IDLE, // 此种状态的连接对象才会被借出。且 借出后修改状态为ALLOCATED。/*** In use.*/ALLOCATED, // 此种状态的连接对象才会被归还。且 归还后修改状态为 IDLE 。
}
3.4 借出连接时,会修改的标识信息。
在 章节1 中, 主要逻辑代码中,有如下 代码语句:p.allocate() 方法的内容和逻辑如下:
首先,判断连接对象状态是否为Idle;
然后,会修改 其标识信息为Alllcate
/*** Allocates the object.** @return {@code true} if the original state was {@link PooledObjectState#IDLE IDLE}*/@Overridepublic synchronized boolean allocate() {if (state == PooledObjectState.IDLE) {state = PooledObjectState.ALLOCATED;lastBorrowTime = System.currentTimeMillis();lastUseTime = lastBorrowTime;borrowedCount++;if (logAbandoned) {borrowedBy.fillInStackTrace();}return true;} else if (state == PooledObjectState.EVICTION) {// TODO Allocate anyway and ignore eviction teststate = PooledObjectState.EVICTION_RETURN_TO_HEAD;return false;}// TODO if validating and testOnBorrow == true then pre-allocate for// performancereturn false;}
3.5 分析:当借连接的等待时间 超时,或者 未设置时,执行的逻辑。
if (blockWhenExhausted) { //连接池资源耗尽时,会等待、阻塞。if (p == null) {if (borrowMaxWaitMillis < 0) {p = idleObjects.takeFirst(); // 不设置等待超时,则直接获取} else { // 设置了等待超时,会进行判断。p = idleObjects.pollFirst(borrowMaxWaitMillis,TimeUnit.MILLISECONDS);}}if (p == null) { // 等待超时后,仍旧没有获取连接,则会抛出下面的异常信息。throw new NoSuchElementException("Timeout waiting for idle object");}} else {if (p == null) { throw new NoSuchElementException("Pool exhausted");}}
4、归还连接对象时的逻辑。
有关 归还连接的 逻辑代码, 都在主要都在下面GenericObjectPool 类 的这个方法中:
详见 代码 行注释。
@Overridepublic void returnObject(final T obj) {final PooledObject<T> p = allObjects.get(new IdentityWrapper<T>(obj));if (p == null) {if (!isAbandonedConfig()) {throw new IllegalStateException("Returned object not currently part of this pool");}return; // Object was abandoned and removed}synchronized(p) {final PooledObjectState state = p.getState();if (state != PooledObjectState.ALLOCATED) {throw new IllegalStateException("Object has already been returned to this pool or is invalid");}p.markReturning(); // Keep from being marked abandoned // 1、此句代码修改了连接的状态,下面会进一步分析内部代码。}final long activeTime = p.getActiveTimeMillis();if (getTestOnReturn()) { // 2、归还时,可以先测试一下。需要配置。if (!factory.validateObject(p)) {try {destroy(p);} catch (final Exception e) {swallowException(e);}try {ensureIdle(1, false);} catch (final Exception e) {swallowException(e);}updateStatsReturn(activeTime);return;}}try {factory.passivateObject(p);} catch (final Exception e1) {swallowException(e1);try {destroy(p);} catch (final Exception e) {swallowException(e);}try {ensureIdle(1, false);} catch (final Exception e) {swallowException(e);}updateStatsReturn(activeTime);return;}if (!p.deallocate()) { // 3、此句代码修改了连接的状态,下面会进一步分析内部代码。throw new IllegalStateException("Object has already been returned to this pool or is invalid");}final int maxIdleSave = getMaxIdle();if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {try {destroy(p);} catch (final Exception e) {swallowException(e);}} else {if (getLifo()) {idleObjects.addFirst(p);} else {idleObjects.addLast(p);}if (isClosed()) {// Pool closed while object was being added to idle objects.// Make sure the returned object is destroyed rather than left// in the idle object pool (which would effectively be a leak)clear();}}updateStatsReturn(activeTime);}
5、有关归还连接的细节
在上面的“归还连接”代码中,有一句:p.deallocate() ,这是关键代码,会更新连接的状态:
/*** Deallocates the object and sets it {@link PooledObjectState#IDLE IDLE}* if it is currently {@link PooledObjectState#ALLOCATED ALLOCATED}.** @return {@code true} if the state was {@link PooledObjectState#ALLOCATED ALLOCATED}*/@Overridepublic synchronized boolean deallocate() {if (state == PooledObjectState.ALLOCATED || state == PooledObjectState.RETURNING) {state = PooledObjectState.IDLE; // 此句将连接的状态,由ALLOCATED修改为IDLE。lastReturnTime = System.currentTimeMillis();borrowedBy.clear();return true;}return false;}
6、主要的异常信息 列举
下面列举出,几处重要的异常信息,便于在日志中搜索。
6.1 借出连接,设置激活失败时,报出的异常信息。
try {factory.activateObject(p);} catch (final Exception e) {try {destroy(p);} catch (final Exception e1) {// Ignore - activation failure is more important}p = null;if (create) {final NoSuchElementException nsee = new NoSuchElementException("Unable to activate object");nsee.initCause(e);throw nsee;}}
6.2 借出连接,进行校验操作失败时,报出的异常信息。
if (p != null && (getTestOnBorrow() || create && getTestOnCreate())) {boolean validate = false;Throwable validationThrowable = null;try {validate = factory.validateObject(p);} catch (final Throwable t) {PoolUtils.checkRethrow(t);validationThrowable = t;}if (!validate) {try {destroy(p);destroyedByBorrowValidationCount.incrementAndGet();} catch (final Exception e) {// Ignore - validation failure is more important}p = null;if (create) {final NoSuchElementException nsee = new NoSuchElementException("Unable to validate object");nsee.initCause(validationThrowable);throw nsee;}}}
6.3 获取资源失败时,可能会报出的异常信息。
public T getResource() {try {return internalPool.borrowObject();} catch (NoSuchElementException nse) {if (null == nse.getCause()) { // The exception was caused by an exhausted poolthrow new PydInfluxDBExhaustedPoolException("Could not get a resource since the pool is exhausted", nse);}// Otherwise, the exception was caused by the implemented activateObject() or ValidateObject()throw new PydInfluxDBException("Could not get a resource from the pool", nse);} catch (Exception e) {throw new PydInfluxDBConnectionException("Could not get a resource from the pool", e);}}
6.4 当借连接的等待时间 超时,或者 未设置时,执行的逻辑。
if (blockWhenExhausted) {if (p == null) {if (borrowMaxWaitMillis < 0) {p = idleObjects.takeFirst();} else {p = idleObjects.pollFirst(borrowMaxWaitMillis,TimeUnit.MILLISECONDS);}}if (p == null) {throw new NoSuchElementException("Timeout waiting for idle object");}} else {if (p == null) {throw new NoSuchElementException("Pool exhausted");}}
以上就是 对于InfluxDBPool 开发代码 重要逻辑的梳理,大部分的代码都是 Apache Common-Pool 中的源码,大家可以参见。
谢谢!