JAVA中通过Redis实现延时任务demo实例

 更新时间:2024年08月24日 11:24:16   作者:MuShanYu  
Redis在2.0版本时引入了发布订阅(pub/sub)功能,在发布订阅中有一个channel(频道),与消息队列中的topic(主题)类似,可以通过redis的发布订阅者模式实现延时任务功能,实例中会议室预约系统,用户预约管理员审核后生效,如未审批,需要自动变超期未处理,使用延时任务

先说结论,有两种方式可以实现:

  1. 通过Redis监听过期key实现。
  2. 使用Redisson 内置的延时队列实现。

1.监听key过期事件实现

1.1 实现原理

Redis在2.0版本时引入了发布订阅(pub/sub)功能,在发布订阅中有一个channel(频道),与消息队列中的topic(主题)类似。可以通过redis的发布订阅者模式实现延时任务功能。

pub/sub即发布者publisher和订阅者subscriber,也可以叫生产者和消费者。发布者通过PUBLISH投递消息给指定的channel,订阅者通过SUBSCRIBER订阅自己关心的channel,订阅者可以订阅一个或者多个不同的channel。

在发布订阅模式下生产者需要将消息发送到指定的channel中,消费者需要订阅对应channel拿到想要的消息。Redis中有很多默认的channel,这些channel是由Redis本身向他们发送消息的,这不是我们自己编写的代码,其中keyevent@:expired 是其中的一个默认channel,db表示的是redis的哪一个数据库。这个channel负责监听过期的key,也就是说如果有一个key过期了,那么redis会将这个key过期的信息发送到这个频道,我们只需要监听这个频道就可以拿到对应的过期key信息,这样我们就能实现一个延迟任务功能了。

举个列子:比如我现在需要实现一个邮件提醒功能,需要在任务发布后的前24小时通过邮件通知未完成的用户。我们可以在任务发布时设置一个key,这个key的过期时间是当前时间到任务前24小时,监听对应的key过期channel,当key过期后拿到对应的key,去执行你自定义的业务逻辑即可,当然这个key需要你进行设计,比如可以为任务id等等。

1.2 实现Demo

现在有一个会议室预约的系统,用户可以通过该系统填写预约理由进行预约,该预约请求需要管理员完成审核后才能生效。有一个需求,如果该预约没有被审批,那么需要自动将该预约申请置为超期未处理。这里我们就可以使用延时任务实现这个功能。

第一步我们需要在房间进行预约操作的时候,同时去缓存一个key,这个key就缓存成房间预约申请的id,这样当key过期时,我们就能拿到对应的申请信息,从而去通知对应的审核人。

房间预约操作时设置对应缓存key:

private void setRoomApplyNotifyCache(RoomReservation roomReservation, String userId) {
        // 记录当前时间->房间预约起始时间,redis缓存,用于判断是否管理员超期未处理,自动更改状态,通知用户房间预约超期未处理,防止占用时间段,用户可以重新预约
        long cacheTimeSecond = DateUtil.between(new Date(), new Date(roomReservation.getStartTime()), DateUnit.SECOND);
        String roomOccupancyApplyKey = "record_reserve_key:" + roomReservation.getId();
        redisCacheUtil.setCacheObject(roomOccupancyApplyKey, userId, cacheTimeSecond, TimeUnit.SECONDS);
        // 前一个小时提醒负责人审核。 预约间隔最少是30分钟
        long cacheNotifyChargerSecond = cacheTimeSecond - (60 * 60);
        // 当前时间距离预约起始时间小于一个小时
        if (cacheTimeSecond <= 3600L && cacheTimeSecond > 1800L) {
            // 不足一个小时,但是大于半个小时
            cacheNotifyChargerSecond = cacheTimeSecond - (30 * 60);
        } else if (cacheTimeSecond < 1800L) {
            // 不设置通知审核人
            return;
        }
        // 缓存
        String notifyChargerKey = RedisCacheKey.ROOM_APPLY_TIMEOUT_NOTIFY_KEY.concatKey(roomReservation.getId());
        redisCacheUtil.setCacheObject(notifyChargerKey, userId, cacheNotifyChargerSecond, TimeUnit.SECONDS);
    }

监听key过期channel并作出处理

@Component
public class RedisExpiredKeyListenerComponent extends KeyExpirationEventMessageListener {
	// 通过构造函数注入 RedisMessageListenerContainer 给 KeyExpirationEventMessageListener
    public RedisExpiredKeyListenerComponent(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    @Override
    protected void doRegister(RedisMessageListenerContainer listenerContainer) {
        listenerContainer.addMessageListener(this, new PatternTopic("__keyevent@0__:expired"));
    }
    
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String expiredKey = message.toString();
        if (expiredKey.startsWith("record_reserve_key:")) {
            String reserveId = expiredKey.substring("record_reserve_key:".length());
            // 根据id查询房间预约信息,发送给审核人通知邮件。
            // ....
        }
    }
}

