Java中的延迟队列DelayQueue源码解析

 更新时间:2023年12月11日 08:31:10   作者:程光CS  
这篇文章主要介绍了Java中的延迟队列DelayQueue源码解析,DelayQueue是一个支持并发的无界延迟队列,队列中的每个元素都有个预定时间,当线程从队列获取元素时,只有到期元素才会出队列,没有到期元素则阻塞等待,需要的朋友可以参考下

一、什么是DelayQueue

DelayQueue是一个支持并发的无界延迟队列,队列中的每个元素都有个预定时间,当线程从队列获取元素时,只有到期元素才会出队列,没有到期元素则阻塞等待。

队列头元素是最快要到期的元素。因此DelayQueue可用于实现定时任务队列。

DelayQueue中的主要成员变量和方法如下:

在这里插入图片描述

q:使用优先队列PriorityQueue存储数据,队列中的元素需实现Delayed接口,实现getDelay()和compareTo()方法,以实现优先队列内部的优先级比较,剩余到期时间越短的元素优先级越高

public interface Delayed extends Comparable<Delayed> {
	//获取元素剩余到期时间
    long getDelay(TimeUnit unit);
}

lock:使用ReentrantLock对插入和读取队列元素的方法进行加锁,以实现多线程并发读写队列操作的同步。

available:用一个条件等待队列存放等待获取到期元素的线程。

leader:用于表示当前正在等待获取队头元素的线程,这里使用了一个Leader-Follower模式的变体,线程获取完元素后从等待队列中选择一个线程成为leader继续等待获取队头元素,以避免不必要的竞争消耗。

Leader-Follower模式 在并发IO中,当一个线程收到IO事件后,会考虑启动一个新的线程去处理,而自己继续等待下一个请求。但这里可能会有性能问题,就是把工作交给别一个线程的时候需上下文切换,包括数据拷贝。 而在Leader-Follower模式中所有线程会有三种身份中的一种:leader和follower,以及一个干活中的状态:proccesser。它的基本原则就是,永远最多只有一个leader。而所有follower都在等待成为leader。线程池启动时会自动产生一个Leader负责等待事件,当有一个事件产生时,Leader线程首先通知一个Follower线程将其提拔为新的Leader,然后自己去处理这个事件,处理完毕后加入Follower线程等待队列,等待下次成为Leader。这种方法可以增强CPU高速缓存相似性,及消除动态内存分配和线程间的数据交换。 

二、主要方法源码解析

1. offer()

插入元素到队列。首先获取锁,拿到锁后向优先队列中插入元素,若插入完毕后发现队头元素就是自己,即最近到期时间的元素就是自己,刷新了记录,那就赶紧从等待队列中通知一个线程准备来获取这个元素,然后释放锁。

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

这里为什么要将leader先置为null?

 因为如果此时leader线程在超时等待获取前任队头元素,而signal通知了另一个线程,看完take()的源码可以知道如果有leader线程,那么此线程会直接阻塞等待,让leader线程超时完后获取队头,那显然时间就不正确了,只有将leader设为null,后续线程才能成为leader并设置正确的超时时间来等待获取最新队头元素

因此,leader变量的真正含义是:超时等待获取队列最新队头元素的线程,等待的时间即为最新队头元素剩余到期时间 因此,当队头元素发生变动(插入/删除更新)时,就需要唤醒一个线程更新leader

2. take()

获取优先队列队头元素。首先获取锁,拿到锁后进入一个循环,首先检测队头元素,若为空则进入等待队列阻塞等待,若不为空且队头元素已到期则直接将其出队返回,如果还没到期就看有没有leader线程已经在准备获取队头元素了,如果有就不用抢了,进入等待队列阻塞等待,如果没有就超时等待准备获取队头元素,被唤醒后进入下一次循环获取队头元素。获取完毕后就从等待队列中通知一个线程到同步队列准备获取队头元素然后释放锁。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
        	//从优先堆中获取堆顶元素,即优先级最高,即预定时间最近的元素
            E first = q.peek();
            if (first == null)
				//若队列中无元素则直接进入条件队列等待
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                //若堆顶元素已经到期,则直接将其出队返回
                if (delay <= 0)
                    return q.poll();
				//等待期间不持有元素引用,防止该元素被其他线程出队消费后,仍不能被垃圾回收
                first = null; // don't retain ref while waiting
                if (leader != null)
                	//若已经有leader了,则进入条件队列无限期等待
                    available.await();
                else {
                	//否则成为leader进入条件队列超时等待,到预期时间或者有更近时间元素插入就到同步队列竞争锁,再重复循环去取堆顶元素
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
    	//取完元素后若leader为null且队列中还有元素则从条件等待队列通知一个线程到同步队列
    	//为什么存在leader不为null的情况:leader线程从awaitNanos()中结束后没有竞争过新进take()的线程,因此继续在同步队列中被阻塞,因此无需再从条件等待队列中通知线程,直接让leader线程再去竞争锁,
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();//释放锁资源让同步队列中的线程竞争锁
    }
}

