AsyncHttpClient的TimeoutTimerTask连接池异步超时

 更新时间:2023年12月14日 10:20:15   作者:codecraft  
这篇文章主要为大家介绍了AsyncHttpClient的TimeoutTimerTask连接池异步超时源码流程解读,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

本文主要研究一下AsyncHttpClient的TimeoutTimerTask

TimerTask

io/netty/util/TimerTask.java

/**
 * A task which is executed after the delay specified with
 * {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
 */
public interface TimerTask {
    /**
     * Executed after the delay specified with
     * {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
     *
     * @param timeout a handle which is associated with this task
     */
    void run(Timeout timeout) throws Exception;
}
netty的TimerTask接口定义了run方法,其入参为Timeout

Timeout

io/netty/util/Timeout.java

public interface Timeout {
    /**
     * Returns the {@link Timer} that created this handle.
     */
    Timer timer();
    /**
     * Returns the {@link TimerTask} which is associated with this handle.
     */
    TimerTask task();
    /**
     * Returns {@code true} if and only if the {@link TimerTask} associated
     * with this handle has been expired.
     */
    boolean isExpired();
    /**
     * Returns {@code true} if and only if the {@link TimerTask} associated
     * with this handle has been cancelled.
     */
    boolean isCancelled();
    /**
     * Attempts to cancel the {@link TimerTask} associated with this handle.
     * If the task has been executed or cancelled already, it will return with
     * no side effect.
     *
     * @return True if the cancellation completed successfully, otherwise false
     */
    boolean cancel();
}
Timeout接口定义了timer()、task()、isExpired()、isCancelled()、cancel()方法

TimeoutTimerTask

org/asynchttpclient/netty/timeout/TimeoutTimerTask.java

public abstract class TimeoutTimerTask implements TimerTask {
  private static final Logger LOGGER = LoggerFactory.getLogger(TimeoutTimerTask.class);
  protected final AtomicBoolean done = new AtomicBoolean();
  protected final NettyRequestSender requestSender;
  final TimeoutsHolder timeoutsHolder;
  volatile NettyResponseFuture<?> nettyResponseFuture;
  TimeoutTimerTask(NettyResponseFuture<?> nettyResponseFuture, NettyRequestSender requestSender, TimeoutsHolder timeoutsHolder) {
    this.nettyResponseFuture = nettyResponseFuture;
    this.requestSender = requestSender;
    this.timeoutsHolder = timeoutsHolder;
  }
  void expire(String message, long time) {
    LOGGER.debug("{} for {} after {} ms", message, nettyResponseFuture, time);
    requestSender.abort(nettyResponseFuture.channel(), nettyResponseFuture, new TimeoutException(message));
  }
  /**
   * When the timeout is cancelled, it could still be referenced for quite some time in the Timer. Holding a reference to the future might mean holding a reference to the
   * channel, and heavy objects such as SslEngines
   */
  public void clean() {
    if (done.compareAndSet(false, true)) {
      nettyResponseFuture = null;
    }
  }
  void appendRemoteAddress(StringBuilder sb) {
    InetSocketAddress remoteAddress = timeoutsHolder.remoteAddress();
    sb.append(remoteAddress.getHostName());
    if (!remoteAddress.isUnresolved()) {
      sb.append('/').append(remoteAddress.getAddress().getHostAddress());
    }
    sb.append(':').append(remoteAddress.getPort());
  }
}
TimeoutTimerTask声明实现TimerTask接口,它定义了expire方法,执行requestSender.abort;clean方法来重置done及nettyResponseFuture

ReadTimeoutTimerTask

org/asynchttpclient/netty/timeout/ReadTimeoutTimerTask.java

