redis使用zset实现延时队列的示例代码

 更新时间:2023年06月02日 15:31:50   作者:抢老婆酸奶的小肥仔  
本文主要介绍了redis使用zset实现延时队列的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

最近在使用redis时,就想能不能用其实现消息队列?也在网上看了下其他小伙伴写的实现,结合自身业务实现了如下消息队列,希望对大家有用。

废话不多说,直接开撸。

1、为什么zset可以做消息队列?

首先我们来看下,设计消息队列需要考虑的需求:有序性,消息重复性,可靠性。

  • 有序性:zset所有元素可以根据成员关联的score来进行从低到高的排序,例如,我们可以利用时间戳来进行排序
  • 消息重复性:在zset中每个元素都是唯一的,这也保证了消息的唯一性
  • 可靠性:zset会自动维护元素之间的顺序,在添加或删除元素时无需手动排序,提升操作速度。

2、使用的zset命令

命令描述
zadd将一个给定score的成员添加到有序集合中,返回添加元素的个数
zrange根据元素在有序排序中的位置,从有序集合中获取多个元素
rank(K key, Object o)获取指定元素在集合中的索引,索引从0开始

3、代码实现

使用zset实现消息队列时,具体的流程,如下:

生产者流程:

  • 用户获取消息Id,并封装消息体
  • 用户发送数据到生产者,先获取锁
  • 如果获取到锁,则校验该消息体是否已添加到队列中,已添加则直接返回提醒。
  • 若未添加则调用方法将数据保存到zset集合中,否则等到指定时间后再获取锁。
  • 推送数据后,释放锁

消费者流程:

  • 调用方法获取数据
  • 获取到数据,则直接返回,否则到指定时间后再次获取数据,直到获取到数据并返回。

统一返回类:

    /**
     * @Author: jiangjs
     * @Description:
     * @Date: 2021/11/12 15:46
     **/
    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    public class ResultUtil<T> implements Serializable {
        private int code;
        private String msg;
        private T data;
        public static <T> ResultUtil<T> success(){
            return ResultUtil.<T>builder().code(1000).msg("成功").build();
        }
        public static <T> ResultUtil<T> success(T data){
            return ResultUtil.<T>builder().code(1000).msg("成功").data(data).build();
        }
        public static <T> ResultUtil<T> error(String msg){
            return ResultUtil.<T>builder().code(5000).msg(msg).data(null).build();
        }
        public static <T> ResultUtil<T> error(int code,String msg){
            return ResultUtil.<T>builder().code(code).msg(msg).build();
        }
    }

3.1 消息实体

需添加消息Id,主要防止消息重复提交。

    /**
     * @author: jiangjs
     * @description: 消息实体
     * @date: 2023/5/30 11:11
     **/
    @Data
    @Accessors(chain = true)
    public class QueueTask<T> {
        /**
         * 消息Id
         */
        private String taskId;
        /**
         * 任务
         */
        private T task;
    }

3.2 队列类型

队列类型可以理解为队列的名称,通过枚举,可以随意添加队列名称。

    /**
     * @author: jiangjs
     * @description: 队列类型
     * @date: 2023/5/30 10:53
     **/
    public enum QueueTypeEnum {
        /**
         * 订单
         */
        ORDER("order");
        private final String type;
        QueueTypeEnum(String type){
            this.type = type;
        }
        public String getType(){
            return type;
        }
    }

