Java多线程之scheduledThreadPool的方法解析

 更新时间:2023年12月28日 09:45:21   作者:竹下星空  
这篇文章主要介绍了Java多线程之scheduledThreadPool的方法解析,queue是DelayedWorkQueue,但通过后面的分析可以知道,最大线程数是不起作用的,最多会起核心线程数的数量,需要的朋友可以参考下

scheduledThreadPool

我们对java中定时任务实现可能会有以下疑问:

怎样做到每个任务延迟指定时间执行?

内部使用了什么数据结构保存延迟任务?

延迟任务放入scheduledThreadPool时机并不固定,怎么保证按延迟时间顺序执行?

构造器

public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  new DelayedWorkQueue());
}

corePoolSize就是我们传过了的参数,maximumPoolSize是Integer.MAX_VALUE,所以最大线程是无穷大,非核心线程成活时间是0,所以非核心线程执行完firstTask之后如果poll任务没拿到任务则会直接销毁。queue是DelayedWorkQueue。但通过后面的分析可以知道,最大线程数是不起作用的,最多会起核心线程数的数量

schedule(Runnable command,long delay, TimeUnit unit)方法

public ScheduledFuture<?>schedule(Runnable command,
long delay,
   TimeUnit unit){
if(command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t =decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
 
  • 通过decorateTask方法获取到RunnableScheduledFuture(实际上是ScheduledFutureTask对象),并把delay时间变成了时间戳
  • 执行delayedExecute方法

delayedExecute方法

private voiddelayedExecute(RunnableScheduledFuture<?> task){
if(isShutdown())
reject(task);
else{
super.getQueue().add(task);
if(isShutdown()&&
!canRunInCurrentRunState(task.isPeriodic())&&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
 
  1. 使用queue.add方法把task放入queue
  2. 执行ensurePrestart方法

offer方法

public boolean offer(Runnable x){
if(x == null)
throw new NullPointerException();
    RunnableScheduledFuture<?> e =(RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
int i = size;
if(i >= queue.length)
grow();
size = i +1;
if(i ==0){
    queue[0]= e;
setIndex(e,0);
}else{
siftUp(i, e);
}
if(queue[0]== e){
    leader = null;
    available.signal();
}
} finally {
lock.unlock();
}
return true;
}
 
  1. DelayedWorkQueue底层使用的是RunnableScheduledFuture的数组,初始化容量是16,之后扩容是以1.5倍进行。
  2. 在offer元素整个过程中使用ReentrantLock进行加锁,所以DelayedWorkQueue是一个线程安全的队列。然后使用了condition来实现阻塞的功能,当poll没有元素时会使用await进行等待,当offer的是数组的第一个元素时会signal,这个signal的设计是排序的点睛之笔,设计的非常巧妙,这块需要offer和take方法一起来看,在take方法时会拿第一个元素来判断delay的时间,如果时间没到会使用await休眠delay时间,但此时如果有delay时间更短的任务放入queue中,此时需要take的任务就不是之前的那个任务了,就要重新执行逻辑获取这个最新delay的任务,这样才能做到任务的正确执行。
  3. 在offer元素时会使用siftUp方法来保证数组中元素是按delay时间从小到大排列,但要注意的是数组前半部分肯定都是排了delay最小的任务,但后半部分不一定是有序的

ensurePrestart()方法

voidensurePrestart(){
int wc =workerCountOf(ctl.get());
if(wc < corePoolSize)
addWorker(null, true);
elseif(wc ==0)
addWorker(null, false);
}
 

这个比较简单,addWorker方法之前我们也分析过了,需要注意的是这里的firstTask默认是空的,所以工作线程会直接从queue中拿任务。这有个比较奇怪的else if,感觉应该永远不用执行,因为wc==0肯定已经被if条件拦截了,也就是只能起核心线程数。最大线程数永远不会起作用

poll方法

public RunnableScheduledFuture<?>poll(long timeout, TimeUnit unit)
    throws InterruptedException {
long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
加锁
    lock.lockInterruptibly();
    try {
自旋
for(;;){
拿到queue中的第一个元素,如果是空则awaitNanos时间,等待时间过后如果queue中还是没有元素则返回null。
    RunnableScheduledFuture<?> first = queue[0];
if(first == null){
if(nanos <=0)
return null;
else
    nanos = available.awaitNanos(nanos);
}else{
拿到第一个任务的delay时间,如果到了delay时间则返回finishPoll方法的结果
long delay = first.getDelay(NANOSECONDS);
if(delay <=0)
returnfinishPoll(first);
如果传入的nanos小于等于0则返回null
if(nanos <=0)
return null;
first = null;// don't retain ref while waiting
如果等待时间还不够或前一个需要执行的任务还在执行,则当前线程直接等待
if(nanos < delay || leader != null)
    nanos = available.awaitNanos(nanos);
else{否则当前线程可以执行(leader线程),但需要awaitNanos delay的时间才能执行
    Thread thisThread = Thread.currentThread();
    leader = thisThread;
    try {
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
当等待时间到了之后就  leader = null说明此时可以返回finishPoll方法的结果
if(leader == thisThread)
    leader = null;
}
}
}
}
} finally {
if(leader == null && queue[0]!= null)
    available.signal();
lock.unlock();
}
}
  1. DelayedWorkQueue的poll方法也是使用reentrantLock来保证线程安全,然后使用condition.awaitNanos来达到等待特定时间的效果,这里使用leader线程保证了排在第一位的任务只有一个工作线程获取到,其他工作线程进行排队等待,在获取到第一个任务的工作线程delay时间到了之后会take到这个任务并signal排队的第一个工作线程继续获取下一个任务,周而复始。
  2. 在使用finishPoll方法返回delay时间到了的任务时会用siftDown对queue后半部分的任务进行排序,因为之前offer时使用siftUp方法只对queue前半部分进行了排序
  3. 回到ScheduledThreadPool线程池,keepAliveTime是0,所以当first任务的delay时间还没有到时会直接返回null,然后非核心工作线程就会直接销毁,之后的代码都不会执行,而核心线程则执行的take方法,take方法才会进入下面这段逻辑
if (leader != null)
      available.await();
  else {
    Thread thisThread = Thread.currentThread();
   leader = thisThread;
   try {
  available.awaitNanos(delay);
   } finally {
  if (leader == thisThread)
       leader = null;
       }
 }

scheduleAtFixedRate方法

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
  long initialDelay,
  long period,
  TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
  null,
  triggerTime(initialDelay, unit),
  unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
 
  1. 这个方法跟之前schedule方法差不多了,都是使用了ScheduledFutureTask,这边多了个period变量保存执行周期值,outerTask引用了自身的对象,然后也是使用delayExecute方法把任务放入了queue中,此时任务的delay是initialDelay,所以会在initialDelay时间之后出队然后执行
  2. 由于现在工作线程中的task是ScheduledFutureTask,所以工作线程调用的task.run方法是ScheduledFutureTask.run方法

ScheduledFutureTask.run方法

public void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
cancel(false);
    else if (!periodic)
ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
    }
}
 

1.判断是不是周期执行的任务,之前的schedule方法的period是0,所以会执行super.run();然后执行传入的runnable中的run方法,而scheduleAtFixedRate方法的period不是0,则会执行super.runAndReset();方法,执行传入的runnable中的run方法之后执行setNextRunTime();

重新设置delay时间(initialDelay+period),然后把任务又放入queue中

scheduleWithFixedDelay方法

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
 long initialDelay,
 long delay,
 TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
  null,
  triggerTime(initialDelay, unit),
  unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
 

这个方法几乎跟scheduleAtFixedRate方法一模一样,区别在于period是个负数,通过之前我们对scheduleAtFixedRate方法的分析,period这个参数在算周期执行间隔时会用到,也就是setNextRunTime方法

setNextRunTime方法

private void setNextRunTime() {
    long p = period;
    if (p > 0)
time += p;
    else
time = triggerTime(-p);
}
 

当period大于0时,也就是scheduleAtFixedRate执行时,是直接在之前的time加上了period,而scheduleWithFixedDelay方法执行时,是用triggerTime方法在当前时间加上了periode,不同的计算方式的区别在于,scheduleAtFixedRate不会管任务的执行时间,我只要保证任务固定频率执行就好了,所以他是几乎精确的period时间执行,而scheduleWithFixedDelay是在任务之后的时间+period时间来确定下一次任务执行的时间,所以任务执行的频率相对来说不固定

到此这篇关于Java多线程之scheduledThreadPool的方法解析的文章就介绍到这了,更多相关scheduledThreadPool的方法内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java 汉字获取拼音或首字母工具类代码分析

    Java 汉字获取拼音或首字母工具类代码分析

    在本篇内容里小编给大家分享的是一篇关于Java 汉字获取拼音或首字母工具类知识点内容,有需要的朋友们可以学习参考下。
    2021-06-06
  • java结合WebSphere MQ实现接收队列文件功能

    java结合WebSphere MQ实现接收队列文件功能

    WebSphereMQ,也称MQSeries,以一致的、可靠的和易于管理的方式来连接应用程序,并为跨部门、企业范围的集成提供了可靠的基础。通过为重要的消息和事务提供可靠的、一次且仅一次的传递,MQ可以处理复杂的通信协议,并动态地将消息传递工作负载分配给可用的资源。
    2015-10-10
  • springboot jackson自定义序列化和反序列化实例

    springboot jackson自定义序列化和反序列化实例

    这篇文章主要介绍了spring boot jackson自定义序列化和反序列化实例,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-10-10
  • Spring Boot Actuator监控端点小结

    Spring Boot Actuator监控端点小结

    这篇文章主要介绍了Spring Boot Actuator监控端点小结,需要的朋友可以参考下
    2017-06-06
  • java POI 如何实现Excel单元格内容换行

    java POI 如何实现Excel单元格内容换行

    这篇文章主要介绍了java POI 如何实现Excel单元格内容换行的操作,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-07-07
  • Spring 4 支持的 Java 8 特性

    Spring 4 支持的 Java 8 特性

    Spring 框架 4 支持 Java 8 语言和 API 功能。在本文中,我们将重点放在 Spring 4 支持新的 Java 8 的功能。最重要的是 Lambda 表达式,方法引用,JSR-310的日期和时间,和可重复注释。下面跟着小编一起来看下吧
    2017-03-03
  • idea启动Tomcat时控制台乱码的解决方法(亲测有效)

    idea启动Tomcat时控制台乱码的解决方法(亲测有效)

    最近在idea中启动tomcat出现控制台乱码问题,尝试了很多方法,最后终于解决了,所以下面这篇文章主要给大家介绍了关于idea启动Tomcat时控制台乱码的解决方法,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2022-07-07
  • SpringBoot配置MongoDB多数据源的方法步骤

    SpringBoot配置MongoDB多数据源的方法步骤

    这篇文章主要介绍了SpringBoot配置MongoDB多数据源的方法步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-10-10
  • Springboot 过滤器、拦截器、全局异常处理的方案处理小结

    Springboot 过滤器、拦截器、全局异常处理的方案处理小结

    拦截器是一种动态拦截方法调用的机制,与过滤器类似,是Spring框架中提供的,用来动态拦截控制器方法的执行,这篇文章主要介绍了Springboot 过滤器、拦截器、全局异常处理,需要的朋友可以参考下
    2024-03-03
  • SpringCloud Eureka服务治理之服务注册服务发现

    SpringCloud Eureka服务治理之服务注册服务发现

    这篇文章主要介绍了SpringCloud Eureka服务治理服务注册和服务发现概念详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-08-08

最新评论