AsyncHttpClient ListenableFuture源码流程解读

 更新时间:2023年12月18日 08:35:35   作者:codecraft  
这篇文章主要为大家介绍了AsyncHttpClient ListenableFuture源码流程解读,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

本文主要研究一下AsyncHttpClient的ListenableFuture

ListenableFuture

org/asynchttpclient/ListenableFuture.java

public interface ListenableFuture<V> extends Future<V> {
  /**
   * Terminate and if there is no exception, mark this Future as done and release the internal lock.
   */
  void done();
  /**
   * Abort the current processing, and propagate the {@link Throwable} to the {@link AsyncHandler} or {@link Future}
   *
   * @param t the exception
   */
  void abort(Throwable t);
  /**
   * Touch the current instance to prevent external service to times out.
   */
  void touch();
  /**
   * Adds a listener and executor to the ListenableFuture.
   * The listener will be {@linkplain java.util.concurrent.Executor#execute(Runnable) passed
   * to the executor} for execution when the {@code Future}'s computation is
   * {@linkplain Future#isDone() complete}.
   * <br>
   * Executor can be <code>null</code>, in that case executor will be executed
   * in the thread where completion happens.
   * <br>
   * There is no guaranteed ordering of execution of listeners, they may get
   * called in the order they were added and they may get called out of order,
   * but any listener added through this method is guaranteed to be called once
   * the computation is complete.
   *
   * @param listener the listener to run when the computation is complete.
   * @param exec     the executor to run the listener in.
   * @return this Future
   */
  ListenableFuture<V> addListener(Runnable listener, Executor exec);
  CompletableFuture<V> toCompletableFuture();
  //......
}
ListenableFuture继承了java.util.concurrent.Future,它定义了done、abort、touch、addListener、toCompletableFuture方法

CompletedFailure

org/asynchttpclient/ListenableFuture.java

class CompletedFailure<T> implements ListenableFuture<T> {
    private final ExecutionException e;
    public CompletedFailure(Throwable t) {
      e = new ExecutionException(t);
    }
    public CompletedFailure(String message, Throwable t) {
      e = new ExecutionException(message, t);
    }
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
      return true;
    }
    @Override
    public boolean isCancelled() {
      return false;
    }
    @Override
    public boolean isDone() {
      return true;
    }
    @Override
    public T get() throws ExecutionException {
      throw e;
    }
    @Override
    public T get(long timeout, TimeUnit unit) throws ExecutionException {
      throw e;
    }
    @Override
    public void done() {
    }
    @Override
    public void abort(Throwable t) {
    }
    @Override
    public void touch() {
    }
    @Override
    public ListenableFuture<T> addListener(Runnable listener, Executor exec) {
      if (exec != null) {
        exec.execute(listener);
      } else {
        listener.run();
      }
      return this;
    }
    @Override
    public CompletableFuture<T> toCompletableFuture() {
      CompletableFuture<T> future = new CompletableFuture<>();
      future.completeExceptionally(e);
      return future;
    }
  }
CompletedFailure实现了ListenableFuture接口,其cancel方法返回true、isDone返回true

NettyResponseFuture

org/asynchttpclient/netty/NettyResponseFuture.java

public final class NettyResponseFuture<V> implements ListenableFuture<V> {
  private static final Logger LOGGER = LoggerFactory.getLogger(NettyResponseFuture.class);
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> REDIRECT_COUNT_UPDATER = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "redirectCount");
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> CURRENT_RETRY_UPDATER = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "currentRetry");
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IS_DONE_FIELD = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "isDone");
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IS_CANCELLED_FIELD = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "isCancelled");
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IN_AUTH_FIELD = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "inAuth");
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IN_PROXY_AUTH_FIELD = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "inProxyAuth");
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> CONTENT_PROCESSED_FIELD = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "contentProcessed");
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> ON_THROWABLE_CALLED_FIELD = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "onThrowableCalled");
  @SuppressWarnings("rawtypes")
  private static final AtomicReferenceFieldUpdater<NettyResponseFuture, TimeoutsHolder> TIMEOUTS_HOLDER_FIELD = AtomicReferenceFieldUpdater
          .newUpdater(NettyResponseFuture.class, TimeoutsHolder.class, "timeoutsHolder");
  @SuppressWarnings("rawtypes")
  private static final AtomicReferenceFieldUpdater<NettyResponseFuture, Object> PARTITION_KEY_LOCK_FIELD = AtomicReferenceFieldUpdater
          .newUpdater(NettyResponseFuture.class, Object.class, "partitionKeyLock");
  private final long start = unpreciseMillisTime();
  private final ChannelPoolPartitioning connectionPoolPartitioning;
  private final ConnectionSemaphore connectionSemaphore;
  private final ProxyServer proxyServer;
  private final int maxRetry;
  private final CompletableFuture<V> future = new CompletableFuture<>();          
          //......
  @Override
  public V get() throws InterruptedException, ExecutionException {
    return future.get();
  }
  @Override
  public V get(long l, TimeUnit tu) throws InterruptedException, TimeoutException, ExecutionException {
    return future.get(l, tu);
  }          
}
NettyResponseFuture实现了ListenableFuture接口