Leader-Follower模式在这里的作用在于,在队头元素还没到期的情况下,只需要有一个线程(leader)超时等待,其余线程进来后发现已经有leader了,就直接无限等待就行了,避免了无意义的超时等待和竞争消耗。

3. poll()

加锁获取并移除队头过期元素,如果没有过期元素则不等待直接返回 null。

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}

4. size()

加锁获取队列当前剩余元素个数

public int size() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return q.size();
    } finally {
        lock.unlock();
    }
}

三、使用案例

如下使用案例,首先向DelayQueue插入5个定时任务,然后用3个线程并发读取

public class DelayQueueTest {
    //队列元素类
    static class DelayTask implements Delayed {
        long exeTime;//预定执行时间
        public DelayTask(long exeTime) {
            this.exeTime = exeTime;
        }
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.exeTime - System.currentTimeMillis(), unit);
        }
        @Override
        public int compareTo(Delayed o) {
            long delta = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
            return (int) delta;
        }
    }
    public static void main(String[] args) {
        DelayQueue<DelayTask> delayQueue = new DelayQueue<>();
        for (int i = 1;i <= 5;i++) {
            delayQueue.offer(new DelayTask(System.currentTimeMillis() + new Random().nextInt(10)*1000));
        }
        for (int i = 1;i <= 3;i++) {
            new Thread(() -> {
                try {
                    while (true) {
                        DelayTask task = delayQueue.take();
                        System.out.printf("取出任务!取出时间:%s 任务预定执行时间:%s%n", hms(System.currentTimeMillis()), hms(task.exeTime));
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
    public static String hms(long milliseconds) {
        return new SimpleDateFormat("HH:mm:ss").format(milliseconds);
    }
}

运行结果:

取出任务!取出时间:10:27:39 任务预定执行时间:10:27:39
取出任务!取出时间:10:27:39 任务预定执行时间:10:27:39
取出任务!取出时间:10:27:40 任务预定执行时间:10:27:40
取出任务!取出时间:10:27:42 任务预定执行时间:10:27:42
取出任务!取出时间:10:27:46 任务预定执行时间:10:27:46

到此这篇关于Java中的延迟队列DelayQueue源码解析的文章就介绍到这了,更多相关Java延迟队列DelayQueue内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • SpringBoot定时任务的实现详解

    SpringBoot定时任务的实现详解

    这篇文章主要介绍了SpringBoot定时任务的实现详解,定时任务是企业级开发中最常见的功能之一,如定时统计订单数、数据库备份、定时发送短信和邮件、定时统计博客访客等,简单的定时任务可以直接通过Spring中的@Scheduled注解来实现,需要的朋友可以参考下
    2024-01-01
  • 一分钟掌握Java ElasticJob分布式定时任务

    一分钟掌握Java ElasticJob分布式定时任务

    ElasticJob 是面向互联网生态和海量任务的分布式调度解决方案,本文主要通过简单的示例带大家深入了解ElasticJob分布式定时任务的相关知识,需要的可以参考一下
    2023-05-05
  • Java this 关键字的使用方法详解

    Java this 关键字的使用方法详解

    这篇文章主要介绍了Java this 关键字的使用方法详解的相关资料,希望通过本文能帮助到大家,让大家彻底理解掌握这部分内容,需要的朋友可以参考下
    2017-10-10
  • Mybatis批量插入返回插入成功后的主键id操作

    Mybatis批量插入返回插入成功后的主键id操作

    这篇文章主要介绍了Mybatis批量插入返回插入成功后的主键id操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-03-03
  • 怎么运行用记事本写的java程序

    怎么运行用记事本写的java程序

    以下小编就为大家介绍一下,怎么运行用记事本写的java程序。需要的朋友可以过来参考下
    2013-08-08
  • Java网络编程之UDP网络通信详解

    Java网络编程之UDP网络通信详解

    这篇文章主要为大家详细介绍了Java网络编程中的UDP网络通信的原理与实现,文中的示例代码讲解详细,具有一定的借鉴价值,需要的可以参考一下
    2022-09-09
  • 带你了解Java中的异常处理(下)

    带你了解Java中的异常处理(下)

    这篇文章主要介绍了Java中的异常处理的相关资料,帮助大家更好的理解和学习Java,感兴趣的朋友可以了解下
    2020-08-08
  • 浅析Java编程中枚举类型的定义与使用

    浅析Java编程中枚举类型的定义与使用

    这篇文章主要介绍了Java编程中枚举类型的定义与使用,简单讲解了enum关键字与枚举类的用法,需要的朋友可以参考下
    2016-05-05
  • Java基础之八大排序算法

    Java基础之八大排序算法

    这篇文章主要介绍了Java基础之八大排序算法,文中有非常详细的代码示例,对正在学习java基础的小伙伴们有非常好的帮助,需要的朋友可以参考下
    2021-04-04
  • java.exe和javaw.exe的区别及使用方法

    java.exe和javaw.exe的区别及使用方法

    这篇文章主要介绍了java.exe和javaw.exe的区别及使用方法,需要的朋友可以参考下
    2014-04-04

最新评论