关于ScheduledThreadPoolExecutor不执行的原因分析

 更新时间:2023年08月10日 14:46:32   作者:Redick01  
这篇文章主要介绍了关于ScheduledThreadPoolExecutor不执行的原因分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

ScheduledThreadPoolExecutor不执行原因分析

最近在调试一个监控应用指标的时候发现定时器在服务启动执行一次之后就不执行了,这里用的定时器是Java的调度线程池 ScheduledThreadPoolExecutor ,后来经过排查发现 ScheduledThreadPoolExecutor 线程池处理任务如果抛出异常,会导致线程池不调度;下面就通过一个例子简单分析下为什么异常会导致 ScheduledThreadPoolExecutor 不执行。

ScheduledThreadPoolExecutor不调度分析

示例程序

在示例程序可以看到当计数器中的计数达到5的时候就会主动抛出一个异常,抛出异常后 ScheduledThreadPoolExecutor 就不调度了。

public class ScheduledTask {
    private static final AtomicInteger count = new AtomicInteger(0);
    private static final ScheduledThreadPoolExecutor SCHEDULED_TASK = new ScheduledThreadPoolExecutor(
            1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(Thread.currentThread().getThreadGroup(), r, "sc-task");
            t.setDaemon(true);
            return t;
        }
    });
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        SCHEDULED_TASK.scheduleWithFixedDelay(() -> {
            System.out.println(111);
            if (count.get() == 5) {
                throw new IllegalArgumentException("my exception");
            }
            count.incrementAndGet();
        }, 0, 5, TimeUnit.SECONDS);
        latch.await();
    }
}

源码分析

  • ScheduledThreadPoolExecutor#run

run方法内部首先判断任务是不是周期性的任务,如果不是周期性任务通过 ScheduledFutureTask.super.run(); 执行任务;如果状态是运行中或shutdown,取消任务执行;如果是周期性的任务,通过 ScheduledFutureTask.super.runAndReset() 执行任务并且重新设置状态,成功了就会执行 setNextRunTime 设置下次调度的时间,问题就是出现在 ScheduledFutureTask.super.runAndReset() ,这里执行任务出现了异常,导致结果为false,就不进行下次调度时间设置等

        public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }
  • *FutureTask#runAndReset

在线程任务执行过程中抛出异常,然后 catch 到了异常,最终导致这个方法返回false,然后 ScheduledThreadPoolExecutor#run 就不设置下次执行时间了,代码 c.call(); 抛出异常,跳过 ran = true; 代码,最终 runAndReset 返回false。

    protected boolean runAndReset() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }

注意:

Java ScheduledThreadPoolExecutor 定时任务线程池所调度的任务中如果抛出了异常,并且异常没有捕获直接抛到框架中,会导致 ScheduledThreadPoolExecutor 定时任务不调度了,具体是因为当异常抛到 ScheduledThreadPoolExecutor 框架中时不进行下次调度时间的设置,从而导致 ScheduledThreadPoolExecutor 定时任务不调度。

ScheduledThreadPoolExecutor线程池例子

ScheduledThreadPoolExecutor 使用

ScheduledThreadPoolExecutor 继承自 ThreadPoolExecutor,主要用来给定时间运行任务,或者定期执行任务。

在 ScheduledThreadPoolExecutor 的实现中,使用了 FutureTask 运行任务以及使用无界队列 DelayedWorkQueue 来保存任务。

1. 使用示例

  • 提交任务

ScheduledThreadPoolExecutor 实现了 ScheduledExecutorService 接口,其中,接口中有四个需要实现的方法,其中 schedule() 的两个方法需要设置任务以及任务启动的延迟时间,scheduleAtFixedRate() 可以设置任务定时重复执行,scheduleWithFixedDelay() 则是设置两个任务之间的执行延迟时间。

ScheduledThreadPoolExecutor poolExecutor = new ScheduledThreadPoolExecutor(2);  
poolExecutor.schedule(() -> {  
    // 提交的任务
}, 5, TimeUnit.HOURS);  
poolExecutor.scheduleAtFixedRate(() -> {  
    // 提交的任务
}, 0, 5, TimeUnit.HOURS);  
poolExecutor.scheduleWithFixedDelay(() -> {  
    // 提交的任务
}, 0, 5, TimeUnit.HOURS);
  • 简单例子

下面的例子中每 500 毫秒打印一次字符串,executor 会有 5 秒的时间来等待任务执行结束,也就是一共可以打印 10 次字符串。

ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10);  
executor.scheduleWithFixedDelay(() -> {  
    System.out.println("测试");  
}, 0, 500, TimeUnit.MILLISECONDS);  
try {  
    executor.awaitTermination(5, TimeUnit.SECONDS);  
    executor.shutdown();  
} catch (InterruptedException e) {  
    e.printStackTrace();  
}

ScheduledThreadPoolExecutor 原理

1. DelayedWorkQueue

