Java延迟队列原理与用法实例详解

 更新时间:2018年09月03日 11:45:35   作者:QH_JAVA  
这篇文章主要介绍了Java延迟队列原理与用法,结合实例形式详细分析了延迟队列的概念、原理、功能及具体使用方法,需要的朋友可以参考下

本文实例讲述了Java延迟队列原理与用法。分享给大家供大家参考,具体如下:

延时队列,第一他是个队列,所以具有对列功能第二就是延时,这就是延时对列,功能也就是将任务放在该延时对列中,只有到了延时时刻才能从该延时对列中获取任务否则获取不到……

应用场景比较多,比如延时1分钟发短信,延时1分钟再次执行等,下面先看看延时队列demo之后再看延时队列在项目中的使用:

简单的延时队列要有三部分:第一实现了Delayed接口的消息体、第二消费消息的消费者、第三存放消息的延时队列,那下面就来看看延时队列demo。

一、消息体

package com.delqueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
 * 消息体定义 实现Delayed接口就是实现两个方法即compareTo 和 getDelay最重要的就是getDelay方法,这个方法用来判断是否到期……
 *
 * @author whd
 * @date 2017年9月24日 下午8:57:14
 */
public class Message implements Delayed {
    private int id;
    private String body; // 消息内容
    private long excuteTime;// 延迟时长,这个是必须的属性因为要按照这个判断延时时长。
    public int getId() {
        return id;
    }
    public String getBody() {
        return body;
    }
    public long getExcuteTime() {
        return excuteTime;
    }
    public Message(int id, String body, long delayTime) {
        this.id = id;
        this.body = body;
        this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime();
    }
    // 自定义实现比较方法返回 1 0 -1三个参数
    @Override
    public int compareTo(Delayed delayed) {
        Message msg = (Message) delayed;
        return Integer.valueOf(this.id) > Integer.valueOf(msg.id) ? 1
                : (Integer.valueOf(this.id) < Integer.valueOf(msg.id) ? -1 : 0);
    }
    // 延迟任务是否到时就是按照这个方法判断如果返回的是负数则说明到期否则还没到期
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.excuteTime - System.nanoTime(), TimeUnit.NANOSECONDS);
    }
}

二、消息消费者