这样就非常简单的实现了延时任务的功能。

1.3 有什么缺陷?

  1. 时效性差
    为什么这么说?因为过期事件消息是在Redis删除key时才发布的,而不是key过期时就发布了。
    Redis中常用的过期策略有:
  • 惰性删除
    只会在取出key时判断key是否已经过期,这样对cpu比较友好,因为不用频繁的去扫描所有的key。
  • 定期删除
    每隔一段时间抽取一批key执行过期key删除操作。并且,Redis 底层会通过限制删除操作执行的时长和频率来减少删除操作对 CPU 时间的影响。

定期删除对内存更加友好,惰性删除对 CPU 更加友好。两者各有千秋,所以 Redis 采用的是 定期删除+惰性/懒汉式删除

因此,就会存在我设置了 key 的过期时间,但到了指定时间 key 还未被删除,进而没有发布过期事件的情况。

  1. 丢消息
    Redis 的 pub/sub 模式中的消息并不支持持久化,这与消息队列不同。在 Redis 的 pub/sub 模式中,发布者将消息发送给指定的频道,订阅者监听相应的频道以接收消息。当没有订阅者时,消息会被直接丢弃,在 Redis 中不会存储该消息。
  2. 多服务实例的情况下存在消息重复问题
    Redis 的 pub/sub 模式目前只有广播模式,这意味着当生产者向特定频道发布一条消息时,所有订阅相关频道的消费者都能够收到该消息。
    这个时候,我们需要注意多个服务实例重复处理消息的问题,这会增加代码开发量和维护难度。

2. 通过Redission实现

1、引入 Redission 依赖:

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.16.2</version>
</dependency>

2、创建 Redisson 配置类:

@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://localhost:6379");
        return Redisson.create(config);
    }
}

3、封装了一个延迟队列类 RedissonDelayQueue

@Component
public class RedissonDelayQueue {

    private static final Logger log = LoggerFactory.getLogger(RedissonDelayQueue.class);

    @Autowired
    private RedissonClient redissonClient;
    // 延迟队列
    private RDelayedQueue<String> delayQueue;
    // 阻塞队列
    private RBlockingQueue<String> blockingQueue;

    private ExecutorService executorService;

    public RedissonDelayQueue() {
        this.executorService = new ThreadPoolExecutor(
                5,
                10,
                0L, TimeUnit.MILLISECONDS,
                new java.util.concurrent.LinkedBlockingQueue<>(),
                new CustomThreadFactory()
        );
    }

    @PostConstruct
    public void init() {
        blockingQueue = redissonClient.getBlockingQueue("myQueue");
        delayQueue = redissonClient.getDelayedQueue(blockingQueue);
        startConsumer();
    }

    private void startConsumer() {
        executorService.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // 从阻塞队列中获取任务
                    String task = blockingQueue.take();
                    log.info("Received task: {}", task);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    log.error("Error processing task", e);
                }
            }
        });
    }

    public void addTask(String task, long delay) {
        log.info("Add task: {} with delay: {} seconds", task, delay);
        // 将任务添加到延迟队列
        delayQueue.offer(task, delay, TimeUnit.SECONDS);
    }

    private static class CustomThreadFactory implements ThreadFactory {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, "DelayQueue-Consumer");
            thread.setDaemon(true);
            return thread;
        }
    }
}

RedissonDelayQueue 中的两个核心方法:

  • startConsumer():启动一个消费者线程,从阻塞队列 blockingQueue 中获取任务并处理。
  • addTask(String task, long delay):将一个任务添加到延迟队列中,并指定延迟时间。

4、编一个 Controller 测试一下:

@RestController
public class TaskController {

    @Autowired
    private RedissonDelayQueue redissonDelayQueue;

    @PostMapping("/addTask")
    public void addTask(@RequestParam String task, @RequestParam long delay) {
        redissonDelayQueue.addTask(task, delay);
    }
}

GET http://localhost:8080/addTask?task=test-task&delay=15

控制台输出:

可以看到任务的确是延迟了15s后开始执行的。

3. 为什么用Redisson更好?

Redisson 是一个开源的 Java 语言 Redis 客户端,提供了很多开箱即用的功能,比如多种分布式锁的实现、延时队列。

我们可以借助 Redisson 内置的延时队列 RDelayedQueue 来实现延时任务功能。

Redisson 的延迟队列 RDelayedQueue 是基于 Redis 的 SortedSet 来实现的。SortedSet 是一个有序集合,其中的每个元素都可以设置一个分数,代表该元素的权重。Redisson 利用这一特性,将需要延迟执行的任务插入到 SortedSet 中,并给它们设置相应的过期时间作为分数。

