使用ScheduledThreadPoolExecutor踩过最痛的坑

 更新时间:2023年08月10日 16:57:39   作者:肥肥技术宅  
这篇文章主要介绍了使用ScheduledThreadPoolExecutor踩过最痛的坑及解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

概述

最近项目上反馈某个重要的定时任务突然不执行了,很头疼,开发环境和测试环境都没有出现过这个问题。

定时任务采用的是 ScheduledThreadPoolExecutor ,后来一看代码发现踩了一个大坑....

还原"大坑"

这个坑就是如果 ScheduledThreadPoolExecutor 中执行的任务出错抛出异常后,不仅不会打印异常堆栈信息,同时还会取消后面的调度 

直接看例子

@Test
public void testException() throws InterruptedException {
    // 创建1个线程的调度任务线程池
    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    // 创建一个任务
    Runnable runnable = new Runnable() {
        volatile int num = 0;
        @Override
        public void run() {
            num ++;
            // 模拟执行报错
            if(num > 5) {
                throw new RuntimeException("执行错误");
            }
            log.info("exec num: [{}].....", num);
        }
    };
    // 每隔1秒钟执行一次任务
    scheduledExecutorService.scheduleAtFixedRate(runnable, 0, 1, TimeUnit.SECONDS);
    Thread.sleep(10000);
}

运行结果:

  • 只执行了5次后,就不打印,不执行了,因为报错了
  • 任务报错,也没有打印一次堆栈,更导致调度任务取消,后果十分严重。

解决方案

解决方法也非常简单,只要通过try catch捕获异常即可。

运行结果:

看到不仅打印了异常堆栈,而且也会进行周期性的调度。

更推荐的做法

更好的建议可以在自己的项目中封装一个包装类,要求所有的调度都提交通过我们统一的包装类, 如下代码:

@Slf4j
public class RunnableWrapper implements Runnable {
    // 实际要执行的线程任务
    private Runnable task;
    // 线程任务被创建出来的时间
    private long createTime;
    // 线程任务被线程池运行的开始时间
    private long startTime;
    // 线程任务被线程池运行的结束时间
    private long endTime;
    // 线程信息
    private String taskInfo;
    private boolean showWaitLog;
    /**
     * 执行间隔时间多久,打印日志
     */
    private long durMs = 1000L;
    // 当这个任务被创建出来的时候,就会设置他的创建时间
    // 但是接下来有可能这个任务提交到线程池后,会进入线程池的队列排队
    public RunnableWrapper(Runnable task, String taskInfo) {
        this.task = task;
        this.taskInfo = taskInfo;
        this.createTime = System.currentTimeMillis();
    }
    public void setShowWaitLog(boolean showWaitLog) {
        this.showWaitLog = showWaitLog;
    }
    public void setDurMs(long durMs) {
        this.durMs = durMs;
    }
    // 当任务在线程池排队的时候,这个run方法是不会被运行的
    // 但是当任务结束了排队,得到线程池运行机会的时候,这个方法会被调用
    // 此时就可以设置线程任务的开始运行时间
    @Override
    public void run() {
        this.startTime = System.currentTimeMillis();
        // 此处可以通过调用监控系统的API,实现监控指标上报
        // 用线程任务的startTime-createTime,其实就是任务排队时间
        // 这边打印日志输出,也可以输出到监控系统中
        if(showWaitLog) {
            log.info("任务信息: [{}], 任务排队时间: [{}]ms", taskInfo, startTime - createTime);
        }
        // 接着可以调用包装的实际任务的run方法
        try {
            task.run();
        } catch (Exception e) {
            log.error("run task error", e);
            throw e;
        }
        // 任务运行完毕以后,会设置任务运行结束的时间
        this.endTime = System.currentTimeMillis();
        // 此处可以通过调用监控系统的API,实现监控指标上报
        // 用线程任务的endTime - startTime,其实就是任务运行时间
        // 这边打印任务执行时间,也可以输出到监控系统中
        if(endTime - startTime > durMs) {
            log.info("任务信息: [{}], 任务执行时间: [{}]ms", taskInfo, endTime - startTime);
        }
    }
}

使用:

我们还可以在包装类里面封装各种监控行为,如本例打印日志执行时间等。

原理探究

那大家有没有想过为什么任务出错会导致异常无法打印,甚至调度都取消了呢?让我们从源码出发,一探究竟。

下面是调度任务的入口方法

// ScheduledThreadPoolExecutor#scheduleAtFixedRate
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    // 将执行任务和参数包装成ScheduledFutureTask对象
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    // 延迟执行
    delayedExecute(t);
    return t;
}
复制代码

这个方法主要做了两个事情

  • 将执行任务和参数包装成ScheduledFutureTask对象
  • 调用delayedExecute方法延迟执行任务

延迟或周期性任务的主要执行方法, 主要是将任务丢到队列中,后续由工作线程获取执行。

// ScheduledThreadPoolExecutor#delayedExecute
private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            // 将任务丢到阻塞队列中
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                // 开启工作线程,去执行任务,或者从队列中获取任务执行
                ensurePrestart();
        }
    }