public class ReadTimeoutTimerTask extends TimeoutTimerTask {
  private final long readTimeout;
  ReadTimeoutTimerTask(NettyResponseFuture<?> nettyResponseFuture,
                       NettyRequestSender requestSender,
                       TimeoutsHolder timeoutsHolder,
                       int readTimeout) {
    super(nettyResponseFuture, requestSender, timeoutsHolder);
    this.readTimeout = readTimeout;
  }
  public void run(Timeout timeout) {
    if (done.getAndSet(true) || requestSender.isClosed())
      return;
    if (nettyResponseFuture.isDone()) {
      timeoutsHolder.cancel();
      return;
    }
    long now = unpreciseMillisTime();
    long currentReadTimeoutInstant = readTimeout + nettyResponseFuture.getLastTouch();
    long durationBeforeCurrentReadTimeout = currentReadTimeoutInstant - now;
    if (durationBeforeCurrentReadTimeout <= 0L) {
      // idleConnectTimeout reached
      StringBuilder sb = StringBuilderPool.DEFAULT.stringBuilder().append("Read timeout to ");
      appendRemoteAddress(sb);
      String message = sb.append(" after ").append(readTimeout).append(" ms").toString();
      long durationSinceLastTouch = now - nettyResponseFuture.getLastTouch();
      expire(message, durationSinceLastTouch);
      // cancel request timeout sibling
      timeoutsHolder.cancel();
    } else {
      done.set(false);
      timeoutsHolder.startReadTimeout(this);
    }
  }
}
ReadTimeoutTimerTask继承了TimeoutTimerTask,其run方法会根据readTimeout及nettyResponseFuture.getLastTouch()计算currentReadTimeoutInstant,然后判断是否已经超时,是则执行expire及timeoutsHolder.cancel(),否则执行timeoutsHolder.startReadTimeout(this)

startReadTimeout

org/asynchttpclient/netty/timeout/TimeoutsHolder.java

void startReadTimeout(ReadTimeoutTimerTask task) {
    if (requestTimeout == null || (!requestTimeout.isExpired() && readTimeoutValue < (requestTimeoutMillisTime - unpreciseMillisTime()))) {
      // only schedule a new readTimeout if the requestTimeout doesn't happen first
      if (task == null) {
        // first call triggered from outside (else is read timeout is re-scheduling itself)
        task = new ReadTimeoutTimerTask(nettyResponseFuture, requestSender, this, readTimeoutValue);
      }
      this.readTimeout = newTimeout(task, readTimeoutValue);
    } else if (task != null) {
      // read timeout couldn't re-scheduling itself, clean up
      task.clean();
    }
  }
startReadTimeout会判断readTimeoutValue+当前时间是否小于requestTimeoutMillisTime,是则通过newTimeout调度,否则执行task.clean()

RequestTimeoutTimerTask

org/asynchttpclient/netty/timeout/RequestTimeoutTimerTask.java

public class RequestTimeoutTimerTask extends TimeoutTimerTask {
  private final long requestTimeout;
  RequestTimeoutTimerTask(NettyResponseFuture<?> nettyResponseFuture,
                                 NettyRequestSender requestSender,
                                 TimeoutsHolder timeoutsHolder,
                                 int requestTimeout) {
    super(nettyResponseFuture, requestSender, timeoutsHolder);
    this.requestTimeout = requestTimeout;
  }
  public void run(Timeout timeout) {
    if (done.getAndSet(true) || requestSender.isClosed())
      return;
    // in any case, cancel possible readTimeout sibling
    timeoutsHolder.cancel();
    if (nettyResponseFuture.isDone())
      return;
    StringBuilder sb = StringBuilderPool.DEFAULT.stringBuilder().append("Request timeout to ");
    appendRemoteAddress(sb);
    String message = sb.append(" after ").append(requestTimeout).append(" ms").toString();
    long age = unpreciseMillisTime() - nettyResponseFuture.getStart();
    expire(message, age);
  }
}
RequestTimeoutTimerTask继承了TimeoutTimerTask,其run方法在done为true或者requestSender为closed则直接返回,对于nettyResponseFuture.isDone()也直接返回,其余的执行expire方法

TimeoutsHolder

org/asynchttpclient/netty/timeout/TimeoutsHolder.java

public class TimeoutsHolder {
  private final Timeout requestTimeout;
  private final AtomicBoolean cancelled = new AtomicBoolean();
  private final Timer nettyTimer;
  private final NettyRequestSender requestSender;
  private final long requestTimeoutMillisTime;
  private final int readTimeoutValue;
  private volatile Timeout readTimeout;
  private volatile NettyResponseFuture<?> nettyResponseFuture;
  private volatile InetSocketAddress remoteAddress;
  public TimeoutsHolder(Timer nettyTimer, NettyResponseFuture<?> nettyResponseFuture, NettyRequestSender requestSender, AsyncHttpClientConfig config, InetSocketAddress originalRemoteAddress) {
    this.nettyTimer = nettyTimer;
    this.nettyResponseFuture = nettyResponseFuture;
    this.requestSender = requestSender;
    this.remoteAddress = originalRemoteAddress;
    final Request targetRequest = nettyResponseFuture.getTargetRequest();
    final int readTimeoutInMs = targetRequest.getReadTimeout();
    this.readTimeoutValue = readTimeoutInMs == 0 ? config.getReadTimeout() : readTimeoutInMs;
    int requestTimeoutInMs = targetRequest.getRequestTimeout();
    if (requestTimeoutInMs == 0) {
      requestTimeoutInMs = config.getRequestTimeout();
    }
    if (requestTimeoutInMs != -1) {
      requestTimeoutMillisTime = unpreciseMillisTime() + requestTimeoutInMs;
      requestTimeout = newTimeout(new RequestTimeoutTimerTask(nettyResponseFuture, requestSender, this, requestTimeoutInMs), requestTimeoutInMs);
    } else {
      requestTimeoutMillisTime = -1L;
      requestTimeout = null;
    }
  }
  //......
}
TimeoutsHolder的构造器对于requestTimeoutInMs不为-1的会创建RequestTimeoutTimerTask,然后通过newTimeout进行调度

