springboot整合redisson实现延时队列(附仓库地址)

 更新时间:2024年10月16日 09:28:03   作者:ゞ註﹎錠oo  
延时队列用于管理需要定时执行的任务,对于大数据量和高实时性需求,使用延时队列比定时扫库更高效,Redisson提供一种高效的延时队列实现方式,本文就来详细的介绍一下,感兴趣都可以了解学习

应用场景

通常在一些需要经历一段时间或者到达某个指定时间节点才会执行的功能,比如以下这些场景:

  • 订单超时提醒
  • 收货自动确认
  • 会议提醒
  • 代办事项提醒

为什么使用延时队列

对于数据量小且实时性要求不高的需求来说,最简单的方法就是定时扫描数据库。

但是,当数量达到数百万、上千万级别且时,定时扫库就显得非常低效且消耗资源,

甚至有些时间间隔小实时性要求高的情况,上一次扫描还没结束,下一次就又开始了,

这时候如果使用延时队列就会比较合适

延时队列的几种方式:

  • Quartz 定时任务实现扫库
  • DelayQueue JDK中提供了一组实现延迟队列的API
  • Redis sorted set
  • Redis 过期键监听回调
  • RabbitMQ 死信队列
  • RabbitMQ 基于插件实现延迟队列
  • Wheel 时间轮训算法

Redisson 实现延时队列

顾名思义 Redis son 就是 Redis 的儿子,举个栗子先:

1.引入 pom

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>${lastest.version}</version>
</dependency>

2.封装一个 RedissonQueue 类

@Service
public class RedissonQueue {

    public static final String QUEUE = "delayQueue";

    // 默认超时时间,30秒
    public static final Integer DEFAULT_TIMEOUT = 30;

    @Resource
    private RedissonClient redissonClient;

    // 加入任务并设置到期时间
    public void offer(String taskId, Integer timeout) {
        RDelayedQueue<String> delayedQueue = delayedQueue();
        delayedQueue.offer(taskId, Objects.isNull(timeout) ? DEFAULT_TIMEOUT : timeout, TimeUnit.SECONDS);
    }

    // 移除任务
    public void remove(String taskId) {
        RDelayedQueue<String> delayedQueue = delayedQueue();
        delayedQueue.removeIf(messageId -> messageId.equals(taskId));
    }

    // 任务列表
    public RDelayedQueue<String> delayedQueue() {
        RBlockingDeque<String> blockingDeque = blockingDeque();
        return redissonClient.getDelayedQueue(blockingDeque);
    }

    public RBlockingDeque<String> blockingDeque() {
        return redissonClient.getBlockingDeque(QUEUE);
    }

    public boolean isShutdown() {
        return redissonClient.isShutdown();
    }

    public void shutdown() {
        redissonClient.shutdown();
    }

}

3.交给 Spring 管理

@Slf4j
@Service
public class RedissonService implements ApplicationRunner {

    @Resource
    private RedissonQueue redissonQueue;

    @Resource(name = "threadPoolTaskExecutor")
    private ThreadPoolTaskExecutor executor;

    @Override
    public void run(ApplicationArguments args) {
        RBlockingDeque<String> blockingDeque = redissonQueue.blockingDeque();
        executor.execute(() -> {
            while (true) {
                if (redissonQueue.isShutdown()) {
                    return;
                } else {
                    String messageId = null;
                    try {
                        messageId = blockingDeque.take();
                    } catch (InterruptedException e) {
                        log.warn("RedissonConsumer error:{}", e.getMessage());
                    }
                    if (!Objects.isNull(messageId) && !messageId.isEmpty()) {
                        log.warn("timeout messageId : {}", messageId);
                    }
                }
            }
        });

    }

    // 初始化,启动服务就执行一次
    @PostConstruct
    public void init() {
        redissonQueue.delayedQueue();
    }

    @PreDestroy
    public void shutdown() {
        redissonQueue.shutdown();
    }

}

4.测试接口

@Operation(summary = "添加任务", description = "添加任务")
@PostMapping
public ResponseEntity<?> add(@RequestParam(value = "taskId", required = false) String taskId,
                             @RequestParam(value = "timeout", required = false) Integer timeout) {
    taskId = StringUtils.isEmpty(taskId) ? String.valueOf(snowflake.nextId()) : taskId;
    redissonQueue.offer(taskId, timeout);
    return ResponseEntity.ok().body(redissonQueue.delayedQueue());
}

@Operation(summary = "移除任务", description = "移除任务")
@DeleteMapping(value = "/{taskId}")
public ResponseEntity<?> remove(@PathVariable("taskId") String taskId) {
    redissonQueue.remove(taskId);
    return ResponseEntity.ok().body(redissonQueue.delayedQueue());
}

5.测试结果

添加10个任务

在这里插入图片描述

删除第1个任务

在这里插入图片描述

可以看到第一个任务删除后没有被执行(没有设置到期时间,默认为30秒到期)

在这里插入图片描述