现在任务已经在队列中了,我们看下任务执行的内容是什么,还记得前面的包装对象 ScheduledFutureTask 类,它的实现类是 ScheduledFutureTask ,继承了Runnable类。

// ScheduledFutureTask#run方法
public void run() {
    // 是不是周期性任务
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    // 不是周期性任务的话, 直接调用一次下面的run    
    else if (!periodic)
        ScheduledFutureTask.super.run();
    // 如果是周期性任务,则调用runAndReset方法,如果返回true,继续执行
    else if (ScheduledFutureTask.super.runAndReset()) {
        // 设置下次调度时间
        setNextRunTime();
        // 重新执行调度任务
        reExecutePeriodic(outerTask);
    }
}

这里的关键就是看 ScheduledFutureTask.super.runAndReset() 方法是否返回true,如果是true的话继续调度。

runAndReset方法也很简单,关键就是看报异常如何处理。

// FutureTask#runAndReset
protected boolean runAndReset() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return false;
    // 是否继续下次调度,默认false
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                // 执行任务
                c.call(); 
                // 执行成功的话,设置为true
                ran = true;
                // 异常处理,关键点
            } catch (Throwable ex) {
                // 不会修改ran的值,最终是false,同时也不打印异常堆栈
                setException(ex);
            }
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    // 返回结果
    return ran && s == NEW;
}
  • 关键点ran变量,最终返回是不是下次继续调度执行
  • 如果抛出异常的话,可以看到不会修改ran为true。

总结

Java的ScheduledThreadPoolExecutor定时任务线程池所调度的任务中如果抛出了异常,并且异常没有捕获直接抛到框架中,会导致ScheduledThreadPoolExecutor定时任务不调度了。

这个结论希望大家一定要记住,不然非常坑,关键是有时候测试环境、开发环境还无法复现,有一定的随机性,真的到了生产就完蛋了。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

关于这些知识点,我们不仅要知其然,还要知其所以然,这样才会记忆深刻,不然很容易遗忘。

相关文章

  • Spring Boot整合JWT的实现步骤

    Spring Boot整合JWT的实现步骤

    本文主要介绍了Spring Boot整合JWT,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-08-08
  • Hadoop2.8.1完全分布式环境搭建过程

    Hadoop2.8.1完全分布式环境搭建过程

    本文搭建了一个由三节点(master、slave1、slave2)构成的Hadoop完全分布式集群(区别单节点伪分布式集群),并通过Hadoop分布式计算的一个示例测试集群的正确性。对hadoop分布式环境搭建过程感兴趣的朋友跟随小编一起看看吧
    2019-06-06
  • java如何创建一个jdbc程序详解

    java如何创建一个jdbc程序详解

    使用Java程序来操作数据库,后者更加直接的话就是使用Java程序来发送SQL语句的技术称之为:JDBC。下面这篇文章主要给大家介绍了关于利用java如何创建一个jdbc程序的相关资料,需要的朋友可以参考借鉴,下面来一起看看吧。
    2017-11-11
  • Java日常练习题,每天进步一点点(28)

    Java日常练习题,每天进步一点点(28)

    下面小编就为大家带来一篇Java基础的几道练习题(分享)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧,希望可以帮到你
    2021-07-07
  • Springboot整合企业微信机器人助手推送消息的实现

    Springboot整合企业微信机器人助手推送消息的实现

    本文主要介绍了Springboot整合企业微信机器人助手推送消息的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-05-05
  • Java实现多个单张tif文件合并成一个多页tif文件

    Java实现多个单张tif文件合并成一个多页tif文件

    业务部门需要将多个单张的tiff文件,合并成一个多页的tiff文件,本文就来介绍一下如何实现,具有一定的参考价值,感兴趣的可以了解一下
    2023-09-09
  • SpringBoot集成redis实现共享存储session

    SpringBoot集成redis实现共享存储session

    这篇文章主要介绍了SpringBoot集成redis实现共享存储session的流程步骤,文中通过代码示例介绍的非常详细,并总结了一些常见的错误及解决方法,需要的朋友可以参考下
    2024-03-03
  • Springboot整合Shiro之加盐MD5加密的方法

    Springboot整合Shiro之加盐MD5加密的方法

    这篇文章主要介绍了Springboot整合Shiro之加盐MD5加密的方法,非常不错,具有一定的参考借鉴价值,需要的朋友可以参考下
    2018-12-12
  • SpringBoot集成RocketMQ实现消息发送的三种方式

    SpringBoot集成RocketMQ实现消息发送的三种方式

    RocketMQ 支持3 种消息发送方式: 同步 (sync)、异步(async)、单向(oneway),本文就将给大家介绍一下SpringBoot集成RocketMQ实现消息发送的三种方式文中有详细的代码示例,需要的朋友可以参考下
    2023-09-09
  • Java类加载初始化的过程及顺序

    Java类加载初始化的过程及顺序

    今天小编就为大家分享一篇关于Java类加载初始化的过程及顺序,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2018-12-12

最新评论