A Detailed Look at httpclient's CPool and Its Methods
Preface
This article takes a look at httpclient's CPool.
ConnPool
org/apache/http/pool/ConnPool.java
public interface ConnPool<T, E> {

    /**
     * Attempts to lease a connection for the given route and with the given
     * state from the pool.
     *
     * @param route route of the connection.
     * @param state arbitrary object that represents a particular state
     *  (usually a security principal or a unique token identifying
     *  the user whose credentials have been used while establishing the connection).
     *  May be {@code null}.
     * @param callback operation completion callback.
     *
     * @return future for a leased pool entry.
     */
    Future<E> lease(final T route, final Object state, final FutureCallback<E> callback);

    /**
     * Releases the pool entry back to the pool.
     *
     * @param entry pool entry leased from the pool
     * @param reusable flag indicating whether or not the released connection
     *  is in a consistent state and is safe for further use.
     */
    void release(E entry, boolean reusable);

}
ConnPool defines the lease and release methods and declares two type parameters: T is the route and E is the pool entry.
ConnPoolControl
public interface ConnPoolControl<T> {

    void setMaxTotal(int max);

    int getMaxTotal();

    void setDefaultMaxPerRoute(int max);

    int getDefaultMaxPerRoute();

    void setMaxPerRoute(final T route, int max);

    int getMaxPerRoute(final T route);

    PoolStats getTotalStats();

    PoolStats getStats(final T route);

}
The ConnPoolControl interface defines methods for setting and reading maxTotal, defaultMaxPerRoute and the per-route maximum, and for retrieving PoolStats for the whole pool or for a single route.
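To make the relationship between the default and the per-route limit concrete, here is a minimal sketch. RouteLimits is a hypothetical, simplified stand-in written for this article, not the library class, and it assumes that a route without an explicit limit falls back to defaultMaxPerRoute.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the limit bookkeeping; not the library class.
class RouteLimits<T> {
    private final Map<T, Integer> maxPerRoute = new HashMap<>();
    private final int defaultMaxPerRoute;
    private final int maxTotal;

    RouteLimits(int defaultMaxPerRoute, int maxTotal) {
        this.defaultMaxPerRoute = defaultMaxPerRoute;
        this.maxTotal = maxTotal;
    }

    // Records an explicit limit for one route.
    void setMaxPerRoute(T route, int max) {
        maxPerRoute.put(route, max);
    }

    // Assumption: the route-specific limit wins, otherwise the default applies.
    int getMaxPerRoute(T route) {
        Integer v = maxPerRoute.get(route);
        return v != null ? v : defaultMaxPerRoute;
    }

    int getMaxTotal() {
        return maxTotal;
    }
}
```

With limits of (2, 20) and an override of 5 for one route, that route reports 5 and every other route reports 2.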
AbstractConnPool
org/apache/http/pool/AbstractConnPool.java
@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
        implements ConnPool<T, E>, ConnPoolControl<T> {

    private final Lock lock;
    private final Condition condition;
    private final ConnFactory<T, C> connFactory;
    private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
    private final Set<E> leased;
    private final LinkedList<E> available;
    private final LinkedList<Future<E>> pending;
    private final Map<T, Integer> maxPerRoute;

    private volatile boolean isShutDown;
    private volatile int defaultMaxPerRoute;
    private volatile int maxTotal;
    private volatile int validateAfterInactivity;

    public AbstractConnPool(
            final ConnFactory<T, C> connFactory,
            final int defaultMaxPerRoute,
            final int maxTotal) {
        super();
        this.connFactory = Args.notNull(connFactory, "Connection factory");
        this.defaultMaxPerRoute = Args.positive(defaultMaxPerRoute, "Max per route value");
        this.maxTotal = Args.positive(maxTotal, "Max total value");
        this.lock = new ReentrantLock();
        this.condition = this.lock.newCondition();
        this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
        this.leased = new HashSet<E>();
        this.available = new LinkedList<E>();
        this.pending = new LinkedList<Future<E>>();
        this.maxPerRoute = new HashMap<T, Integer>();
    }

    /**
     * Creates a new entry for the given connection with the given route.
     */
    protected abstract E createEntry(T route, C conn);

    //......
}
AbstractConnPool implements both ConnPool and ConnPoolControl. It requires E to extend PoolEntry and adds a third type parameter, C, for the connection type.
shutdown
public void shutdown() throws IOException {
    if (this.isShutDown) {
        return ;
    }
    this.isShutDown = true;
    this.lock.lock();
    try {
        for (final E entry: this.available) {
            entry.close();
        }
        for (final E entry: this.leased) {
            entry.close();
        }
        for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
            pool.shutdown();
        }
        this.routeToPool.clear();
        this.leased.clear();
        this.available.clear();
    } finally {
        this.lock.unlock();
    }
}
shutdown returns immediately if the pool is already shut down. Otherwise it closes every entry in available and leased in turn, calls shutdown on each RouteSpecificPool in routeToPool, and finally clears all three collections.
lease
public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
    Args.notNull(route, "Route");
    Asserts.check(!this.isShutDown, "Connection pool shut down");

    return new Future<E>() {

        private final AtomicBoolean cancelled = new AtomicBoolean(false);
        private final AtomicBoolean done = new AtomicBoolean(false);
        private final AtomicReference<E> entryRef = new AtomicReference<E>(null);

        @Override
        public boolean cancel(final boolean mayInterruptIfRunning) {
            if (done.compareAndSet(false, true)) {
                cancelled.set(true);
                lock.lock();
                try {
                    condition.signalAll();
                } finally {
                    lock.unlock();
                }
                if (callback != null) {
                    callback.cancelled();
                }
                return true;
            }
            return false;
        }

        @Override
        public boolean isCancelled() {
            return cancelled.get();
        }

        @Override
        public boolean isDone() {
            return done.get();
        }

        @Override
        public E get() throws InterruptedException, ExecutionException {
            try {
                return get(0L, TimeUnit.MILLISECONDS);
            } catch (final TimeoutException ex) {
                throw new ExecutionException(ex);
            }
        }

        @Override
        public E get(final long timeout, final TimeUnit timeUnit)
                throws InterruptedException, ExecutionException, TimeoutException {
            for (;;) {
                synchronized (this) {
                    try {
                        final E entry = entryRef.get();
                        if (entry != null) {
                            return entry;
                        }
                        if (done.get()) {
                            throw new ExecutionException(operationAborted());
                        }
                        final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);
                        if (validateAfterInactivity > 0) {
                            if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {
                                if (!validate(leasedEntry)) {
                                    leasedEntry.close();
                                    release(leasedEntry, false);
                                    continue;
                                }
                            }
                        }
                        if (done.compareAndSet(false, true)) {
                            entryRef.set(leasedEntry);
                            done.set(true);
                            onLease(leasedEntry);
                            if (callback != null) {
                                callback.completed(leasedEntry);
                            }
                            return leasedEntry;
                        } else {
                            release(leasedEntry, true);
                            throw new ExecutionException(operationAborted());
                        }
                    } catch (final IOException ex) {
                        if (done.compareAndSet(false, true)) {
                            if (callback != null) {
                                callback.failed(ex);
                            }
                        }
                        throw new ExecutionException(ex);
                    }
                }
            }
        }

    };
}
lease returns a Future whose main methods are get and cancel. get calls getPoolEntryBlocking; the no-argument get() passes a timeout of 0, which means no deadline. If validateAfterInactivity is greater than 0 and the entry was last updated at least that many milliseconds ago, the entry is validated; if validation fails, leasedEntry.close() and release(leasedEntry, false) are called and the loop tries again. cancel marks the future as done and signals all threads waiting on the condition.
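The validation condition inside get can be isolated as a small predicate. The sketch below is illustrative only: InactivityCheck and needsValidation are hypothetical names, and the current time is passed in as a parameter so the check is easy to exercise.

```java
// Hypothetical helper that mirrors the condition used in get(); not library code.
class InactivityCheck {

    // True when validation is enabled (validateAfterInactivity > 0) and the
    // entry was last updated at least validateAfterInactivity ms before now.
    static boolean needsValidation(long updatedMillis, int validateAfterInactivity, long nowMillis) {
        return validateAfterInactivity > 0
                && updatedMillis + validateAfterInactivity <= nowMillis;
    }
}
```

An entry updated at t=1000 with validateAfterInactivity=2000 is not checked at t=2999 but is checked from t=3000 onward; with a value of 0 the check never runs.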
getPoolEntryBlocking
org/apache/http/pool/AbstractConnPool.java
private E getPoolEntryBlocking(
        final T route, final Object state,
        final long timeout, final TimeUnit timeUnit,
        final Future<E> future) throws IOException, InterruptedException, ExecutionException, TimeoutException {

    Date deadline = null;
    if (timeout > 0) {
        deadline = new Date (System.currentTimeMillis() + timeUnit.toMillis(timeout));
    }
    this.lock.lock();
    try {
        final RouteSpecificPool<T, C, E> pool = getPool(route);
        E entry;
        for (;;) {
            Asserts.check(!this.isShutDown, "Connection pool shut down");
            if (future.isCancelled()) {
                throw new ExecutionException(operationAborted());
            }
            for (;;) {
                entry = pool.getFree(state);
                if (entry == null) {
                    break;
                }
                if (entry.isExpired(System.currentTimeMillis())) {
                    entry.close();
                }
                if (entry.isClosed()) {
                    this.available.remove(entry);
                    pool.free(entry, false);
                } else {
                    break;
                }
            }
            if (entry != null) {
                this.available.remove(entry);
                this.leased.add(entry);
                onReuse(entry);
                return entry;
            }

            // New connection is needed
            final int maxPerRoute = getMax(route);
            // Shrink the pool prior to allocating a new connection
            final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
            if (excess > 0) {
                for (int i = 0; i < excess; i++) {
                    final E lastUsed = pool.getLastUsed();
                    if (lastUsed == null) {
                        break;
                    }
                    lastUsed.close();
                    this.available.remove(lastUsed);
                    pool.remove(lastUsed);
                }
            }

            if (pool.getAllocatedCount() < maxPerRoute) {
                final int totalUsed = this.leased.size();
                final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
                if (freeCapacity > 0) {
                    final int totalAvailable = this.available.size();
                    if (totalAvailable > freeCapacity - 1) {
                        if (!this.available.isEmpty()) {
                            final E lastUsed = this.available.removeLast();
                            lastUsed.close();
                            final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
                            otherpool.remove(lastUsed);
                        }
                    }
                    final C conn = this.connFactory.create(route);
                    entry = pool.add(conn);
                    this.leased.add(entry);
                    return entry;
                }
            }

            boolean success = false;
            try {
                pool.queue(future);
                this.pending.add(future);
                if (deadline != null) {
                    success = this.condition.awaitUntil(deadline);
                } else {
                    this.condition.await();
                    success = true;
                }
                if (future.isCancelled()) {
                    throw new ExecutionException(operationAborted());
                }
            } finally {
                // In case of 'success', we were woken up by the
                // connection pool and should now have a connection
                // waiting for us, or else we're shutting down.
                // Just continue in the loop, both cases are checked.
                pool.unqueue(future);
                this.pending.remove(future);
            }
            // check for spurious wakeup vs. timeout
            if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
                break;
            }
        }
        throw new TimeoutException("Timeout waiting for connection");
    } finally {
        this.lock.unlock();
    }
}
getPoolEntryBlocking first looks up the RouteSpecificPool for the route and then calls pool.getFree(state). A free entry that has expired is closed, and closed entries are discarded; a usable entry is moved from available to leased, onReuse is called, and the entry is returned. If there is no free entry, the method first closes idle entries of this route so that a new connection fits under maxPerRoute. Then, if the route is below its limit and the number of leased entries is below maxTotal, it creates a connection with connFactory.create(route); when the idle entries of the whole pool would otherwise leave no room, the entry at the tail of available is closed first. If no connection can be created, the future is queued and the thread waits on the condition until it is signalled or the deadline passes, in which case a TimeoutException is thrown.
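The capacity arithmetic in getPoolEntryBlocking is easier to follow with numbers plugged in. The sketch below copies the three expressions into a hypothetical CapacityMath helper (the class and method names are made up for illustration).

```java
// Hypothetical helper that restates the capacity expressions; not library code.
class CapacityMath {

    // Number of this route's idle entries to close so one more connection fits.
    static int excess(int allocatedForRoute, int maxPerRoute) {
        return Math.max(0, allocatedForRoute + 1 - maxPerRoute);
    }

    // Slots left under maxTotal, counting only leased entries.
    static int freeCapacity(int maxTotal, int totalLeased) {
        return Math.max(maxTotal - totalLeased, 0);
    }

    // True when idle entries fill the remaining capacity, so one of them
    // has to be closed before a new connection is created.
    static boolean mustEvictIdle(int totalAvailable, int freeCapacity) {
        return totalAvailable > freeCapacity - 1;
    }
}
```

For example, with maxTotal=20 and 18 leased entries, freeCapacity is 2. If 2 idle entries exist, one must be evicted before creating a connection; with only 1 idle entry there is still room.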
release
@Override
public void release(final E entry, final boolean reusable) {
    this.lock.lock();
    try {
        if (this.leased.remove(entry)) {
            final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
            pool.free(entry, reusable);
            if (reusable && !this.isShutDown) {
                this.available.addFirst(entry);
            } else {
                entry.close();
            }
            onRelease(entry);
            Future<E> future = pool.nextPending();
            if (future != null) {
                this.pending.remove(future);
            } else {
                future = this.pending.poll();
            }
            if (future != null) {
                this.condition.signalAll();
            }
        }
    } finally {
        this.lock.unlock();
    }
}
release removes the entry from leased, looks up its RouteSpecificPool and calls pool.free(entry, reusable). A reusable entry is put at the head of available unless the pool has been shut down; otherwise the entry is closed. After onRelease, if there is a pending request for this route or anywhere else in the pool, condition.signalAll() wakes the waiting threads.
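The interplay between a blocking lease and a release that signals waiters can be sketched with a toy pool. TinyPool below is hypothetical and heavily simplified: its "connections" are plain strings, there are no routes, and it never creates new connections. It only illustrates the lock/condition pattern, the awaitUntil deadline, and the addFirst placement on release.

```java
import java.util.ArrayDeque;
import java.util.Date;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical toy pool of String "connections"; not the library class.
class TinyPool {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private final Deque<String> available = new ArrayDeque<>();

    TinyPool(String... connections) {
        for (String c : connections) {
            available.addLast(c);
        }
    }

    // Blocks until a connection is idle or the deadline passes.
    String lease(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        Date deadline = new Date(System.currentTimeMillis() + unit.toMillis(timeout));
        lock.lock();
        try {
            while (available.isEmpty()) {
                // awaitUntil returns false once the deadline has elapsed
                boolean signalled = condition.awaitUntil(deadline);
                if (!signalled && available.isEmpty()) {
                    throw new TimeoutException("Timeout waiting for connection");
                }
            }
            return available.pollFirst();
        } finally {
            lock.unlock();
        }
    }

    // Puts the connection at the head of the idle list and wakes all waiters.
    void release(String connection) {
        lock.lock();
        try {
            available.addFirst(connection);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
```

Because released connections go to the head of the list and leases take from the head, the most recently released connection is handed out first in this sketch.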
CPool
org/apache/http/impl/conn/CPool.java
@Contract(threading = ThreadingBehavior.SAFE)
class CPool extends AbstractConnPool<HttpRoute, ManagedHttpClientConnection, CPoolEntry> {

    private static final AtomicLong COUNTER = new AtomicLong();

    private final Log log = LogFactory.getLog(CPool.class);
    private final long timeToLive;
    private final TimeUnit timeUnit;

    public CPool(
            final ConnFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
            final int defaultMaxPerRoute, final int maxTotal,
            final long timeToLive, final TimeUnit timeUnit) {
        super(connFactory, defaultMaxPerRoute, maxTotal);
        this.timeToLive = timeToLive;
        this.timeUnit = timeUnit;
    }

    @Override
    protected CPoolEntry createEntry(final HttpRoute route, final ManagedHttpClientConnection conn) {
        final String id = Long.toString(COUNTER.getAndIncrement());
        return new CPoolEntry(this.log, id, route, conn, this.timeToLive, this.timeUnit);
    }

    @Override
    protected boolean validate(final CPoolEntry entry) {
        return !entry.getConnection().isStale();
    }

    @Override
    protected void enumAvailable(final PoolEntryCallback<HttpRoute, ManagedHttpClientConnection> callback) {
        super.enumAvailable(callback);
    }

    @Override
    protected void enumLeased(final PoolEntryCallback<HttpRoute, ManagedHttpClientConnection> callback) {
        super.enumLeased(callback);
    }

}
CPool extends AbstractConnPool with T = HttpRoute, C = ManagedHttpClientConnection and E = CPoolEntry. Its createEntry method creates a CPoolEntry whose ID comes from a shared AtomicLong counter and passes along the pool's timeToLive; validate checks that the connection is not stale.
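As a rough illustration of what createEntry hands to each entry, the sketch below models an entry that takes its ID from a shared counter and derives an expiry time from timeToLive. SketchEntry is hypothetical; in particular, treating a non-positive timeToLive as "never expires by age" and computing expiry as creation time plus TTL are assumptions of this sketch, since the entry class itself is not shown in this article.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a pool entry; not CPoolEntry itself.
class SketchEntry {
    private static final AtomicLong COUNTER = new AtomicLong();

    final String id;
    final long expiry;

    SketchEntry(long createdMillis, long timeToLive, TimeUnit timeUnit) {
        // Same ID scheme as CPool.createEntry: a process-wide counter.
        this.id = Long.toString(COUNTER.getAndIncrement());
        // Assumption: a non-positive TTL means the entry never expires by age.
        this.expiry = timeToLive > 0
                ? createdMillis + timeUnit.toMillis(timeToLive)
                : Long.MAX_VALUE;
    }

    // Assumption: an entry is expired once the current time reaches expiry.
    boolean isExpired(long nowMillis) {
        return nowMillis >= expiry;
    }
}
```

An entry created at t=1000 with a TTL of 2 seconds is treated as expired from t=3000 in this model, which is the kind of check getPoolEntryBlocking relies on when it calls entry.isExpired.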
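Summary
ConnPool defines the lease and release methods and declares two type parameters: T is the route and E is the pool entry.
AbstractConnPool implements both ConnPool and ConnPoolControl; it requires E to extend PoolEntry and adds the type parameter C for the connection type. CPool extends AbstractConnPool with T = HttpRoute, C = ManagedHttpClientConnection and E = CPoolEntry.
The Future returned by AbstractConnPool's lease method centers on get and cancel. get calls getPoolEntryBlocking and, when validateAfterInactivity is greater than 0 and the entry has been idle long enough, validates the entry; if validation fails, it calls leasedEntry.close() and release and then retries. release looks up the RouteSpecificPool, calls pool.free(entry, reusable), either returns the entry to available or closes it, and signals any pending requests.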