3.3 创建消息工具

    package com.jiashn.springbootproject.redis.utils;
    import com.jiashn.springbootproject.redis.domain.QueueTask;
    import com.jiashn.springbootproject.utils.ResultUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.data.redis.core.RedisTemplate;
    import javax.annotation.Resource;
    import java.time.LocalDateTime;
    import java.util.Objects;
    import java.util.Set;
    import java.util.UUID;
    import java.util.concurrent.TimeUnit;
    /**
     * @author: jiangjs
     * @description: redis实现消息队列
     * @date: 2023/5/30 10:51
     **/
    public class RedisQueueUtil<T> {
        private static final Logger log = LoggerFactory.getLogger(RedisQueueUtil.class);
        private RedisTemplate<String,QueueTask<T>> redisTemplate;
        /**
         * 队列类型,即名称
         */
        private final QueueTypeEnum typeEnum;
        public RedisQueueUtil(QueueTypeEnum typeEnum,RedisTemplate<String,QueueTask<T>> redisTemplate){
            this.typeEnum = typeEnum;
            this.redisTemplate = redisTemplate;
        }
        /**
         * 添加消息数据
         * @param queueTask 消息
         * @param time 延迟时间,单位s
         */
        public ResultUtil<String> sendQueueTask(QueueTask<T> queueTask, long time){
            //加锁
            if (getLock()){
                try {
                    Long rank = redisTemplate.opsForZSet().rank(typeEnum.getType(), queueTask);
                    if (Objects.nonNull(rank)){
                        return ResultUtil.error(6000,"消息数据已经存在,不予添加......");
                    }
                    Boolean result = redisTemplate.opsForZSet().add(typeEnum.getType(), queueTask, System.currentTimeMillis() + time*1000);
                    if (Objects.nonNull(result) && result){
                        log.info("添加消息数据成功:" + queueTask + ",添加时间:" + LocalDateTime.now());
                        return ResultUtil.success("添加消息数据成功");
                    }
                    return ResultUtil.error("添加消息数据失败");
                }finally {
                    //释放锁
                    releaseLock();
                }
            } else {
                log.info("未获取到锁,稍后再试");
                return ResultUtil.error("未获取到锁,稍后再试");
            }
        }
        /**
         * 获取zset前count数据
         * @param count 数据数
         * @return 返回获取到数据
         */
        public Set<QueueTask<T>> loopGetTask(int count) {
                //rangeByScore,根据score顺序获取zset数据的值
                return redisTemplate.opsForZSet().rangeByScore(typeEnum.getType(), 0, System.currentTimeMillis(), 0, count-1);
        }
        /**
         * 注销消息队列
         * @param typeEnum 消息队列名称
         */
        public void destroy(QueueTypeEnum typeEnum){
            redisTemplate.opsForZSet().remove(typeEnum.getType());
        }
        /**
         * 获取任务Id
         * @return 返回消息Id
         */
        public String getTaskId(){
           return typeEnum.getType() + "_" + UUID.randomUUID().toString().replace("-","");
        }
        /**
         * 获取锁
         * @return 返回加锁状态
         */
        private boolean getLock(){
            Boolean absent = redisTemplate.opsForValue().setIfAbsent(typeEnum.getType() + "_Locked", null, 30L, TimeUnit.MINUTES);
            return Objects.nonNull(absent) ? absent : false;
        }
        /**
         * 释放锁
         */
        public void releaseLock(){
            redisTemplate.delete(typeEnum.getType() + "_Locked");
        }
    }

在消息工具类中,创建消息任务时添加了锁,只有在获取锁的前提下才能添加消息任务。

提供获取消息Id的方法是为了让提交消息任务前,先获取Id,即使在提交时网络发生问题,提交的Id还是同一个,再进行消息消费时,可以根据这个Id来进行判断该消息任务是否已被消费,被消费则直接丢弃。

3.4 消费消息

    /**
     * @author: jiangjs
     * @description: 启动消费
     * @date: 2023/5/30 14:27
     **/
    @Component
    public class CustomerTaskLineRunner implements CommandLineRunner {
        @Resource
        private RedisTemplate<String,QueueTask<String>> redisTemplate;
        private final static String QUEUE_TYPE = QueueTypeEnum.ORDER.getType();
        private final static Logger log = LoggerFactory.getLogger(CustomerTaskLineRunner.class);
        @Override
        public void run(String... args) throws Exception {
            RedisQueueUtil<String> queueUtil = new RedisQueueUtil<>(QueueTypeEnum.ORDER,redisTemplate);
            while (true){
                Set<QueueTask<String>> queueTasks = queueUtil.loopGetTask(10);
                if (CollectionUtils.isNotEmpty(queueTasks)){
                    for (QueueTask<String> queueTask : queueTasks) {
                        //校验当前消息是否已消费,主要防止网络延时,导致多次提交同一任务 存在
                        QueueTask<String> stringQueueTask = redisTemplate.opsForValue().get(QUEUE_TYPE + "_" + queueTask.getTaskId());
                        if (Objects.nonNull(stringQueueTask)){
                            log.info("该任务已经消费,不能重复消费");
                            redisTemplate.opsForZSet().remove(QUEUE_TYPE,queueTask);
                            continue;
                        }
                        Long removeNum = redisTemplate.opsForZSet().remove(QUEUE_TYPE,queueTask);
                        if (Objects.nonNull(removeNum) && removeNum > 0){
                            String task = queueTask.getTask();
                            log.info("消费任务数据:" + task);
                            //设置过期时间,10分钟内则默认是重复提交
                            redisTemplate.opsForValue().set(QUEUE_TYPE + "_" + queueTask.getTaskId(),queueTask,10L, TimeUnit.MINUTES);
                        }
                    }
                }
                log.info("------1分钟后再次获取------");
                Thread.sleep(60000);
            }
        }
    }