package com.delqueue;
import java.util.concurrent.DelayQueue;
public class Consumer implements Runnable {
    // 延时队列 ,消费者从其中获取消息进行消费
    private DelayQueue<Message> queue;
    public Consumer(DelayQueue<Message> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        while (true) {
            try {
                Message take = queue.take();
                System.out.println("消费消息id:" + take.getId() + " 消息体:" + take.getBody());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

三、延时队列

package com.delqueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class DelayQueueTest {
     public static void main(String[] args) {
        // 创建延时队列
        DelayQueue<Message> queue = new DelayQueue<Message>();
        // 添加延时消息,m1 延时3s
        Message m1 = new Message(1, "world", 3000);
        // 添加延时消息,m2 延时10s
        Message m2 = new Message(2, "hello", 10000);
        //将延时消息放到延时队列中
        queue.offer(m2);
        queue.offer(m1);
        // 启动消费线程 消费添加到延时队列中的消息,前提是任务到了延期时间
        ExecutorService exec = Executors.newFixedThreadPool(1);
        exec.execute(new Consumer(queue));
        exec.shutdown();
      }
}

将消息体放入延迟队列中,在启动消费者线程去消费延迟队列中的消息,如果延迟队列中的消息到了延迟时间则可以从中取出消息否则无法取出消息也就无法消费。

这就是延迟队列demo,下面我们来说说在真实环境下的使用。

使用场景描述:

在打车软件中对订单进行派单的流程,当有订单的时候给该订单筛选司机,然后给当订单绑定司机,但是有时运气没那么好,订单进来后第一次没有筛选到合适的司机,但我们也不能就此结束派单,而是将该订单的信息放到延时队列中过个2秒钟在进行一次,其实这个2秒钟就是一个延迟,所以这里我们就可以使用延时队列来实现……

下面看看简单的流程图:

下面来看看具体代码实现:

在项目中有如下几个类:第一 、任务类   第二、按照任务类组装的消息体类  第三、延迟队列管理类

任务类即执行筛选司机、绑单、push消息的任务类

package com.test.delayqueue;
/**
 * 具体执行相关业务的业务类
 * @author whd
 * @date 2017年9月25日 上午12:49:32
 */
public class DelayOrderWorker implements Runnable {
    @Override
    public void run() {
        // TODO Auto-generated method stub
        //相关业务逻辑处理
        System.out.println(Thread.currentThread().getName()+" do something ……");
    }
}

消息体类,在延时队列中这个实现了Delayed接口的消息类是比不可少的,实现接口时有一个getDelay(TimeUnit unit)方法,这个方法就是判断是否到期的

这里定义的是一个泛型类,所以可以将我们上面的任务类作为其中的task,这样就将任务类分装成了一个消息体

package com.test.delayqueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
 * 延时队列中的消息体将任务封装为消息体
 *
 * @author whd
 * @date 2017年9月25日 上午12:48:30
 * @param <T>
 */
public class DelayOrderTask<T extends Runnable> implements Delayed {
    private final long time;
    private final T task; // 任务类,也就是之前定义的任务类
    /**
     * @param timeout
     *      超时时间(秒)
     * @param task
     *      任务
     */
    public DelayOrderTask(long timeout, T task) {
        this.time = System.nanoTime() + timeout;
        this.task = task;
    }
    @Override
    public int compareTo(Delayed o) {
        // TODO Auto-generated method stub
        DelayOrderTask other = (DelayOrderTask) o;
        long diff = time - other.time;
        if (diff > 0) {
            return 1;
        } else if (diff < 0) {
            return -1;
        } else {
            return 0;
        }
    }
    @Override
    public long getDelay(TimeUnit unit) {
        // TODO Auto-generated method stub
        return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS);
    }
    @Override
    public int hashCode() {
        return task.hashCode();
    }
    public T getTask() {
        return task;
    }
}

延时队列管理类,这个类主要就是将任务类封装成消息并并添加到延时队列中,以及轮询延时队列从中取出到时的消息体,在获取任务类放到线程池中执行任务

package com.test.delayqueue;
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
 * 延时队列管理类,用来添加任务、执行任务
 *
 * @author whd
 * @date 2017年9月25日 上午12:44:59
 */
public class DelayOrderQueueManager {
    private final static int DEFAULT_THREAD_NUM = 5;
    private static int thread_num = DEFAULT_THREAD_NUM;
    // 固定大小线程池
    private ExecutorService executor;
    // 守护线程
    private Thread daemonThread;
    // 延时队列
    private DelayQueue<DelayOrderTask<?>> delayQueue;
    private static final AtomicLong atomic = new AtomicLong(0);
    private final long n = 1;
    private static DelayOrderQueueManager instance = new DelayOrderQueueManager();
    private DelayOrderQueueManager() {
        executor = Executors.newFixedThreadPool(thread_num);
        delayQueue = new DelayQueue<>();
        init();
    }
    public static DelayOrderQueueManager getInstance() {
        return instance;
    }
    /**
     * 初始化
     */
    public void init() {
        daemonThread = new Thread(() -> {
            execute();
        });
        daemonThread.setName("DelayQueueMonitor");
        daemonThread.start();
    }
    private void execute() {
        while (true) {
            Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
            System.out.println("当前存活线程数量:" + map.size());
            int taskNum = delayQueue.size();
            System.out.println("当前延时任务数量:" + taskNum);
            try {
                // 从延时队列中获取任务
                DelayOrderTask<?> delayOrderTask = delayQueue.take();
                if (delayOrderTask != null) {
                    Runnable task = delayOrderTask.getTask();
                    if (null == task) {
                        continue;
                    }
                    // 提交到线程池执行task
                    executor.execute(task);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    /**
     * 添加任务
     *
     * @param task
     * @param time
     *      延时时间
     * @param unit
     *      时间单位
     */
    public void put(Runnable task, long time, TimeUnit unit) {
        // 获取延时时间
        long timeout = TimeUnit.NANOSECONDS.convert(time, unit);
        // 将任务封装成实现Delayed接口的消息体
        DelayOrderTask<?> delayOrder = new DelayOrderTask<>(timeout, task);
        // 将消息体放到延时队列中
        delayQueue.put(delayOrder);
    }
    /**
     * 删除任务
     *
     * @param task
     * @return
     */
    public boolean removeTask(DelayOrderTask task) {
        return delayQueue.remove(task);
    }
}

测试类

package com.delqueue;
import java.util.concurrent.TimeUnit;
import com.test.delayqueue.DelayOrderQueueManager;
import com.test.delayqueue.DelayOrderWorker;
public class Test {
    public static void main(String[] args) {
        DelayOrderWorker work1 = new DelayOrderWorker();// 任务1
        DelayOrderWorker work2 = new DelayOrderWorker();// 任务2
        DelayOrderWorker work3 = new DelayOrderWorker();// 任务3
        // 延迟队列管理类,将任务转化消息体并将消息体放入延迟对列中等待执行
        DelayOrderQueueManager manager = DelayOrderQueueManager.getInstance();
        manager.put(work1, 3000, TimeUnit.MILLISECONDS);
        manager.put(work2, 6000, TimeUnit.MILLISECONDS);
        manager.put(work3, 9000, TimeUnit.MILLISECONDS);
    }
}

OK 这就是项目中的具体使用情况,当然具体内容被忽略,整体框架就是这样,还有这里使用java的延时队列但是这种方式是有问题的如果如果down机则会出现任务丢失,所以也可以考虑使用mq、redis来实现……

更多关于java算法相关内容感兴趣的读者可查看本站专题:《Java数据结构与算法教程》、《Java操作DOM节点技巧总结》、《Java文件与目录操作技巧汇总》和《Java缓存操作技巧汇总

希望本文所述对大家java程序设计有所帮助。

相关文章

  • 浅谈java IO流——四大抽象类

    浅谈java IO流——四大抽象类

    这篇文章主要介绍了java IO流——四大抽象类,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-03-03
  • Springboot详细讲解RocketMQ实现顺序消息的发送与消费流程

    Springboot详细讲解RocketMQ实现顺序消息的发送与消费流程

    RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等,本篇我们了解如何实现顺序消息的发送与消费
    2022-06-06
  • Spring依赖注入(DI)两种方式的示例详解

    Spring依赖注入(DI)两种方式的示例详解

    这篇文章主要介绍了Spring依赖注入(DI)的两种方式:setter注入和构造器注入。文中的示例代码讲解详细,感兴趣的小伙伴可以了解一下
    2022-06-06
  • Jmeter逻辑控制器事务控制器使用方法解析

    Jmeter逻辑控制器事务控制器使用方法解析

    这篇文章主要介绍了Jmeter逻辑控制器事务控制器使用方法解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-11-11
  • 轻松掌握java责任链模式

    轻松掌握java责任链模式

    这篇文章主要帮助大家轻松掌握java责任链模式,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2016-09-09
  • springboot使用war包部署到外部tomcat过程解析

    springboot使用war包部署到外部tomcat过程解析

    这篇文章主要介绍了springboot使用war包部署到外部tomcat过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-01-01
  • Red Hat 安装JDK与IntelliJ IDEA的详细过程

    Red Hat 安装JDK与IntelliJ IDEA的详细过程

    YUM是基于Red Hat的Linux发行版的一个强大而用户友好的包管理工具,这篇文章主要介绍了Red Hat安装JDK与IntelliJ IDEA,需要的朋友可以参考下
    2023-08-08
  • 侦听消息队列的Message Listener类示例详解

    侦听消息队列的Message Listener类示例详解

    Spring AMQP 是基于 Spring 框架的AMQP消息解决方案,提供模板化的发送和接收消息的抽象层,提供基于消息驱动的 POJO的消息监听等,简化了我们对于RabbitMQ相关程序的开发,本文给大家介绍侦听消息队列的Message Listener类,感兴趣的朋友一起看看吧
    2023-12-12
  • Struts2中validate数据校验的两种方法详解附Struts2常用校验器

    Struts2中validate数据校验的两种方法详解附Struts2常用校验器

    这篇文章主要介绍了Struts2中validate数据校验的两种方法及Struts2常用校验器,本文介绍的非常详细,具有参考借鉴价值,感兴趣的朋友一起看看吧
    2016-09-09
  • IDEA 自动跳出括号的快捷键分享

    IDEA 自动跳出括号的快捷键分享

    这篇文章主要介绍了IDEA 自动跳出括号的快捷键分享,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-02-02

最新评论