AsyncHttpClient ChannelPool线程池频道池源码流程解析
序
本文主要研究一下AsyncHttpClient的ChannelPool
ChannelPool
org/asynchttpclient/channel/ChannelPool.java
public interface ChannelPool { /** * Add a channel to the pool * * @param channel an I/O channel * @param partitionKey a key used to retrieve the cached channel * @return true if added. */ boolean offer(Channel channel, Object partitionKey); /** * Remove the channel associated with the uri. * * @param partitionKey the partition used when invoking offer * @return the channel associated with the uri */ Channel poll(Object partitionKey); /** * Remove all channels from the cache. A channel might have been associated * with several uri. * * @param channel a channel * @return the true if the channel has been removed */ boolean removeAll(Channel channel); /** * Return true if a channel can be cached. A implementation can decide based * on some rules to allow caching Calling this method is equivalent of * checking the returned value of {@link ChannelPool#offer(Channel, Object)} * * @return true if a channel can be cached. */ boolean isOpen(); /** * Destroy all channels that has been cached by this instance. */ void destroy(); /** * Flush partitions based on a predicate * * @param predicate the predicate */ void flushPartitions(Predicate<Object> predicate); /** * @return The number of idle channels per host. */ Map<String, Long> getIdleChannelCountPerHost(); }
ChannelPool定义了offer、poll、removeAll、isOpen、destroy、flushPartitions、getIdleChannelCountPerHost方法,它有两个实现类,分别是NoopChannelPool及DefaultChannelPool
NoopChannelPool
org/asynchttpclient/channel/NoopChannelPool.java
public enum NoopChannelPool implements ChannelPool { INSTANCE; @Override public boolean offer(Channel channel, Object partitionKey) { return false; } @Override public Channel poll(Object partitionKey) { return null; } @Override public boolean removeAll(Channel channel) { return false; } @Override public boolean isOpen() { return true; } @Override public void destroy() { } @Override public void flushPartitions(Predicate<Object> predicate) { } @Override public Map<String, Long> getIdleChannelCountPerHost() { return Collections.emptyMap(); } }
NoopChannelPool是个枚举,用枚举实现了单例,其方法默认为空操作
DefaultChannelPool
/** * A simple implementation of {@link ChannelPool} based on a {@link java.util.concurrent.ConcurrentHashMap} */ public final class DefaultChannelPool implements ChannelPool { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultChannelPool.class); private final ConcurrentHashMap<Object, ConcurrentLinkedDeque<IdleChannel>> partitions = new ConcurrentHashMap<>(); private final ConcurrentHashMap<ChannelId, ChannelCreation> channelId2Creation; private final AtomicBoolean isClosed = new AtomicBoolean(false); private final Timer nettyTimer; private final int connectionTtl; private final boolean connectionTtlEnabled; private final int maxIdleTime; private final boolean maxIdleTimeEnabled; private final long cleanerPeriod; private final PoolLeaseStrategy poolLeaseStrategy; public DefaultChannelPool(AsyncHttpClientConfig config, Timer hashedWheelTimer) { this(config.getPooledConnectionIdleTimeout(), config.getConnectionTtl(), hashedWheelTimer, config.getConnectionPoolCleanerPeriod()); } public DefaultChannelPool(int maxIdleTime, int connectionTtl, Timer nettyTimer, int cleanerPeriod) { this(maxIdleTime, connectionTtl, PoolLeaseStrategy.LIFO, nettyTimer, cleanerPeriod); } public DefaultChannelPool(int maxIdleTime, int connectionTtl, PoolLeaseStrategy poolLeaseStrategy, Timer nettyTimer, int cleanerPeriod) { this.maxIdleTime = maxIdleTime; this.connectionTtl = connectionTtl; connectionTtlEnabled = connectionTtl > 0; channelId2Creation = connectionTtlEnabled ? new ConcurrentHashMap<>() : null; this.nettyTimer = nettyTimer; maxIdleTimeEnabled = maxIdleTime > 0; this.poolLeaseStrategy = poolLeaseStrategy; this.cleanerPeriod = Math.min(cleanerPeriod, Math.min(connectionTtlEnabled ? connectionTtl : Integer.MAX_VALUE, maxIdleTimeEnabled ? maxIdleTime : Integer.MAX_VALUE)); if (connectionTtlEnabled || maxIdleTimeEnabled) scheduleNewIdleChannelDetector(new IdleChannelDetector()); } //...... }
DefaultChannelPool基于ConcurrentHashMap实现了ChannelPool接口,主要的参数为connectionTtl、maxIdleTime、cleanerPeriod、poolLeaseStrategy;cleanerPeriod会取connectionTtl、maxIdleTime、传入的cleanerPeriod的最小值;开启connectionTtl或者maxIdleTime的话,会往nettyTimer添加IdleChannelDetector,延后cleanerPeriod时间执行
offer
public boolean offer(Channel channel, Object partitionKey) { if (isClosed.get()) return false; long now = unpreciseMillisTime(); if (isTtlExpired(channel, now)) return false; boolean offered = offer0(channel, partitionKey, now); if (connectionTtlEnabled && offered) { registerChannelCreation(channel, partitionKey, now); } return offered; } private boolean isTtlExpired(Channel channel, long now) { if (!connectionTtlEnabled) return false; ChannelCreation creation = channelId2Creation.get(channel.id()); return creation != null && now - creation.creationTime >= connectionTtl; } private boolean offer0(Channel channel, Object partitionKey, long now) { ConcurrentLinkedDeque<IdleChannel> partition = partitions.get(partitionKey); if (partition == null) { partition = partitions.computeIfAbsent(partitionKey, pk -> new ConcurrentLinkedDeque<>()); } return partition.offerFirst(new IdleChannel(channel, now)); } private void registerChannelCreation(Channel channel, Object partitionKey, long now) { ChannelId id = channel.id(); if (!channelId2Creation.containsKey(id)) { channelId2Creation.putIfAbsent(id, new ChannelCreation(now, partitionKey)); } }
offer接口先判断isTtlExpired,如果channel的存活时间超过connectionTtl则返回false,否则执行offer0,往ConcurrentLinkedDeque<IdleChannel>添加,若添加成功且connectionTtlEnabled则执行registerChannelCreation,维护创建时间
poll
/** * {@inheritDoc} */ public Channel poll(Object partitionKey) { IdleChannel idleChannel = null; ConcurrentLinkedDeque<IdleChannel> partition = partitions.get(partitionKey); if (partition != null) { while (idleChannel == null) { idleChannel = poolLeaseStrategy.lease(partition); if (idleChannel == null) // pool is empty break; else if (!Channels.isChannelActive(idleChannel.channel)) { idleChannel = null; LOGGER.trace("Channel is inactive, probably remotely closed!"); } else if (!idleChannel.takeOwnership()) { idleChannel = null; LOGGER.trace("Couldn't take ownership of channel, probably in the process of being expired!"); } } } return idleChannel != null ? idleChannel.channel : null; }
poll方法是根据partitionKey找到对应的ConcurrentLinkedDeque<IdleChannel>,然后循环执行poolLeaseStrategy.lease(partition),若idleChannel为null直接break,若isChannelActive为false则重置为null继续循环,若idleChannel.takeOwnership()为false也重置为null继续循环
removeAll
/** * {@inheritDoc} */ public boolean removeAll(Channel channel) { ChannelCreation creation = connectionTtlEnabled ? channelId2Creation.remove(channel.id()) : null; return !isClosed.get() && creation != null && partitions.get(creation.partitionKey).remove(new IdleChannel(channel, Long.MIN_VALUE)); }
removeAll方法会将指定的channel从channelId2Creation及ConcurrentLinkedDeque<IdleChannel>中移除
isOpen
/** * {@inheritDoc} */ public boolean isOpen() { return !isClosed.get(); }
isOpen则取的isClosed变量
destroy
/** * {@inheritDoc} */ public void destroy() { if (isClosed.getAndSet(true)) return; partitions.clear(); if (connectionTtlEnabled) { channelId2Creation.clear(); } }
destroy会设置isClosed为true,然后清空partitions及channelId2Creation
flushPartitions
public void flushPartitions(Predicate<Object> predicate) { for (Map.Entry<Object, ConcurrentLinkedDeque<IdleChannel>> partitionsEntry : partitions.entrySet()) { Object partitionKey = partitionsEntry.getKey(); if (predicate.test(partitionKey)) flushPartition(partitionKey, partitionsEntry.getValue()); } } private void flushPartition(Object partitionKey, ConcurrentLinkedDeque<IdleChannel> partition) { if (partition != null) { partitions.remove(partitionKey); for (IdleChannel idleChannel : partition) close(idleChannel.channel); } } private void close(Channel channel) { // FIXME pity to have to do this here Channels.setDiscard(channel); if (connectionTtlEnabled) { channelId2Creation.remove(channel.id()); } Channels.silentlyCloseChannel(channel); }
flushPartitions会遍历partitions,然后执行predicate.test,为true则执行flushPartition,它将从partitions移除指定的partitionKey,然后遍历idleChannels挨个执行close
getIdleChannelCountPerHost
public Map<String, Long> getIdleChannelCountPerHost() { return partitions .values() .stream() .flatMap(ConcurrentLinkedDeque::stream) .map(idle -> idle.getChannel().remoteAddress()) .filter(a -> a.getClass() == InetSocketAddress.class) .map(a -> (InetSocketAddress) a) .map(InetSocketAddress::getHostName) .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); }
getIdleChannelCountPerHost则遍历partitions,然后map出remoteAddress获取hostName,然后进行groupBy
PoolLeaseStrategy
public enum PoolLeaseStrategy { LIFO { public <E> E lease(Deque<E> d) { return d.pollFirst(); } }, FIFO { public <E> E lease(Deque<E> d) { return d.pollLast(); } }; abstract <E> E lease(Deque<E> d); }
PoolLeaseStrategy是个枚举,定义了LIFO及FIFO两个枚举,LIFO则是对Deque执行pollFirst,FIFO则是对Deque执行pollLast
IdleChannelDetector
private final class IdleChannelDetector implements TimerTask { private boolean isIdleTimeoutExpired(IdleChannel idleChannel, long now) { return maxIdleTimeEnabled && now - idleChannel.start >= maxIdleTime; } private List<IdleChannel> expiredChannels(ConcurrentLinkedDeque<IdleChannel> partition, long now) { // lazy create List<IdleChannel> idleTimeoutChannels = null; for (IdleChannel idleChannel : partition) { boolean isIdleTimeoutExpired = isIdleTimeoutExpired(idleChannel, now); boolean isRemotelyClosed = !Channels.isChannelActive(idleChannel.channel); boolean isTtlExpired = isTtlExpired(idleChannel.channel, now); if (isIdleTimeoutExpired || isRemotelyClosed || isTtlExpired) { LOGGER.debug("Adding Candidate expired Channel {} isIdleTimeoutExpired={} isRemotelyClosed={} isTtlExpired={}", idleChannel.channel, isIdleTimeoutExpired, isRemotelyClosed, isTtlExpired); if (idleTimeoutChannels == null) idleTimeoutChannels = new ArrayList<>(1); idleTimeoutChannels.add(idleChannel); } } return idleTimeoutChannels != null ? idleTimeoutChannels : Collections.emptyList(); } private List<IdleChannel> closeChannels(List<IdleChannel> candidates) { // lazy create, only if we hit a non-closeable channel List<IdleChannel> closedChannels = null; for (int i = 0; i < candidates.size(); i++) { // We call takeOwnership here to avoid closing a channel that has just been taken out // of the pool, otherwise we risk closing an active connection. IdleChannel idleChannel = candidates.get(i); if (idleChannel.takeOwnership()) { LOGGER.debug("Closing Idle Channel {}", idleChannel.channel); close(idleChannel.channel); if (closedChannels != null) { closedChannels.add(idleChannel); } } else if (closedChannels == null) { // first non closeable to be skipped, copy all // previously skipped closeable channels closedChannels = new ArrayList<>(candidates.size()); for (int j = 0; j < i; j++) closedChannels.add(candidates.get(j)); } } return closedChannels != null ? closedChannels : candidates; } public void run(Timeout timeout) { if (isClosed.get()) return; if (LOGGER.isDebugEnabled()) for (Object key : partitions.keySet()) { int size = partitions.get(key).size(); if (size > 0) { LOGGER.debug("Entry count for : {} : {}", key, size); } } long start = unpreciseMillisTime(); int closedCount = 0; int totalCount = 0; for (ConcurrentLinkedDeque<IdleChannel> partition : partitions.values()) { // store in intermediate unsynchronized lists to minimize // the impact on the ConcurrentLinkedDeque if (LOGGER.isDebugEnabled()) totalCount += partition.size(); List<IdleChannel> closedChannels = closeChannels(expiredChannels(partition, start)); if (!closedChannels.isEmpty()) { if (connectionTtlEnabled) { for (IdleChannel closedChannel : closedChannels) channelId2Creation.remove(closedChannel.channel.id()); } partition.removeAll(closedChannels); closedCount += closedChannels.size(); } } if (LOGGER.isDebugEnabled()) { long duration = unpreciseMillisTime() - start; if (closedCount > 0) { LOGGER.debug("Closed {} connections out of {} in {} ms", closedCount, totalCount, duration); } } scheduleNewIdleChannelDetector(timeout.task()); } }
IdleChannelDetector实现了netty的TimerTask接口,其run方法主要是遍历partitions,通过expiredChannels取出过期的IdleChannel,这里isIdleTimeoutExpired、isRemotelyClosed、isTtlExpired都算在内,然后挨个执行takeOwnership及close,再从channelId2Creation及partition中移除,最后再次调度一下IdleChannelDetector
小结
AsyncHttpClient的ChannelPool定义了offer、poll、removeAll、isOpen、destroy、flushPartitions、getIdleChannelCountPerHost方法,它有两个实现类,分别是NoopChannelPool及DefaultChannelPool;DefaultChannelPool基于ConcurrentHashMap实现了ChannelPool接口,主要的参数为connectionTtl、maxIdleTime、cleanerPeriod、poolLeaseStrategy;cleanerPeriod会取connectionTtl、maxIdleTime、传入的cleanerPeriod的最小值;开启connectionTtl或者maxIdleTime的话,会往nettyTimer添加IdleChannelDetector,延后cleanerPeriod时间执行。
poll方法会判断是active,不是的话继续循环lease,而IdleChannelDetector则会定期检查,isIdleTimeoutExpired、isRemotelyClosed、isTtlExpired都会被close,offer的时候还会判断isTtlExpired,这样子来保证连接的活性。
以上就是AsyncHttpClient ChannelPool的详细内容,更多关于AsyncHttpClient ChannelPool的资料请关注脚本之家其它相关文章!
相关文章
简单聊一聊Java线程池ThreadPoolExecutor
在使用线程池之后,开启线程就变成了在线程池当中找到一个空闲的线程,销毁线程变成了归还线程到线程池的过程,下面这篇文章主要给大家介绍了关于Java线程池ThreadPoolExecutor的相关资料,需要的朋友可以参考下2022-06-06mybatis利用association或collection传递多参数子查询
今天小编就为大家分享一篇关于mybatis利用association或collection传递多参数子查询,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧2019-03-03关于@JsonProperty,@NotNull,@JsonIgnore的具体使用
这篇文章主要介绍了关于@JsonProperty,@NotNull,@JsonIgnore的具体使用,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2022-08-08
最新评论