scheduleRequestTimeout

org/asynchttpclient/netty/request/NettyRequestSender.java

private <T> ListenableFuture<T> sendRequestWithOpenChannel(NettyResponseFuture<T> future,
                                                             AsyncHandler<T> asyncHandler,
                                                             Channel channel) {
    try {
      asyncHandler.onConnectionPooled(channel);
    } catch (Exception e) {
      LOGGER.error("onConnectionPooled crashed", e);
      abort(channel, future, e);
      return future;
    }
    SocketAddress channelRemoteAddress = channel.remoteAddress();
    if (channelRemoteAddress != null) {
      // otherwise, bad luck, the channel was closed, see bellow
      scheduleRequestTimeout(future, (InetSocketAddress) channelRemoteAddress);
    }
    future.setChannelState(ChannelState.POOLED);
    future.attachChannel(channel, false);
    if (LOGGER.isDebugEnabled()) {
      HttpRequest httpRequest = future.getNettyRequest().getHttpRequest();
      LOGGER.debug("Using open Channel {} for {} '{}'", channel, httpRequest.method(), httpRequest.uri());
    }
    // channelInactive might be called between isChannelValid and writeRequest
    // so if we don't store the Future now, channelInactive won't perform
    // handleUnexpectedClosedChannel
    Channels.setAttribute(channel, future);
    if (Channels.isChannelActive(channel)) {
      writeRequest(future, channel);
    } else {
      // bad luck, the channel was closed in-between
      // there's a very good chance onClose was already notified but the
      // future wasn't already registered
      handleUnexpectedClosedChannel(channel, future);
    }
    return future;
  }
  private void scheduleRequestTimeout(NettyResponseFuture<?> nettyResponseFuture,
                                      InetSocketAddress originalRemoteAddress) {
    nettyResponseFuture.touch();
    TimeoutsHolder timeoutsHolder = new TimeoutsHolder(nettyTimer, nettyResponseFuture, this, config,
            originalRemoteAddress);
    nettyResponseFuture.setTimeoutsHolder(timeoutsHolder);
  }
  public <T> void writeRequest(NettyResponseFuture<T> future, Channel channel) {
    NettyRequest nettyRequest = future.getNettyRequest();
    HttpRequest httpRequest = nettyRequest.getHttpRequest();
    AsyncHandler<T> asyncHandler = future.getAsyncHandler();
    // if the channel is dead because it was pooled and the remote server decided to
    // close it,
    // we just let it go and the channelInactive do its work
    if (!Channels.isChannelActive(channel))
      return;
    try {
      if (asyncHandler instanceof TransferCompletionHandler) {
        configureTransferAdapter(asyncHandler, httpRequest);
      }
      boolean writeBody = !future.isDontWriteBodyBecauseExpectContinue()
              && httpRequest.method() != HttpMethod.CONNECT && nettyRequest.getBody() != null;
      if (!future.isHeadersAlreadyWrittenOnContinue()) {
        try {
          asyncHandler.onRequestSend(nettyRequest);
        } catch (Exception e) {
          LOGGER.error("onRequestSend crashed", e);
          abort(channel, future, e);
          return;
        }
        // if the request has a body, we want to track progress
        if (writeBody) {
          // FIXME does this really work??? the promise is for the request without body!!!
          ChannelProgressivePromise promise = channel.newProgressivePromise();
          ChannelFuture f = channel.write(httpRequest, promise);
          f.addListener(new WriteProgressListener(future, true, 0L));
        } else {
          // we can just track write completion
          ChannelPromise promise = channel.newPromise();
          ChannelFuture f = channel.writeAndFlush(httpRequest, promise);
          f.addListener(new WriteCompleteListener(future));
        }
      }
      if (writeBody)
        nettyRequest.getBody().write(channel, future);
      // don't bother scheduling read timeout if channel became invalid
      if (Channels.isChannelActive(channel)) {
        scheduleReadTimeout(future);
      }
    } catch (Exception e) {
      LOGGER.error("Can't write request", e);
      abort(channel, future, e);
    }
  }