done

public final void done() {
    if (terminateAndExit())
      return;
    try {
      loadContent();
    } catch (ExecutionException ignored) {
    } catch (RuntimeException t) {
      future.completeExceptionally(t);
    } catch (Throwable t) {
      future.completeExceptionally(t);
      throw t;
    }
  }
  private boolean terminateAndExit() {
    releasePartitionKeyLock();
    cancelTimeouts();
    this.channel = null;
    this.reuseChannel = false;
    return IS_DONE_FIELD.getAndSet(this, 1) != 0 || isCancelled != 0;
  }  
private void loadContent() throws ExecutionException {
    if (future.isDone()) {
      try {
        future.get();
      } catch (InterruptedException e) {
        throw new RuntimeException("unreachable", e);
      }
    }
    // No more retry
    CURRENT_RETRY_UPDATER.set(this, maxRetry);
    if (CONTENT_PROCESSED_FIELD.getAndSet(this, 1) == 0) {
      try {
        future.complete(asyncHandler.onCompleted());
      } catch (Throwable ex) {
        if (ON_THROWABLE_CALLED_FIELD.getAndSet(this, 1) == 0) {
          try {
            try {
              asyncHandler.onThrowable(ex);
            } catch (Throwable t) {
              LOGGER.debug("asyncHandler.onThrowable", t);
            }
          } finally {
            cancelTimeouts();
          }
        }
        future.completeExceptionally(ex);
      }
    }
    future.getNow(null);
  }
done方法对于terminateAndExit返回true的直接返回,否则执行loadContent,它对于future.isDone()的执行future.get(),然后执行future.complete(asyncHandler.onCompleted())回调

abort

public final void abort(final Throwable t) {
    if (terminateAndExit())
      return;
    future.completeExceptionally(t);
    if (ON_THROWABLE_CALLED_FIELD.compareAndSet(this, 0, 1)) {
      try {
        asyncHandler.onThrowable(t);
      } catch (Throwable te) {
        LOGGER.debug("asyncHandler.onThrowable", te);
      }
    }
  }
abort方法也是对于terminateAndExit返回true的直接返回,否则执行future.completeExceptionally(t),然后触发asyncHandler.onThrowable(t)回调

touch

public void touch() {
    touch = unpreciseMillisTime();
  }
touch方法用当前时间戳更新touch属性

addListener

public ListenableFuture<V> addListener(Runnable listener, Executor exec) {
    if (exec == null) {
      exec = Runnable::run;
    }
    future.whenCompleteAsync((r, v) -> listener.run(), exec);
    return this;
  }
addListener方法会执行future.whenCompleteAsync((r, v) -> listener.run(), exec)

toCompletableFuture

public CompletableFuture<V> toCompletableFuture() {
    return future;
  }
toCompletableFuture方法直接返回future

newNettyResponseFuture

org/asynchttpclient/netty/request/NettyRequestSender.java

