AsyncHttpClient的ConnectionSemaphore方法源码流程解读
序
本文主要研究一下AsyncHttpClient的ConnectionSemaphore
ConnectionSemaphore
org/asynchttpclient/netty/channel/ConnectionSemaphore.java
/** * Max connections and max-per-host connections limiter. * * @author Stepan Koltsov */ public class ConnectionSemaphore { private final NonBlockingSemaphoreLike freeChannels; private final int maxConnectionsPerHost; private final ConcurrentHashMap<Object, NonBlockingSemaphore> freeChannelsPerHost = new ConcurrentHashMap<>(); private final IOException tooManyConnections; private final IOException tooManyConnectionsPerHost; private ConnectionSemaphore(AsyncHttpClientConfig config) { tooManyConnections = unknownStackTrace(new TooManyConnectionsException(config.getMaxConnections()), ConnectionSemaphore.class, "acquireChannelLock"); tooManyConnectionsPerHost = unknownStackTrace(new TooManyConnectionsPerHostException(config.getMaxConnectionsPerHost()), ConnectionSemaphore.class, "acquireChannelLock"); int maxTotalConnections = config.getMaxConnections(); maxConnectionsPerHost = config.getMaxConnectionsPerHost(); freeChannels = maxTotalConnections > 0 ? new NonBlockingSemaphore(config.getMaxConnections()) : NonBlockingSemaphoreInfinite.INSTANCE; } public static ConnectionSemaphore newConnectionSemaphore(AsyncHttpClientConfig config) { return config.getMaxConnections() > 0 || config.getMaxConnectionsPerHost() > 0 ? new ConnectionSemaphore(config) : null; } private boolean tryAcquireGlobal() { return freeChannels.tryAcquire(); } private NonBlockingSemaphoreLike getFreeConnectionsForHost(Object partitionKey) { return maxConnectionsPerHost > 0 ? freeChannelsPerHost.computeIfAbsent(partitionKey, pk -> new NonBlockingSemaphore(maxConnectionsPerHost)) : NonBlockingSemaphoreInfinite.INSTANCE; } private boolean tryAcquirePerHost(Object partitionKey) { return getFreeConnectionsForHost(partitionKey).tryAcquire(); } public void acquireChannelLock(Object partitionKey) throws IOException { if (!tryAcquireGlobal()) throw tooManyConnections; if (!tryAcquirePerHost(partitionKey)) { freeChannels.release(); throw tooManyConnectionsPerHost; } } public void releaseChannelLock(Object partitionKey) { freeChannels.release(); getFreeConnectionsForHost(partitionKey).release(); } }
ConnectionSemaphore主要用于控制连接的maxConnections及maxConnectionsPerHost;它定义了freeChannels表示可用连接的信号量,定义了freeChannelsPerHost维护每个host的可用连接新用量,类型是NonBlockingSemaphoreLike;它提供了tryAcquireGlobal用于获取全局的空闲连接,tryAcquirePerHost用于获取指定host的空闲连接;acquireChannelLock先获取全局空闲连接,获取不到抛出TooManyConnectionsException,再获取指定host的空闲连接,获取不到则释放全局空闲连接,抛出TooManyConnectionsPerHostException;releaseChannelLock则先释放全局空闲连接,再释放指定host的空闲连接
NonBlockingSemaphoreLike
org/asynchttpclient/netty/channel/NonBlockingSemaphoreLike.java
/** * Non-blocking semaphore API. * * @author Stepan Koltsov */ interface NonBlockingSemaphoreLike { void release(); boolean tryAcquire(); }
NonBlockingSemaphoreLike接口定义了release、tryAcquire方法,它有两个实现类,分别是NonBlockingSemaphore、NonBlockingSemaphoreInfinite
NonBlockingSemaphore
org/asynchttpclient/netty/channel/NonBlockingSemaphore.java
class NonBlockingSemaphore implements NonBlockingSemaphoreLike { private final AtomicInteger permits; NonBlockingSemaphore(int permits) { this.permits = new AtomicInteger(permits); } @Override public void release() { permits.incrementAndGet(); } @Override public boolean tryAcquire() { for (; ; ) { int count = permits.get(); if (count <= 0) { return false; } if (permits.compareAndSet(count, count - 1)) { return true; } } } @Override public String toString() { // mimic toString of Semaphore class return super.toString() + "[Permits = " + permits + "]"; } }
NonBlockingSemaphore内部使用AtomicInteger来进行控制,permits表示可用的数量,release方法则递增permits,tryAcquire则循环执行先判断permits是否大于0,否则返回false,若permits.compareAndSet(count, count - 1)成功则返回true,否则继续循环执行直到返回false或者true
NonBlockingSemaphoreInfinite
org/asynchttpclient/netty/channel/NonBlockingSemaphoreInfinite.java
enum NonBlockingSemaphoreInfinite implements NonBlockingSemaphoreLike { INSTANCE; @Override public void release() { } @Override public boolean tryAcquire() { return true; } @Override public String toString() { return NonBlockingSemaphore.class.getName(); } }
NonBlockingSemaphoreInfinite表示无限的信号量,release为空操作,tryAcquire始终返回true
NettyResponseFuture
org/asynchttpclient/netty/NettyResponseFuture.java
/** * A {@link Future} that can be used to track when an asynchronous HTTP request * has been fully processed. * * @param <V> the result type */ public final class NettyResponseFuture<V> implements ListenableFuture<V> { //...... public void acquirePartitionLockLazily() throws IOException { if (connectionSemaphore == null || partitionKeyLock != null) { return; } Object partitionKey = getPartitionKey(); connectionSemaphore.acquireChannelLock(partitionKey); Object prevKey = PARTITION_KEY_LOCK_FIELD.getAndSet(this, partitionKey); if (prevKey != null) { // self-check connectionSemaphore.releaseChannelLock(prevKey); releasePartitionKeyLock(); throw new IllegalStateException("Trying to acquire partition lock concurrently. Please report."); } if (isDone()) { // may be cancelled while we acquired a lock releasePartitionKeyLock(); } } public boolean cancel(boolean force) { releasePartitionKeyLock(); cancelTimeouts(); if (IS_CANCELLED_FIELD.getAndSet(this, 1) != 0) return false; // cancel could happen before channel was attached if (channel != null) { Channels.setDiscard(channel); Channels.silentlyCloseChannel(channel); } if (ON_THROWABLE_CALLED_FIELD.getAndSet(this, 1) == 0) { try { asyncHandler.onThrowable(new CancellationException()); } catch (Throwable t) { LOGGER.warn("cancel", t); } } future.cancel(false); return true; } private boolean terminateAndExit() { releasePartitionKeyLock(); cancelTimeouts(); this.channel = null; this.reuseChannel = false; return IS_DONE_FIELD.getAndSet(this, 1) != 0 || isCancelled != 0; } private void releasePartitionKeyLock() { if (connectionSemaphore == null) { return; } Object partitionKey = takePartitionKeyLock(); if (partitionKey != null) { connectionSemaphore.releaseChannelLock(partitionKey); } } }
NettyResponseFuture提供了acquirePartitionLockLazily方法,它会通过connectionSemaphore.acquireChannelLock(partitionKey)来获取连接信号量;cancel和terminateAndExit都会执行releasePartitionKeyLock,它会调用connectionSemaphore.releaseChannelLock(partitionKey)
NettyRequestSender
org/asynchttpclient/netty/request/NettyRequestSender.java
public final class NettyRequestSender { private static final Logger LOGGER = LoggerFactory.getLogger(NettyRequestSender.class); private final AsyncHttpClientConfig config; private final ChannelManager channelManager; private final ConnectionSemaphore connectionSemaphore; private final Timer nettyTimer; private final AsyncHttpClientState clientState; private final NettyRequestFactory requestFactory; public NettyRequestSender(AsyncHttpClientConfig config, ChannelManager channelManager, Timer nettyTimer, AsyncHttpClientState clientState) { this.config = config; this.channelManager = channelManager; this.connectionSemaphore = ConnectionSemaphore.newConnectionSemaphore(config); this.nettyTimer = nettyTimer; this.clientState = clientState; requestFactory = new NettyRequestFactory(config); } public <T> ListenableFuture<T> sendRequest(final Request request, final AsyncHandler<T> asyncHandler, NettyResponseFuture<T> future) { if (isClosed()) { throw new IllegalStateException("Closed"); } validateWebSocketRequest(request, asyncHandler); ProxyServer proxyServer = getProxyServer(config, request); // WebSockets use connect tunneling to work with proxies if (proxyServer != null // && (request.getUri().isSecured() || request.getUri().isWebSocket()) // && !isConnectDone(request, future) // && proxyServer.getProxyType().isHttp()) { // Proxy with HTTPS or WebSocket: CONNECT for sure if (future != null && future.isConnectAllowed()) { // Perform CONNECT return sendRequestWithCertainForceConnect(request, asyncHandler, future, proxyServer, true); } else { // CONNECT will depend if we can pool or connection or if we have to open a new // one return sendRequestThroughSslProxy(request, asyncHandler, future, proxyServer); } } else { // no CONNECT for sure return sendRequestWithCertainForceConnect(request, asyncHandler, future, proxyServer, false); } } //...... }
NettyRequestSender的构造器会根据配置创建ConnectionSemaphore,其sendRequest方法内部调用的是sendRequestWithCertainForceConnect、sendRequestThroughSslProxy
sendRequestWithCertainForceConnect
/** * We know for sure if we have to force to connect or not, so we can build the * HttpRequest right away This reduces the probability of having a pooled * channel closed by the server by the time we build the request */ private <T> ListenableFuture<T> sendRequestWithCertainForceConnect(Request request, AsyncHandler<T> asyncHandler, NettyResponseFuture<T> future, ProxyServer proxyServer, boolean performConnectRequest) { NettyResponseFuture<T> newFuture = newNettyRequestAndResponseFuture(request, asyncHandler, future, proxyServer, performConnectRequest); Channel channel = getOpenChannel(future, request, proxyServer, asyncHandler); return Channels.isChannelActive(channel) ? sendRequestWithOpenChannel(newFuture, asyncHandler, channel) : sendRequestWithNewChannel(request, proxyServer, newFuture, asyncHandler); }
sendRequestWithCertainForceConnect方法先通过getOpenChannel获取channel,然后执行sendRequestWithOpenChannel或者sendRequestWithNewChannel
sendRequestThroughSslProxy
private <T> ListenableFuture<T> sendRequestThroughSslProxy(Request request, AsyncHandler<T> asyncHandler, NettyResponseFuture<T> future, ProxyServer proxyServer) { NettyResponseFuture<T> newFuture = null; for (int i = 0; i < 3; i++) { Channel channel = getOpenChannel(future, request, proxyServer, asyncHandler); if (channel == null) { // pool is empty break; } if (newFuture == null) { newFuture = newNettyRequestAndResponseFuture(request, asyncHandler, future, proxyServer, false); } if (Channels.isChannelActive(channel)) { // if the channel is still active, we can use it, // otherwise, channel was closed by the time we computed the request, try again return sendRequestWithOpenChannel(newFuture, asyncHandler, channel); } } // couldn't poll an active channel newFuture = newNettyRequestAndResponseFuture(request, asyncHandler, future, proxyServer, true); return sendRequestWithNewChannel(request, proxyServer, newFuture, asyncHandler); }
sendRequestThroughSslProxy也是先通过getOpenChannel获取channel,然后执行sendRequestWithOpenChannel或者sendRequestWithNewChannel
sendRequestWithNewChannel
private <T> ListenableFuture<T> sendRequestWithNewChannel(Request request, ProxyServer proxy, NettyResponseFuture<T> future, AsyncHandler<T> asyncHandler) { // some headers are only set when performing the first request HttpHeaders headers = future.getNettyRequest().getHttpRequest().headers(); Realm realm = future.getRealm(); Realm proxyRealm = future.getProxyRealm(); requestFactory.addAuthorizationHeader(headers, perConnectionAuthorizationHeader(request, proxy, realm)); requestFactory.setProxyAuthorizationHeader(headers, perConnectionProxyAuthorizationHeader(request, proxyRealm)); future.setInAuth(realm != null && realm.isUsePreemptiveAuth() && realm.getScheme() != AuthScheme.NTLM); future.setInProxyAuth( proxyRealm != null && proxyRealm.isUsePreemptiveAuth() && proxyRealm.getScheme() != AuthScheme.NTLM); try { if (!channelManager.isOpen()) { throw PoolAlreadyClosedException.INSTANCE; } // Do not throw an exception when we need an extra connection for a // redirect. future.acquirePartitionLockLazily(); } catch (Throwable t) { abort(null, future, getCause(t)); // exit and don't try to resolve address return future; } resolveAddresses(request, proxy, future, asyncHandler) .addListener(new SimpleFutureListener<List<InetSocketAddress>>() { @Override protected void onSuccess(List<InetSocketAddress> addresses) { NettyConnectListener<T> connectListener = new NettyConnectListener<>(future, NettyRequestSender.this, channelManager, connectionSemaphore); NettyChannelConnector connector = new NettyChannelConnector(request.getLocalAddress(), addresses, asyncHandler, clientState); if (!future.isDone()) { // Do not throw an exception when we need an extra connection for a redirect // FIXME why? This violate the max connection per host handling, right? channelManager.getBootstrap(request.getUri(), request.getNameResolver(), proxy) .addListener((Future<Bootstrap> whenBootstrap) -> { if (whenBootstrap.isSuccess()) { connector.connect(whenBootstrap.get(), connectListener); } else { abort(null, future, whenBootstrap.cause()); } }); } } @Override protected void onFailure(Throwable cause) { abort(null, future, getCause(cause)); } }); return future; }
sendRequestWithNewChannel方法会执行future.acquirePartitionLockLazily()来判断连接是否超出限制,而sendRequestWithOpenChannel方法则没有这一层判断
小结
- AsyncHttpClient通过ConnectionSemaphore来控制连接的maxConnections及maxConnectionsPerHost
- NonBlockingSemaphore内部使用AtomicInteger来进行控制,permits表示可用的数量,release方法则递增permits,tryAcquire则循环执行先判断permits是否大于0,否则返回false,若permits.compareAndSet(count, count - 1)成功则返回true,否则继续循环执行直到返回false或者true
- NettyResponseFuture提供了acquirePartitionLockLazily方法,它会通过connectionSemaphore.acquireChannelLock(partitionKey)来获取连接信号量;cancel和terminateAndExit都会执行releasePartitionKeyLock,它会调用connectionSemaphore.releaseChannelLock(partitionKey)
- NettyRequestSender的构造器会根据配置创建ConnectionSemaphore,其sendRequest方法内部调用的是sendRequestWithCertainForceConnect、sendRequestThroughSslProxy,它们都是先通过getOpenChannel获取channel,然后根据channel是否active来执行sendRequestWithOpenChannel或者sendRequestWithNewChannel;sendRequestWithNewChannel方法会执行future.acquirePartitionLockLazily()来判断连接是否超出限制,而sendRequestWithOpenChannel方法则没有这一层判断
综上,AsyncHttpClient有定义了ChannelPool,不过其连接数的控制不是在ChannelPool里头,而是通过ConnectionSemaphore来控制连接的maxConnections及maxConnectionsPerHost来执行,它主要是在每次sendRequestWithNewChannel的时候进行控制,先执行future.acquirePartitionLockLazily()获取允许,再进行connect建立连接。
以上就是聊聊AsyncHttpClient的ConnectionSemaphore的详细内容,更多关于AsyncHttpClient的ConnectionSemaphore的资料请关注脚本之家其它相关文章!
相关文章
Spring Data JPA 整合QueryDSL的使用案例
QueryDSL 是一个用于构建类型安全的 SQL 查询的 Java 库,它的主要目标是简化在 Java 中构建和执行 SQL 查询的过程,同时提供类型安全性和更好的编码体验,对Spring Data JPA 整合QueryDSL使用案例感兴趣的朋友跟随小编一起看看吧2023-08-08
最新评论