RocketMQ-延迟消息的处理流程介绍

 更新时间:2021年07月03日 09:30:01   作者:pigcoffee  
这篇文章主要介绍了RocketMQ-延迟消息的处理流程,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

概述

RocketMQ 支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息;

预设值的延迟时间间隔为:

1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h;

在消息创建的时候,调用 setDelayTimeLevel(int level) 方法设置延迟时间;

broker在接收到延迟消息的时候会把对应延迟级别的消息先存储到对应的延迟队列中,等延迟消息时间到达时,会把消息重新存储到对应的topic的queue里面。

Broker处理延迟消息

CommitLog.putMessage()

//获取消息的sysflag
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        //非事务消息 或 已commit事务消息
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // Delay Delivery 判断消息是否设置延迟
            if (msg.getDelayTimeLevel() > 0) {
                //判断延迟级别是否大于最大级别,如果大于最大值,则将延迟级别设置为最大级
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
                //延迟消息的topic为 SCHEDULE_TOPIC_XXXX
                topic = ScheduleMessageService.SCHEDULE_TOPIC;
                //获取延迟级别,一个延迟级别对应一个Queue
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
 
                // Backup real topic, queueId
                //消息原始的topic,queueid保存到消息的property中
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
 
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

1、判断消息类型,如果是非事务消息、已commit事务消息,才能处理延迟消息

2、判断消息是否设置延迟级别,如果延迟级别大于0,则该消息为延迟消息

3、判断延迟级别是否大于最大级别,如果大于最大值,则将延迟级别设置为最大级

4、延迟消息的topic为 SCHEDULE_TOPIC_XXXX

5、获取延迟级别,一个延迟级别对应一个Queue

6、消息原始的topic,queueid保存到消息的property中

7、修改消息的topci、queueid

启动延迟消息定时任务

ScheduleMessageService.start()

延迟消息投递

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • Java中equals方法使用及重写练习

    Java中equals方法使用及重写练习

    equals是在object类中的方法,在object中equals是用来看看两个参数是否引用的是同一个对象,下面这篇文章主要给大家介绍了关于Java中equals方法使用及重写练习的相关资料,需要的朋友可以参考下
    2023-05-05
  • IDEA中设置Tab健为4个空格的方法

    IDEA中设置Tab健为4个空格的方法

    这篇文章给大家介绍了代码缩进用空格还是Tab?(IDEA中设置Tab健为4个空格)的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧
    2021-03-03
  • java保留小数的四种实现方法

    java保留小数的四种实现方法

    这篇文章主要为大家详细介绍了java保留小数的四种实现方法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2016-11-11
  • 详解Java线程池的增长过程

    详解Java线程池的增长过程

    在本篇文章里小编给大家整理的是关于Java线程池的增长过程以及相关知识点,需要的朋友们可以参考下。
    2019-08-08
  • java实现抖音代码舞源码

    java实现抖音代码舞源码

    这篇文章主要为大家详细介绍了java实现抖音代码舞的方法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-04-04
  • Java调用echarts提供的地图压缩方法来压缩地图

    Java调用echarts提供的地图压缩方法来压缩地图

    今天小编就为大家分享一篇关于Java调用echarts提供的地图压缩方法来压缩地图,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2018-12-12
  • 浅析Java中JSONObject和JSONArray使用

    浅析Java中JSONObject和JSONArray使用

    这篇文章主要介绍了Java中JSONObject和JSONArray使用的相关资料,需要的朋友可以参考下
    2016-06-06
  • SpringMVC的Dispatcher解读

    SpringMVC的Dispatcher解读

    这篇文章主要介绍了SpringMVC的Dispatcher用法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-11-11
  • JAVA中阻止类的继承(官方和非官方)

    JAVA中阻止类的继承(官方和非官方)

    在面向对象的理论中, 有一些方案要求你用一个办法来声明一个不可继承的类。一般而言,如果类提供的功能不应该被改变,或者更恰当的说,是被覆盖(override)的时候才会出现这种情况。在这篇文章里,我讨论在JAVA语言中的实现办法--官方和非官方的办法
    2014-01-01
  • Spring中@DependsOn注解的使用代码实例

    Spring中@DependsOn注解的使用代码实例

    这篇文章主要介绍了Spring中@DependsOn注解的使用代码实例,Spring中@DependsOn,主要是使用在类和方法上, 作用是当前对象要依赖另外一些对象,被依赖的对象会先注册到Spring的IOC容器中,需要的朋友可以参考下
    2024-01-01

最新评论