RocketMQ-延迟消息的处理流程介绍
概述
RocketMQ 支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息;
预设值的延迟时间间隔为:
1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h;
在消息创建的时候,调用 setDelayTimeLevel(int level) 方法设置延迟时间;
broker在接收到延迟消息的时候会把对应延迟级别的消息先存储到对应的延迟队列中,等延迟消息时间到达时,会把消息重新存储到对应的topic的queue里面。
Broker处理延迟消息
CommitLog.putMessage()
//获取消息的sysflag final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); //非事务消息 或 已commit事务消息 if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // Delay Delivery 判断消息是否设置延迟 if (msg.getDelayTimeLevel() > 0) { //判断延迟级别是否大于最大级别,如果大于最大值,则将延迟级别设置为最大级 if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } //延迟消息的topic为 SCHEDULE_TOPIC_XXXX topic = ScheduleMessageService.SCHEDULE_TOPIC; //获取延迟级别,一个延迟级别对应一个Queue queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId //消息原始的topic,queueid保存到消息的property中 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId); } }
1、判断消息类型,如果是非事务消息、已commit事务消息,才能处理延迟消息
2、判断消息是否设置延迟级别,如果延迟级别大于0,则该消息为延迟消息
3、判断延迟级别是否大于最大级别,如果大于最大值,则将延迟级别设置为最大级
4、延迟消息的topic为 SCHEDULE_TOPIC_XXXX
5、获取延迟级别,一个延迟级别对应一个Queue
6、消息原始的topic,queueid保存到消息的property中
7、修改消息的topci、queueid
启动延迟消息定时任务
ScheduleMessageService.start()
延迟消息投递
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
最新评论