ScheduledThreadPoolExecutor 的构造方法对 DelayedWorkQueue 进行了初始化,并且最大线程数量设置成了 Integer.MAX_VALUE。

public ScheduledThreadPoolExecutor(int corePoolSize) {  
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,  
          new DelayedWorkQueue());  
}

其中,DelayedWorkQueue 中的队列是 RunnableScheduledFuture 类型以及容量为 16 的数组。

private static final int INITIAL_CAPACITY = 16;  
private RunnableScheduledFuture<?>[] queue =  
    new RunnableScheduledFuture<?>[INITIAL_CAPACITY];  
private final ReentrantLock lock = new ReentrantLock();

队列添加任务是以 DelayedWorkQueue 以堆作为数据结构存储任务,在添加元素的时候,会使用基于 Siftup 版本进行元素添加,并且会根据任务的执行时间的大小来排序。

public boolean offer(Runnable x) {  
    if (x == null)  
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;  
    final ReentrantLock lock = this.lock;  
    lock.lock();
    try {
        // 当队列的容量不够,会扩充 50%
        int i = size;
        if (i >= queue.length)
            grow();
        size = i + 1;
        if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
        } else {  
            siftUp(i, e);  
        }  
        if (queue[0] == e) {  
            leader = null;  
            available.signal();  
        }  
    } finally {  
        lock.unlock();  
    }  
    return true;  
}

2. delayedExecute()

ScheduledExecutorService 接口的四个实现方法中都涉及到了 delayedExecute(),方法主要用来判断线程池的状态以及对线程进行初始化。

private void delayedExecute(RunnableScheduledFuture<?> task) {
    // 如果线程池关闭了,需要执行饱和策略
    if (isShutdown())  
        reject(task);  
    else {
        // 添加到队列中
        super.getQueue().add(task);  
        if (isShutdown() &&  
            !canRunInCurrentRunState(task.isPeriodic()) &&  
            remove(task))  
            task.cancel(false);  
        else
            // 判断等待队列中是否已经满了,会使用到 ThreadPoolExecutor
            ensurePrestart();  
    }  
}
void ensurePrestart() {  
    int wc = workerCountOf(ctl.get());  
    if (wc < corePoolSize)  
        addWorker(null, true);  
    else if (wc == 0)  
        addWorker(null, false);  
}

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • LCN分布式事务解决方案详解

    LCN分布式事务解决方案详解

    这篇文章主要介绍了LCN分布式事务解决方案详解,本篇文章通过简要的案例,讲解了该项技术的了解与使用,以下就是详细内容,需要的朋友可以参考下
    2021-08-08
  • Spring中@PathVariable和@RequestParam注解的用法区别

    Spring中@PathVariable和@RequestParam注解的用法区别

    这篇文章主要介绍了Spring中@PathVariable和@RequestParam注解的用法区别,@PathVariable 是 Spring 框架中的一个注解,用于将 URL 中的变量绑定到方法的参数上,它通常用于处理 RESTful 风格的请求,从 URL 中提取参数值,并将其传递给方法进行处理,需要的朋友可以参考下
    2024-01-01
  • java文件如何统计字母出现的次数和百分比

    java文件如何统计字母出现的次数和百分比

    这篇文章主要介绍了java文件如何统计字母出现的次数和百分比,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-11-11
  • 超详细的Intellij IDEA 看源码必备技能

    超详细的Intellij IDEA 看源码必备技能

    这篇文章主要介绍了Intellij IDEA 看源码必备技能,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-04-04
  • 详解Java并发之Condition

    详解Java并发之Condition

    这篇文章主要介绍了Java并发编程之Condition,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-06-06
  • Java中get/post的https请求忽略ssl证书认证浅析

    Java中get/post的https请求忽略ssl证书认证浅析

    因为Java在安装的时候,会默认导入某些根证书,所以有些网站不导入证书,也可以使用Java进行访问,这篇文章主要给大家介绍了关于Java中get/post的https请求忽略ssl证书认证的相关资料,需要的朋友可以参考下
    2024-01-01
  • java微信红包实现算法

    java微信红包实现算法

    这篇文章主要为大家详细介绍了java微信红包实现算法,列出红包的核心算法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-02-02
  • 详解Java动态代理的实现机制

    详解Java动态代理的实现机制

    这篇文章主要为大家详细介绍了Java动态代理的实现机制,感兴趣的小伙伴们可以参考一下
    2016-03-03
  • java区分绝对路径和相对路径的方法

    java区分绝对路径和相对路径的方法

    这篇文章主要介绍了java区分绝对路径和相对路径的方法,实例分析了java针对路径操作的相关技巧,需要的朋友可以参考下
    2015-04-04
  • SpringBoot整合Web之CORS支持与配置类和 XML配置及注册拦截器

    SpringBoot整合Web之CORS支持与配置类和 XML配置及注册拦截器

    这篇文章主要介绍了SpringBoot整合Web开发中CORS支持与配置类和 XML配置及注册拦截器的使用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-08-08

最新评论