NettyRequestSender的sendRequestWithOpenChannel方法在channelRemoteAddress不为null时会执行scheduleRequestTimeout,创建TimeoutsHolder调度RequestTimeoutTimerTask;其writeRequest方法在nettyRequest.getBody().write(channel, future)之后,若channel还是active的则通过scheduleReadTimeout(future)调度ReadTimeoutTimerTask

小结

AsyncHttpClient的TimeoutTimerTask声明实现了netty的TimerTask接口,它定义了expire方法,执行requestSender.abort;clean方法来重置done及nettyResponseFuture;它有一个抽象子类为TimeoutTimerTask,RequestTimeoutTimerTask及ReadTimeoutTimerTask继承了TimeoutTimerTask;AsyncHttpClient用TimeoutsHolder来封装了这些timeout timer,NettyRequestSender的sendRequestWithOpenChannel方法会触发调度RequestTimeoutTimerTask,而其writeRequest方法在nettyRequest.getBody().write(channel, future)之后,通过scheduleReadTimeout(future)调度ReadTimeoutTimerTask。

可以看到requestTimeoutMillisTime是总的请求时间,它包含了写入数据之后的readTimeoutValue

以上就是AsyncHttpClient的TimeoutTimerTask连接池异步超时的详细内容,更多关于AsyncHttpClient TimeoutTimerTask的资料请关注脚本之家其它相关文章!

相关文章

  • Java异步线程中的CompletableFuture与@Async详解

    Java异步线程中的CompletableFuture与@Async详解

    这篇文章主要介绍了Java异步线程中的CompletableFuture与@Async详解,CompletableFuture是java中提供的一个异步执行类,@Async是Spring提供的异步执行方法,当调用方法单独开启一个线程进行调用,需要的朋友可以参考下
    2024-01-01
  • SpringBoot整合阿里云视频点播的过程详解

    SpringBoot整合阿里云视频点播的过程详解

    视频点播(ApsaraVideo for VoD)是集音视频采集、编辑、上传、自动化转码处理、媒体资源管理、分发加速于一体的一站式音视频点播解决方案。这篇文章主要介绍了SpringBoot整合阿里云视频点播的详细过程,需要的朋友可以参考下
    2021-12-12
  • Java RabbitMQ 中的消息长期不消费会过期吗

    Java RabbitMQ 中的消息长期不消费会过期吗

    RabbitMQ支持消息的过期时间,在消息发送时可以进行指定。 RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除
    2021-09-09
  • spring cloud consul使用ip注册服务的方法示例

    spring cloud consul使用ip注册服务的方法示例

    这篇文章主要介绍了spring cloud consul使用ip注册服务的方法示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-03-03
  • Java中DecimalFormat用法及符号含义

    Java中DecimalFormat用法及符号含义

    DecimalFormat是NumberFormat的一个具体子类,用于格式化十进制数字。这篇文章介绍了DecimalFormat的用法及符号含义,需要的朋友可以收藏下,方便下次浏览观看
    2021-12-12
  • 详解Java的Enum的使用与分析

    详解Java的Enum的使用与分析

    这篇文章主要介绍了详解Java的Enum的使用与分析的相关资料,需要的朋友可以参考下
    2017-02-02
  • SpringMVC实现Controller的三种方式总结

    SpringMVC实现Controller的三种方式总结

    这篇文章主要介绍了SpringMVC实现Controller的三种方式总结,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-02-02
  • Spring IOC中的Bean对象用法

    Spring IOC中的Bean对象用法

    这篇文章主要介绍了Spring IOC中的Bean对象用法,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09
  • Java实现视频自定义裁剪功能

    Java实现视频自定义裁剪功能

    这篇文章主要介绍了如何通过java实现视频裁剪,可以将视频按照自定义尺寸进行裁剪,文中的示例代码简洁易懂,感兴趣的可以了解一下
    2022-01-01
  • java文件上传至ftp服务器的方法

    java文件上传至ftp服务器的方法

    这篇文章主要为大家详细介绍了java文件上传至ftp服务器的方法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-01-01

最新评论