private <T> NettyResponseFuture<T> newNettyResponseFuture(Request request,
                                                            AsyncHandler<T> asyncHandler,
                                                            NettyRequest nettyRequest,
                                                            ProxyServer proxyServer) {
    NettyResponseFuture<T> future = new NettyResponseFuture<>(
            request,
            asyncHandler,
            nettyRequest,
            config.getMaxRequestRetry(),
            request.getChannelPoolPartitioning(),
            connectionSemaphore,
            proxyServer);
    String expectHeader = request.getHeaders().get(EXPECT);
    if (HttpHeaderValues.CONTINUE.contentEqualsIgnoreCase(expectHeader))
      future.setDontWriteBodyBecauseExpectContinue(true);
    return future;
  }
  private <T> ListenableFuture<T> sendRequestWithCertainForceConnect(Request request,
                                                                     AsyncHandler<T> asyncHandler,
                                                                     NettyResponseFuture<T> future,
                                                                     ProxyServer proxyServer,
                                                                     boolean performConnectRequest) {
    NettyResponseFuture<T> newFuture = newNettyRequestAndResponseFuture(request, asyncHandler, future, proxyServer,
            performConnectRequest);
    Channel channel = getOpenChannel(future, request, proxyServer, asyncHandler);
    return Channels.isChannelActive(channel)
            ? sendRequestWithOpenChannel(newFuture, asyncHandler, channel)
            : sendRequestWithNewChannel(request, proxyServer, newFuture, asyncHandler);
  }
NettyRequestSender的newNettyResponseFuture创建的是NettyResponseFuture;sendRequestWithCertainForceConnect则将NettyResponseFuture传递给sendRequestWithOpenChannel或者sendRequestWithNewChannel来发送请求

小结

AsyncHttpClient的ListenableFuture继承了java.util.concurrent.Future,它定义了done、abort、touch、addListener、toCompletableFuture方法;它有两个实现类,分别是CompletedFailure及NettyResponseFuture;NettyRequestSender的sendRequest方法将NettyResponseFuture传递给sendRequestWithOpenChannel或者sendRequestWithNewChannel来发送请求。

以上就是聊聊AsyncHttpClient的ListenableFuture的详细内容,更多关于AsyncHttpClient的ListenableFuture的资料请关注脚本之家其它相关文章!

以上就是AsyncHttpClient ListenableFuture源码流程解读的详细内容,更多关于AsyncHttpClient ListenableFuture的资料请关注脚本之家其它相关文章!

相关文章

  • elasticsearch kibana简单查询讲解

    elasticsearch kibana简单查询讲解

    今天小编就为大家分享一篇关于elasticsearch kibana简单查询讲解,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2019-02-02
  • redis在java中的使用(实例讲解)

    redis在java中的使用(实例讲解)

    下面小编就为大家带来一篇redis 在java中的使用(实例讲解)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-08-08
  • java开发中的误区和细节整理

    java开发中的误区和细节整理

    这篇文章给大家整理了关于JAVA开发中的细节以及经常进入的误区整理,希望我们整理的内容能够给大家提供到帮助。
    2018-04-04
  • @Autowired注解以及失效的几个原因图文详解

    @Autowired注解以及失效的几个原因图文详解

    在微服务项目中,会遇到@Autowired注解失效的情况,下面这篇文章主要给大家介绍了关于@Autowired注解以及失效的几个原因的相关资料,文中通过图文介绍的非常详细,需要的朋友可以参考下
    2023-03-03
  • Java LocalCache 本地缓存的实现实例

    Java LocalCache 本地缓存的实现实例

    本篇文章主要介绍了Java LocalCache 本地缓存的实现实例,具有一定的参考价值,感兴趣的小伙伴们可以参考一下。
    2017-05-05
  • Java Web之限制用户多处登录实例代码

    Java Web之限制用户多处登录实例代码

    本篇文章主要介绍了Java Web之限制用户多处登录实例代码,可以限制单个用户在多个终端登录。非常具有实用价值,需要的朋友可以参考下。
    2017-03-03
  • Java实现视频自定义裁剪功能

    Java实现视频自定义裁剪功能

    这篇文章主要介绍了如何通过java实现视频裁剪,可以将视频按照自定义尺寸进行裁剪,文中的示例代码简洁易懂,感兴趣的可以了解一下
    2022-01-01
  • Seata AT模式前后镜像是如何生成详解

    Seata AT模式前后镜像是如何生成详解

    这篇文章主要为大家介绍了Seata AT模式前后镜像是如何生成的方法详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-11-11
  • Java泛型变量如何添加约束

    Java泛型变量如何添加约束

    这篇文章主要介绍了Java泛型变量如何添加约束,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-05-05
  • Spring Boot入门(web+freemarker)

    Spring Boot入门(web+freemarker)

    这篇文章主要介绍了Spring Boot入门(web+freemarker)的相关资料,需要的朋友可以参考下
    2017-06-06

最新评论