使用Java实现一个简单的定时器

 更新时间:2024年12月15日 10:31:34   作者:JWASX  
这篇文章主要为大家详细介绍了如何使用Java实现一个简单的延时/定时器,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下

1. 概要

说到定时任务,大家都不会模式,前几天在看到 Java 的 Timer 定时(延时)任务,就顺着 Timer - ScheduledThreadPoolExecutor - RocketMQ - Netty 这些框架看了下定时任务的一些核心逻辑实现,现在就写一个系列的文章来记录下这些源码的实现过程,如果有错误的地方欢迎指出。

定时任务在我们平时的项目中都会多多少少有接触,在很多场景下都非常有用,比如:

  • 每天批量处理数据、生成报告等
  • 会议提醒、邮件发送等
  • 订单支付超时退款、清理临时文件、日志文件…

下面在介绍切入正题之前我们先自己实现一个最简单的定时/延时器

2. 注意问题

要实现一个定时任务,首先我们要考虑以下几点问题

  • 如何让任务按照执行的时间排序(什么样的集合)
  • 定时任务到期之后如何执行
  • 如何实现任务执行多次

带着这三个问题,我们来考虑定时器的实现

首先是第一个问题,什么样的集合能满足我们的要求,需要满足按照固定顺序自动排序调整任务,用 JDK 的 PriorityQueue 优先队列就可以了,只需要设置按照任务的执行时间排序,当我们把任务添加到队列里面,队列就会按照任务的执行时间自动排序,这部分代码就不用我们考虑了

第二个问题,我们可以额外启动一个任务处理线程专门去循环队列处理里面的到期任务,如果任务没到期就阻塞等待,如果到期了就唤醒执行任务

第三个问题,添加任务的时候可以设置一个 Count 表示任务需要执行的次数,添加任务的时候就可以 for 循环去遍历添加

3. 实现

3.1 任务定义

首先定义一个任务 bean,里面包括任务的执行时间和执行的任务:

class Task {
	// 执行时间
	private long executeTime;
	
	// 任务
    Runnable runnable;

    public Task(long executeTime, Runnable runnable) {
        this.executeTime = executeTime;
        this.runnable = runnable;
    }

    public long getExecuteTime() {
        return executeTime;
    }

    public void setExecuteTime(long executeTime) {
        this.executeTime = executeTime;
    }

    public Runnable getRunnable() {
        return runnable;
    }

    public void setRunnable(Runnable runnable) {
        this.runnable = runnable;
    }
}

3.2 添加任务的方法

public boolean add(long addTime, Runnable runnable, int count) {
    long now = System.currentTimeMillis();
    synchronized (queue) {
        for (int i = 0; i < count; i++) {
            // 计算任务的执行时间
            long newTime = now + addTime * (i + 1);
            // 如果小于 now,说明溢出了
            if (newTime < now) {
                throw new RuntimeException(Thread.currentThread().getName() + ": overflow long limit");
            }
            Task min = getMin();
            if (min == null || newTime < min.executeTime) {
                // 加入任务队列
                queue.offer(new Task(newTime, runnable));
                // 唤醒任务线程
                queue.notifyAll();
                continue;
            }
            // 加入任务队列
            queue.offer(new Task(newTime, runnable));
        }
    }
    return true;
}

首先计算任务的执行时间,判断一下是否会溢出,如果溢出就抛出异常

再获取队首任务,如果添加的新任务执行时间比队首任务更块,调用 queue.notifyAll() 唤醒线程去执行任务

将任务加入队列中

3.3 任务线程