实现原理

  • redisson_delay_queue_timeout:delayQueue,sorted set 数据类型,存放所有延迟任务,按延迟任务的到期时间戳(提交任务时间戳 +
    延迟时间)排序,所以列表最前面第一个元素就是整个延迟队列中最早被执行的任务。
  • redisson_delay_queue:delayQueue,list 数据类型,也是存放所有任务。
  • delayQueue,list 数据类型,被称为目标队列,这个里面存放的任务都是已经到延迟时间的,可以被消费者获取的任务,所以上面示例中
    RBlockingQueue 的 take 方法是从此目标队列中获取任务的。
  • redisson_delay_queue_channel:delayQueue,是一个 channel,用来通知客户端开启一个延迟任务
  • 生产者提交任务时将任务放到 redisson_delay_queue_timeout:delayQueue 中,提交任务的时间戳+延迟时间
  • 客户端会有一个延迟任务,这个延迟任务会向 Redis Server 发送一段 lua 脚本,Redis 执行 lua 脚本中的命令,此操作是原子性的

lua 脚本主要干两件事

  • 将到了延迟时间的任务从 redisson_delay_queue_timeout:delayQueue 中移除,存到 delayQueue 这个目标队列
  • 获取到 redisson_delay_queue_timeout:delayQueue 中最早到期时间的任务的到期时间戳,发布到 redisson_delay_queue_channel:
    delayQueue channel 中

当客户端监听到 redisson_delay_queue_channel:delayQueue 这个 channel 的消息时,会再次提交一个客户端延迟任务,延迟时间就是消息(最早到期时间任务的到期时间戳)当前时间戳
这个时间其实也就是 redisson_delay_queue_channel:delayQueue 中最早到期时间的任务的剩余的延迟时间。
一旦时间来到最早到期时间任务的到期时间戳,redisson_delay_queue_timeout:delayQueue 中最早到期时间的任务已经到期,客户端的延迟任务也同时到期,
于是开始执行 lua 脚本操作,及时将到期任务放到目标队列中。然后再次发布剩余的延迟任务中最早到期任务的到期时间戳到 channel
中,
如此循环运行下去,保证 redisson_delay_queue_timeout:delayQueue 中到期数据能及时放到目标队列中。
这里存在一个特殊情况,需要项目启动时就执行一次延时队列。因为由于没有客户端延迟任务的执行,
可能会出现 redisson_delay_queue_timeout:delayQueue 队列中有到期但是没有被放到目标队列的可能,启动就执行一次是为了保证到期的数据能被及时放到目标队列中。

结论

  • Redisson 方案理论上没有延迟,但当消息数量剧增,消费者消费缓慢这种情况下,可能会导致延迟任务消费的延迟。

  • 消息丢失问题 Redisson 方案最大程度上减轻消息丢失的可能性,因为所有任务都是存在 list 和 sorted set 两种数据类型中,Redis
    有持久化机制。除非整个 redis 集群宕机,可能丢失一小部分数据。

  • 广播任务问题,是不会出现的,因为每个客户端都是从同一个目标队列中获取任务。

Redisson 这种实现方案是比较合适且靠谱的,一般中小型项目建议用 Redisson 实现延迟队列,规模较大的项目直接上 MQ。

整合DEMO仓库地址

到此这篇关于springboot整合redisson实现延时队列(附仓库地址)的文章就介绍到这了,更多相关springboot redisson延时队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • JAVA_基本LDAP操作实例

    JAVA_基本LDAP操作实例

    这篇文章介绍了JAVA_基本LDAP操作实例,有需要的朋友可以参考一下
    2013-09-09
  • 基于Lombok集成springboot遇到的坑

    基于Lombok集成springboot遇到的坑

    这篇文章主要介绍了Lombok集成springboot遇到的坑,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-12-12
  • 详解Spring Boot自动装配的方法步骤

    详解Spring Boot自动装配的方法步骤

    这篇文章主要介绍了详解Spring Boot自动装配的方法步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-06-06
  • Java实现邮箱发送功能实例(阿里云邮箱推送)

    Java实现邮箱发送功能实例(阿里云邮箱推送)

    这篇文章主要给大家介绍了关于Java实现邮箱发送功能的相关资料,利用阿里云邮箱推送,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-09-09
  • 百度Java面试题 前200页精选(中)

    百度Java面试题 前200页精选(中)

    这篇文章主要为大家分享了Java面试资源中篇,百度“Java面试题”前200页都在这里了,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-12-12
  • 一文掌握SpringBoot多环境配置

    一文掌握SpringBoot多环境配置

    在实际项目的开发过程中,我们程序往往需要在不同环境中运行,每个环境中的配置参数可能都会有所不同,例如数据库连接信息、文件服务器等等,下面小编给大家介绍SpringBoot多环境配置,感兴趣的朋友一起看看吧
    2024-04-04
  • spring boot admin 搭建详解

    spring boot admin 搭建详解

    本篇文章主要介绍了spring boot admin 搭建详解,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-04-04
  • Spring @value用法示例详解

    Spring @value用法示例详解

    这篇文章主要介绍了Spring-@value用法详解,为了简化读取properties文件中的配置值,spring支持@value注解的方式来获取,这种方式大大简化了项目配置,提高业务中的灵活性,本文通过实例代码给大家介绍的非常详细,需要的朋友参考下吧
    2022-08-08
  • SpringBoot升级指定jackson版本的问题

    SpringBoot升级指定jackson版本的问题

    这篇文章主要介绍了SpringBoot升级指定jackson版本,本文给大家分享了漏洞通告及修改Springboot中jackson版本的问题,需要的朋友可以参考下
    2022-08-08
  • arthas jprofiler做复杂链路的调用分析

    arthas jprofiler做复杂链路的调用分析

    这篇文章主要为大家介绍了arthas jprofiler做复杂链路的调用分析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-06-06

最新评论