校验重复消息,若消息重复且在10分钟内未被消费,则直接将该消息从队列中删除。在消息任务被消费后,将数据从队列中移除。

执行结果:

到此这篇关于redis使用zset实现延时队列的示例代码的文章就介绍到这了,更多相关redis zset延时队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Redis在项目中常见的12种使用场景示例和说明

    Redis在项目中常见的12种使用场景示例和说明

    Redis是一个开源的高性能键值对数据库,它以其内存中数据存储、键过期策略、持久化、事务、丰富的数据类型支持以及原子操作等特性,在许多项目中扮演着关键角色,以下是整理的12个Redis在项目中常见的使用场景举例说明和解释
    2024-06-06
  • Redis中的zset类型详解

    Redis中的zset类型详解

    有序集合zset保留了set集合不能有重复成员的特点,但与set集合不同的是,zset的每个member都有一个唯一的浮点数类型的分数score与之关联,这篇文章主要介绍了Redis的zset类型,需要的朋友可以参考下
    2023-08-08
  • easyswoole3.5 redis使用详细解析

    easyswoole3.5 redis使用详细解析

    这篇文章主要介绍了easyswoole3.5 redis使用的相关知识,本文通过示例代码给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧
    2024-05-05
  • redis的string类型及bitmap介绍

    redis的string类型及bitmap介绍

    这篇文章主要介绍了redis的string类型及bitmap介绍,redis有很多的客户端连接进来,站在redis所在机器的角度来说,就是有很多socket的连接
    2022-07-07
  • Redis Lua脚本的使用教程

    Redis Lua脚本的使用教程

    在Redis的学习中,Lua脚本是一项强大的高级特性,它允许用户在Redis中执行复杂的操作,本文就来介绍一下Redis Lua,脚本的使用教程,感兴趣的可以了解一下
    2024-03-03
  • 详解Redis如何保证接口的幂等性

    详解Redis如何保证接口的幂等性

    如何防止接口中同样的数据提交,以及如何保证消息不被重复消费,这些都是shigen在学习的过程中遇到的问题,今天,趁着在学习redis的间隙,我写了一篇文章进行简单的实现,需要的朋友可以参考下
    2023-11-11
  • Redis实现用户签到的示例代码

    Redis实现用户签到的示例代码

    Redis的位图可以高效实现用户签到功能,每个bit位对应一个签到状态,节省存储空间,利用SETBIT、GETBIT等命令操作签到数据,可统计连续签到天数和本月签到情况,感兴趣的可以了解一下
    2024-09-09
  • Redis集合类型的常用命令小结

    Redis集合类型的常用命令小结

    这篇文章给大家整理了在操作Redis集合类型中的常用命令,文章总结的很全面,对大家学习Redis具有一定的参考借鉴价值,下面来一起看看吧。
    2016-09-09
  • Java实现多级缓存的方法详解

    Java实现多级缓存的方法详解

    对于高并发系统来说,有三个重要的机制来保障其高效运行,它们分别是:缓存、限流和熔断,所以本文就来和大家探讨一下多级缓存的实现方法,希望对大家有所帮助
    2024-02-02
  • Redis解决缓存一致性问题

    Redis解决缓存一致性问题

    本文主要介绍了Redis 解决缓存一致性问题,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-10-10

最新评论