Thread thread = new Thread(() -> {
   System.out.println(Thread.currentThread().getName() + ": 启动 execute-job-thread 线程");
     while (true) {
         try {
             synchronized (queue) {
                 if (queue.isEmpty()) {
                     this.queue.wait();
                 }
                 long now = System.currentTimeMillis();
                 while (!queue.isEmpty() && now >= queue.peek().getExecuteTime()) {
                     Task task = queue.poll();
                     // 线程池执行任务
                     executor.execute(task.runnable);
                 }
                 if (queue.isEmpty()) {
                     this.queue.wait();
                 } else {
                     long div = queue.peek().executeTime - now;
                     this.queue.wait(div);
                 }
             }
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
 }, "execute-job-thread");

线程在一个死循环里面不断遍历队列看有没有任务值

  • 如果没有任务,就阻塞等待
  • 被唤醒后代表有任务,这时候获取当前时间,获取所有的过期的任务依次执行
  • 执行完之后根据队列中是否还有任务来判断要阻塞多长时间

3.4 整体代码

PQueueService 代码如下:

public class PQueueService {

    private static final int MAX_COUNT = Integer.MAX_VALUE;

    private final PriorityQueue<Task> queue = new PriorityQueue<>((o1, o2) -> {
        return Long.compare(o1.executeTime, o2.executeTime);
    });

    static final ThreadPoolExecutor executor =
            new ThreadPoolExecutor(16, 32, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(16));

    Thread thread = new Thread(() -> {
        System.out.println(Thread.currentThread().getName() + ": 启动 execute-job-thread 线程");
        while (true) {
            try {
                synchronized (queue) {
                    if (queue.isEmpty()) {
                        this.queue.wait();
                    }
                    long now = System.currentTimeMillis();
                    while (!queue.isEmpty() && now >= queue.peek().getExecuteTime()) {
                        Task task = queue.poll();
                        // 线程池执行任务
                        executor.execute(task.runnable);
                    }
                    if (queue.isEmpty()) {
                        this.queue.wait();
                    } else {
                        long div = queue.peek().executeTime - now;
                        this.queue.wait(div);
                    }
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }, "execute-job-thread");

    public void init() {
        thread.start();
    }

    public Task getMin() {
        if (queue.isEmpty()) {
            return null;
        }
        synchronized (queue) {
            return queue.peek();
        }
    }

    public Task pollMin() {
        if (queue.isEmpty()) {
            return null;
        }
        synchronized (queue) {
            return queue.poll();
        }
    }

    public boolean add(long addTime, Runnable runnable, int count) {
        long now = System.currentTimeMillis();
        synchronized (queue) {
            for (int i = 0; i < count; i++) {
                long newTime = now + addTime * (i + 1);
                if (newTime < now) {
                    throw new RuntimeException(Thread.currentThread().getName() + ": overflow long limit");
                }
                Task min = getMin();
                if (min == null || newTime < min.executeTime) {
                    // 加入任务队列
                    queue.offer(new Task(newTime, runnable));
                    // 唤醒任务线程
                    queue.notifyAll();
                    continue;
                }
                // 加入任务队列
                queue.offer(new Task(newTime, runnable));
            }
        }
        return true;
    }


    class Task {
        private long executeTime;
        Runnable runnable;

        public Task(long executeTime, Runnable runnable) {
            this.executeTime = executeTime;
            this.runnable = runnable;
        }

        public long getExecuteTime() {
            return executeTime;
        }

        public void setExecuteTime(long executeTime) {
            this.executeTime = executeTime;
        }

        public Runnable getRunnable() {
            return runnable;
        }

        public void setRunnable(Runnable runnable) {
            this.runnable = runnable;
        }
    }
}

Timer 类代码如下:

public class Timer {

    public static final Logger logger = LoggerFactory.getLogger(Timer.class);

    PQueueService pQueueService;

    public Timer() {
        pQueueService = new PQueueService();
        pQueueService.init();
    }

    /**
     * 执行任务
     */
    public void fixExecuteOne(int seconds, TimeUnit timeUnit, Runnable runnable, int count) {
        if (timeUnit.ordinal() != TimeUnit.SECONDS.ordinal()) {
            throw new RuntimeException("unknown unit");
        }
        long time = seconds * 1000L;
        pQueueService.add(time, runnable, count);
    }

    public static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) {
        Timer timer = new Timer();
        // 1.添加一个10s后执行的任务
        timer.fixExecuteOne(10, TimeUnit.SECONDS, () -> {
            System.out.println("10s任务 ===> " + Thread.currentThread().getName() + ": " + "执行了!!!!!, 当前时间:" + simpleDateFormat.format(new Date()));
        }, 2);
        // 2.添加一个3s后执行的任务
        timer.fixExecuteOne(3, TimeUnit.SECONDS, () -> {
            System.out.println("3s任务 ===> " + Thread.currentThread().getName() + ": " + "执行了!!!!!, 当前时间:" + simpleDateFormat.format(new Date()));
        }, 2);
        // 3.添加一个20s后执行的任务
        timer.fixExecuteOne(20, TimeUnit.SECONDS, () -> {
            System.out.println("20s任务 ===> " + Thread.currentThread().getName() + ": " + "执行了!!!!!, 当前时间:" + simpleDateFormat.format(new Date()));
        }, 2);
        // 4.添加一个1s后执行的任务
        timer.fixExecuteOne(1, TimeUnit.SECONDS, () -> {
            System.out.println("1s任务1 ===> " + Thread.currentThread().getName() + ": " + "执行了!!!!!, 当前时间:" + simpleDateFormat.format(new Date()));
        }, 2);
        // 5.添加一个1s后执行的任务
        timer.fixExecuteOne(1, TimeUnit.SECONDS, () -> {
            System.out.println("1s任务2 ===> " + Thread.currentThread().getName() + ": " + "执行了!!!!!, 当前时间:" + simpleDateFormat.format(new Date()));
        }, 2);
        // 6.添加一个1s后执行的任务
        timer.fixExecuteOne(1, TimeUnit.SECONDS, () -> {
            System.out.println("1s任务3 ===> " + Thread.currentThread().getName() + ": " + "执行了!!!!!, 当前时间:" + simpleDateFormat.format(new Date()));
        }, 2);
        try {
            Thread.sleep(30000);
            // 7.添加一个1s后执行的任务
            timer.fixExecuteOne(1, TimeUnit.SECONDS, () -> {
                System.out.println("1s任务3 ===> " + Thread.currentThread().getName() + ": " + "执行了!!!!!, 当前时间:" + simpleDateFormat.format(new Date()));
            }, 1);
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

执行结果:

execute-job-thread: 启动 execute-job-thread 线程
1s任务2 ===> pool-1-thread-2: 执行了!!!!!, 当前时间:2024-11-23 21:17:21
1s任务1 ===> pool-1-thread-1: 执行了!!!!!, 当前时间:2024-11-23 21:17:21
1s任务3 ===> pool-1-thread-3: 执行了!!!!!, 当前时间:2024-11-23 21:17:21
1s任务1 ===> pool-1-thread-4: 执行了!!!!!, 当前时间:2024-11-23 21:17:22
1s任务2 ===> pool-1-thread-5: 执行了!!!!!, 当前时间:2024-11-23 21:17:22
1s任务3 ===> pool-1-thread-6: 执行了!!!!!, 当前时间:2024-11-23 21:17:22
3s任务 ===> pool-1-thread-7: 执行了!!!!!, 当前时间:2024-11-23 21:17:23
3s任务 ===> pool-1-thread-8: 执行了!!!!!, 当前时间:2024-11-23 21:17:26
10s任务 ===> pool-1-thread-9: 执行了!!!!!, 当前时间:2024-11-23 21:17:30
10s任务 ===> pool-1-thread-10: 执行了!!!!!, 当前时间:2024-11-23 21:17:40
20s任务 ===> pool-1-thread-11: 执行了!!!!!, 当前时间:2024-11-23 21:17:40
30s后添加的1s任务 ===> pool-1-thread-12: 执行了!!!!!, 当前时间:2024-11-23 21:17:51
20s任务 ===> pool-1-thread-13: 执行了!!!!!, 当前时间:2024-11-23 21:18:00

4. 存在问题

上面实现了一个简单的定时器,但是这个定时器还是存在下面一些问题:

  • 功能比较简单,没有其他的 remove 等操作
  • 任务线程拿到任务之后会丢到另外一个线程池中执行,如果线程池中任务比较多,就会阻塞导致执行时间不准
  • 代码中对 queue 进行了加锁,但是一个任务可以添加到多个 queue,这里没有做并发处理

总之,这就是一个简单的 Timer Demo,主要了解下定时器/延时器的执行过程, 但是其实 JDK 里面的定时器/延时器实现流程都大差不差,下一篇文章,我们就来看看 Timer 的用法和源码

到此这篇关于使用Java实现一个简单的定时器的文章就介绍到这了,更多相关Java定时器内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java中批处理框架spring batch详细介绍

    Java中批处理框架spring batch详细介绍

    这篇文章主要介绍了Java中批处理框架spring batch详细介绍,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-07-07
  • SpringCloud之loadbalancer负载均衡组件实战详解

    SpringCloud之loadbalancer负载均衡组件实战详解

    LoadBalancer是Spring Cloud官方提供的负载均衡组件,可用于替代Ribbon,这篇文章主要介绍了SpringCloud之loadbalancer负载均衡组件,需要的朋友可以参考下
    2023-06-06
  • Java实时获取基金收益项目源码分享

    Java实时获取基金收益项目源码分享

    这篇文章主要介绍了Java实时获取基金收益项目源码分享,主要包括JAVA爬取天天基金网数据使用实例、应用技巧、基本知识点总结和需要注意事项,需要的朋友可以参考下
    2021-03-03
  • 如何使用IDEA开发Spark SQL程序(一文搞懂)

    如何使用IDEA开发Spark SQL程序(一文搞懂)

    Spark SQL 是一个用来处理结构化数据的spark组件。它提供了一个叫做DataFrames的可编程抽象数据模型,并且可被视为一个分布式的SQL查询引擎。这篇文章主要介绍了如何使用IDEA开发Spark SQL程序(一文搞懂),需要的朋友可以参考下
    2021-08-08
  • 如何使用Collections.reverse对list集合进行降序排序

    如何使用Collections.reverse对list集合进行降序排序

    这篇文章主要介绍了Java使用Collections.reverse对list集合进行降序排序,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-11-11
  • JVM原理之完整的一次GC流程解读

    JVM原理之完整的一次GC流程解读

    这篇文章主要介绍了JVM原理之完整的一次GC流程解读,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-12-12
  • java web开发中获取tomcat上properties文件内容的方法

    java web开发中获取tomcat上properties文件内容的方法

    java web开发中如何获取tomcat上properties文件内容的方法,方便文件存储位置的修改,解耦和,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-07-07
  • 解决Feign切换client到okhttp无法生效的坑(出现原因说明)

    解决Feign切换client到okhttp无法生效的坑(出现原因说明)

    这篇文章主要介绍了解决Feign切换client到okhttp无法生效的坑(出现原因说明),具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-02-02
  • Java数组(Array)最全汇总(下篇)

    Java数组(Array)最全汇总(下篇)

    这篇文章主要介绍了Java数组(Array)最全汇总(下篇),本文章内容详细,通过案例可以更好的理解数组的相关知识,本模块分为了三部分,本次为下篇,需要的朋友可以参考下
    2023-01-01
  • Java基础之从HelloWorld到面向对象

    Java基础之从HelloWorld到面向对象

    Java是完全面向对象的语言。Java通过虚拟机的运行机制,实现“跨平台”的理念。本文为通过呈现一个适合初学者的教程,希望对大家有用
    2021-09-09

最新评论