Redisson 在客户端(即应用程序进程)中启动一个定时任务,到时间后使用 zrangebyscore 命令扫描 SortedSet 中过期的元素(即分数小于或等于当前时间的元素),然后将这些过期元素从 SortedSet 中移除,并将它们加入到就绪消息列表( List 结构)中。

当任务被移到实际的就绪消息列表中时,Redisson 通常还会通过发布/订阅机制(Redis 的 Pub/Sub 模型)来通知消费者有新任务到达。

就绪消息列表是一个阻塞队列,消费者可以使用阻塞操作(如 BLPOP key 00 表示无限等待,直到有消息进入队列)监听。由于 Redis 的 Pub/Sub 机制是事件驱动的,它避免了轮询开销,只有在有新消息时才会触发处理逻辑。

注意:Redisson 的定时任务调度器并不是以固定的时间间隔频繁调用 zrangebyscore 命令进行扫描,而是根据 SortedSet 中最近的到期时间来动态调整下一次检查的时间点。

当然对于几天或者几周后才会执行的任务,可以结合mysql进行优化。可以通过定时任务(例如 XXL-JOB、Spring Task)定期(如每 15 分钟或 30 分钟)扫描 MySQL 中即将到期的任务(例如在未来 2 小时内到期的任务)并推送到 Redis 中。

4. 为什么不直接用消息队列呢?

在我的项目中(https://github.com/MuShanYu/apply-room-record),由于没有其他场景需要使用消息队列,因此不想为了单一的延时任务场景引入消息队列。引入 MQ 会增加系统的复杂性,需要维护额外的组件和配置,还会增加成本,这是不太可取的。

如果项目将来确实有需要引入 MQ 的场景且 Redis 延时任务确实不再满足项目需求,我会考虑将延时任务的实现平滑迁移到 MQ 上。

个人项目中使用的是简单的key过期监听策略,正在优化。

希望这篇文章能够对你有所帮助。

总结

Redis在2.0版本时引入了发布订阅(pub/sub)功能,在发布订阅中有一个channel(频道),与消息队列中的topic(主题)类似,可以通过redis的发布订阅者模式实现延时任务功能,实例中会议室预约系统,用户预约管理员审核后生效,如未审批,需要自动变超期未处理,使用延时任务。

 

到此这篇关于JAVA中通过Redis实现延时任务demo实例的文章就介绍到这了,更多相关JAVA中Redis实现延时内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Springboot敏感字段脱敏的实现思路

    Springboot敏感字段脱敏的实现思路

    这篇文章主要介绍了Springboot敏感字段脱敏的实现思路,本文通过实例代码给大家介绍的非常详细,需要的朋友可以参考下
    2021-09-09
  • Java中Arrays类与Math类详解

    Java中Arrays类与Math类详解

    这篇文章主要介绍了Java中Arrays类与Math类,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-03-03
  • Java异步编程的5种异步实现方式详解

    Java异步编程的5种异步实现方式详解

    这篇文章主要介绍了Java异步编程的5种异步实现方式详解,异步编程是程序并发运行的一种手段,它允许多个事件同时发生,当程序调用需要长时间运行的方法时,它不会阻塞当前的执行流程,程序可以继续运行,需要的朋友可以参考下
    2024-01-01
  • 详解Java编程中Annotation注解对象的使用方法

    详解Java编程中Annotation注解对象的使用方法

    这篇文章主要介绍了Java编程中Annotation注解对象的使用方法,注解以"@注解名"的方式被编写,与类、接口、枚举是在同一个层次,需要的朋友可以参考下
    2016-03-03
  • OpenCV实现普通阈值

    OpenCV实现普通阈值

    这篇文章主要为大家详细介绍了OpenCV实现普通阈值,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-11-11
  • springboot2.0和springcloud Finchley版项目搭建(包含eureka,gateWay,Freign,Hystrix)

    springboot2.0和springcloud Finchley版项目搭建(包含eureka,gateWay,Fre

    这篇文章主要介绍了springboot2.0和springcloud Finchley版项目搭建(包含eureka,gateWay,Freign,Hystrix),小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2019-05-05
  • springboot实现读取nacos配置文件

    springboot实现读取nacos配置文件

    这篇文章主要介绍了springboot实现读取nacos配置文件方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-09-09
  • 详解Java中的JDK、JRE、JVM

    详解Java中的JDK、JRE、JVM

    本文主要介绍了Java中的JDK、JRE、JVM的相关知识。具有很好的参考价值,下面跟着小编一起来看下吧
    2017-01-01
  • JVM原理之完整的一次GC流程解读

    JVM原理之完整的一次GC流程解读

    这篇文章主要介绍了JVM原理之完整的一次GC流程解读,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-12-12
  • 创建并运行一个java线程方法介绍

    创建并运行一个java线程方法介绍

    这篇文章主要介绍了创建并运行一个java线程,涉及线程代码示例,具有一定参考价值,需要的朋友可以了解下。
    2017-11-11

最新评论