Java延迟队列原理与用法实例详解
本文实例讲述了Java延迟队列原理与用法。分享给大家供大家参考,具体如下:
延时队列,第一他是个队列,所以具有对列功能第二就是延时,这就是延时对列,功能也就是将任务放在该延时对列中,只有到了延时时刻才能从该延时对列中获取任务否则获取不到……
应用场景比较多,比如延时1分钟发短信,延时1分钟再次执行等,下面先看看延时队列demo之后再看延时队列在项目中的使用:
简单的延时队列要有三部分:第一实现了Delayed接口的消息体、第二消费消息的消费者、第三存放消息的延时队列,那下面就来看看延时队列demo。
一、消息体
package com.delqueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; /** * 消息体定义 实现Delayed接口就是实现两个方法即compareTo 和 getDelay最重要的就是getDelay方法,这个方法用来判断是否到期…… * * @author whd * @date 2017年9月24日 下午8:57:14 */ public class Message implements Delayed { private int id; private String body; // 消息内容 private long excuteTime;// 延迟时长,这个是必须的属性因为要按照这个判断延时时长。 public int getId() { return id; } public String getBody() { return body; } public long getExcuteTime() { return excuteTime; } public Message(int id, String body, long delayTime) { this.id = id; this.body = body; this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime(); } // 自定义实现比较方法返回 1 0 -1三个参数 @Override public int compareTo(Delayed delayed) { Message msg = (Message) delayed; return Integer.valueOf(this.id) > Integer.valueOf(msg.id) ? 1 : (Integer.valueOf(this.id) < Integer.valueOf(msg.id) ? -1 : 0); } // 延迟任务是否到时就是按照这个方法判断如果返回的是负数则说明到期否则还没到期 @Override public long getDelay(TimeUnit unit) { return unit.convert(this.excuteTime - System.nanoTime(), TimeUnit.NANOSECONDS); } }
二、消息消费者
package com.delqueue; import java.util.concurrent.DelayQueue; public class Consumer implements Runnable { // 延时队列 ,消费者从其中获取消息进行消费 private DelayQueue<Message> queue; public Consumer(DelayQueue<Message> queue) { this.queue = queue; } @Override public void run() { while (true) { try { Message take = queue.take(); System.out.println("消费消息id:" + take.getId() + " 消息体:" + take.getBody()); } catch (InterruptedException e) { e.printStackTrace(); } } } }
三、延时队列
package com.delqueue; import java.util.concurrent.DelayQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class DelayQueueTest { public static void main(String[] args) { // 创建延时队列 DelayQueue<Message> queue = new DelayQueue<Message>(); // 添加延时消息,m1 延时3s Message m1 = new Message(1, "world", 3000); // 添加延时消息,m2 延时10s Message m2 = new Message(2, "hello", 10000); //将延时消息放到延时队列中 queue.offer(m2); queue.offer(m1); // 启动消费线程 消费添加到延时队列中的消息,前提是任务到了延期时间 ExecutorService exec = Executors.newFixedThreadPool(1); exec.execute(new Consumer(queue)); exec.shutdown(); } }
将消息体放入延迟队列中,在启动消费者线程去消费延迟队列中的消息,如果延迟队列中的消息到了延迟时间则可以从中取出消息否则无法取出消息也就无法消费。
这就是延迟队列demo,下面我们来说说在真实环境下的使用。
使用场景描述:
在打车软件中对订单进行派单的流程,当有订单的时候给该订单筛选司机,然后给当订单绑定司机,但是有时运气没那么好,订单进来后第一次没有筛选到合适的司机,但我们也不能就此结束派单,而是将该订单的信息放到延时队列中过个2秒钟在进行一次,其实这个2秒钟就是一个延迟,所以这里我们就可以使用延时队列来实现……
下面看看简单的流程图:
下面来看看具体代码实现:
在项目中有如下几个类:第一 、任务类 第二、按照任务类组装的消息体类 第三、延迟队列管理类
任务类即执行筛选司机、绑单、push消息的任务类
package com.test.delayqueue; /** * 具体执行相关业务的业务类 * @author whd * @date 2017年9月25日 上午12:49:32 */ public class DelayOrderWorker implements Runnable { @Override public void run() { // TODO Auto-generated method stub //相关业务逻辑处理 System.out.println(Thread.currentThread().getName()+" do something ……"); } }
消息体类,在延时队列中这个实现了Delayed接口的消息类是比不可少的,实现接口时有一个getDelay(TimeUnit unit)方法,这个方法就是判断是否到期的
这里定义的是一个泛型类,所以可以将我们上面的任务类作为其中的task,这样就将任务类分装成了一个消息体
package com.test.delayqueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; /** * 延时队列中的消息体将任务封装为消息体 * * @author whd * @date 2017年9月25日 上午12:48:30 * @param <T> */ public class DelayOrderTask<T extends Runnable> implements Delayed { private final long time; private final T task; // 任务类,也就是之前定义的任务类 /** * @param timeout * 超时时间(秒) * @param task * 任务 */ public DelayOrderTask(long timeout, T task) { this.time = System.nanoTime() + timeout; this.task = task; } @Override public int compareTo(Delayed o) { // TODO Auto-generated method stub DelayOrderTask other = (DelayOrderTask) o; long diff = time - other.time; if (diff > 0) { return 1; } else if (diff < 0) { return -1; } else { return 0; } } @Override public long getDelay(TimeUnit unit) { // TODO Auto-generated method stub return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS); } @Override public int hashCode() { return task.hashCode(); } public T getTask() { return task; } }
延时队列管理类,这个类主要就是将任务类封装成消息并并添加到延时队列中,以及轮询延时队列从中取出到时的消息体,在获取任务类放到线程池中执行任务
package com.test.delayqueue; import java.util.Map; import java.util.concurrent.DelayQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** * 延时队列管理类,用来添加任务、执行任务 * * @author whd * @date 2017年9月25日 上午12:44:59 */ public class DelayOrderQueueManager { private final static int DEFAULT_THREAD_NUM = 5; private static int thread_num = DEFAULT_THREAD_NUM; // 固定大小线程池 private ExecutorService executor; // 守护线程 private Thread daemonThread; // 延时队列 private DelayQueue<DelayOrderTask<?>> delayQueue; private static final AtomicLong atomic = new AtomicLong(0); private final long n = 1; private static DelayOrderQueueManager instance = new DelayOrderQueueManager(); private DelayOrderQueueManager() { executor = Executors.newFixedThreadPool(thread_num); delayQueue = new DelayQueue<>(); init(); } public static DelayOrderQueueManager getInstance() { return instance; } /** * 初始化 */ public void init() { daemonThread = new Thread(() -> { execute(); }); daemonThread.setName("DelayQueueMonitor"); daemonThread.start(); } private void execute() { while (true) { Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces(); System.out.println("当前存活线程数量:" + map.size()); int taskNum = delayQueue.size(); System.out.println("当前延时任务数量:" + taskNum); try { // 从延时队列中获取任务 DelayOrderTask<?> delayOrderTask = delayQueue.take(); if (delayOrderTask != null) { Runnable task = delayOrderTask.getTask(); if (null == task) { continue; } // 提交到线程池执行task executor.execute(task); } } catch (Exception e) { e.printStackTrace(); } } } /** * 添加任务 * * @param task * @param time * 延时时间 * @param unit * 时间单位 */ public void put(Runnable task, long time, TimeUnit unit) { // 获取延时时间 long timeout = TimeUnit.NANOSECONDS.convert(time, unit); // 将任务封装成实现Delayed接口的消息体 DelayOrderTask<?> delayOrder = new DelayOrderTask<>(timeout, task); // 将消息体放到延时队列中 delayQueue.put(delayOrder); } /** * 删除任务 * * @param task * @return */ public boolean removeTask(DelayOrderTask task) { return delayQueue.remove(task); } }
测试类
package com.delqueue; import java.util.concurrent.TimeUnit; import com.test.delayqueue.DelayOrderQueueManager; import com.test.delayqueue.DelayOrderWorker; public class Test { public static void main(String[] args) { DelayOrderWorker work1 = new DelayOrderWorker();// 任务1 DelayOrderWorker work2 = new DelayOrderWorker();// 任务2 DelayOrderWorker work3 = new DelayOrderWorker();// 任务3 // 延迟队列管理类,将任务转化消息体并将消息体放入延迟对列中等待执行 DelayOrderQueueManager manager = DelayOrderQueueManager.getInstance(); manager.put(work1, 3000, TimeUnit.MILLISECONDS); manager.put(work2, 6000, TimeUnit.MILLISECONDS); manager.put(work3, 9000, TimeUnit.MILLISECONDS); } }
OK 这就是项目中的具体使用情况,当然具体内容被忽略,整体框架就是这样,还有这里使用java的延时队列但是这种方式是有问题的如果如果down机则会出现任务丢失,所以也可以考虑使用mq、redis来实现……
更多关于java算法相关内容感兴趣的读者可查看本站专题:《Java数据结构与算法教程》、《Java操作DOM节点技巧总结》、《Java文件与目录操作技巧汇总》和《Java缓存操作技巧汇总》
希望本文所述对大家java程序设计有所帮助。
相关文章
Springboot详细讲解RocketMQ实现顺序消息的发送与消费流程
RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等,本篇我们了解如何实现顺序消息的发送与消费2022-06-06springboot使用war包部署到外部tomcat过程解析
这篇文章主要介绍了springboot使用war包部署到外部tomcat过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下2020-01-01Red Hat 安装JDK与IntelliJ IDEA的详细过程
YUM是基于Red Hat的Linux发行版的一个强大而用户友好的包管理工具,这篇文章主要介绍了Red Hat安装JDK与IntelliJ IDEA,需要的朋友可以参考下2023-08-08Struts2中validate数据校验的两种方法详解附Struts2常用校验器
这篇文章主要介绍了Struts2中validate数据校验的两种方法及Struts2常用校验器,本文介绍的非常详细,具有参考借鉴价值,感兴趣的朋友一起看看吧2016-09-09
最新评论