Java中RocketMQ的延迟消息详解
RocketMQ简介
RocketMQ是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠、万亿级容量、灵活可伸缩的消息发布与订阅服务。
它前身是MetaQ,是阿里基于Kafka的设计使用Java进行自主研发的。在2012年,阿里将其开源, 在2016年,阿里将其捐献给Apache软件基金会(Apache Software Foundation,简称为ASF),正式成为孵化项目。2017 年,Apache软件基金会宣布RocketMQ已孵化成为 Apache顶级项目(Top Level Project,简称为TLP ),是国内首个互联网中间件在 Apache上的顶级项目。
延迟消息
生产者把消息发送到消息队列中以后,并不期望被立即消费,而是等待指定时间后才可以被消费者消费,这类消息通常被称为延迟消息。
在RocketMQ中,支持延迟消息,但是不支持任意时间精度的延迟消息,只支持特定级别的延迟消息。如果要支持任意时间精度,不能避免在Broker层面做消息排序,再涉及到持久化的考量,那么消息排序就不可避免产生巨大的性能开销。
消息延迟级别分别为1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共18个级别。在发送消息时,设置消息延迟级别即可,设置消息延迟级别时有以下3种情况:
- 设置消息延迟级别等于0时,则该消息为非延迟消息。
- 设置消息延迟级别大于等于1并且小于等于18时,消息延迟特定时间,如:设置消息延迟级别等于1,则延迟1s;设置消息延迟级别等于2,则延迟5s,以此类推。
- 设置消息延迟级别大于18时,则该消息延迟级别为18,如:设置消息延迟级别等于20,则延迟2h。
延迟消息示例
首先,写一个消费者,用于消费延迟消息:
public class Consumer { public static void main(String[] args) throws MQClientException { SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS"); // 实例化消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OneMoreGroup"); // 设置NameServer的地址 consumer.setNamesrvAddr("localhost:9876"); // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息 consumer.subscribe("OneMoreTopic", "*"); // 注册回调实现类来处理从broker拉取回来的消息 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { System.out.printf("%s %s Receive New Messages:%n" , sdf.format(new Date()) , Thread.currentThread().getName()); for (MessageExt msg : msgs) { System.out.printf("\tMsg Id: %s%n", msg.getMsgId()); System.out.printf("\tBody: %s%n", new String(msg.getBody())); } // 标记该消息已经被成功消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); // 启动消费者实例 consumer.start(); System.out.println("Consumer Started."); } }
再写一个延迟消息的生产者,用于发送延迟消息:
public class DelayProducer { public static void main(String[] args) throws Exception { SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS"); // 实例化消息生产者Producer DefaultMQProducer producer = new DefaultMQProducer("OneMoreGroup"); // 设置NameServer的地址 producer.setNamesrvAddr("localhost:9876"); // 启动Producer实例 producer.start(); Message msg = new Message("OneMoreTopic" , "DelayMessage", "This is a delay message.".getBytes()); //"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" //设置消息延迟级别为3,也就是延迟10s。 msg.setDelayTimeLevel(3); // 发送消息到一个Broker SendResult sendResult = producer.send(msg); // 通过sendResult返回消息是否成功送达 System.out.printf("%s Send Status: %s, Msg Id: %s %n" , sdf.format(new Date()) , sendResult.getSendStatus() , sendResult.getMsgId()); // 如果不再发送消息,关闭Producer实例。 producer.shutdown(); } }
运行生产者以后,就会发送一条延迟消息:
10:37:14.992 Send Status: SEND_OK, Msg Id: C0A8006D5AB018B4AAC216E0DB690000
10秒钟后,消费者收到的这条延迟消息:
10:37:25.026 ConsumeMessageThread_1 Receive New Messages: Msg Id: C0A8006D5AB018B4AAC216E0DB690000 Body: This is a delay message.
延迟消息的原理分析
以下分析的RocketMQ源码的版本号是4.7.1,版本不同源码略有差别。
CommitLog
在org.apache.rocketmq.store.CommitLog中,针对延迟消息做了一些处理:
// 延迟级别大于0,就是延时消息 if (msg.getDelayTimeLevel() > 0) { // 判断当前延迟级别,如果大于最大延迟级别, // 就设置当前延迟级别为最大延迟级别。 if (msg.getDelayTimeLevel() > this.defaultMessageStore .getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore .getScheduleMessageService().getMaxDelayLevel()); } // 获取延迟消息的主题, // 其中RMQ_SYS_SCHEDULE_TOPIC的值为SCHEDULE_TOPIC_XXXX topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; // 根据延迟级别获取延迟消息的队列Id, // 队列Id其实就是延迟级别减1 queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // 备份真正的主题和队列Id MessageAccessor.putProperty(msg , MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg , MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); // 设置延时消息的主题和队列Id msg.setTopic(topic); msg.setQueueId(queueId); }
可以看到,每一个延迟消息的主题都被暂时更改为SCHEDULE_TOPIC_XXXX,并且根据延迟级别延迟消息变更了新的队列Id。接下来,处理延迟消息的就是org.apache.rocketmq.store.schedule.ScheduleMessageService。
ScheduleMessageService
ScheduleMessageService是由org.apache.rocketmq.store.DefaultMessageStore进行初始化的,初始化包括构造对象和调用 load
方法。最后,再执行ScheduleMessageService的 start
方法:
public void start() { // 使用AtomicBoolean确保start方法仅有效执行一次 if (started.compareAndSet(false, true)) { this.timer = new Timer("ScheduleMessageTimerThread", true); // 遍历所有延迟级别 for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { // key为延迟级别 Integer level = entry.getKey(); // value为延迟级别对应的毫秒数 Long timeDelay = entry.getValue(); // 根据延迟级别获得对应队列的偏移量 Long offset = this.offsetTable.get(level); // 如果偏移量为null,则设置为0 if (null == offset) { offset = 0L; } if (timeDelay != null) { // 为每个延迟级别创建定时任务, // 第一次启动任务延迟为FIRST_DELAY_TIME,也就是1秒 this.timer.schedule( new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); } } // 延迟10秒后每隔flushDelayOffsetInterval执行一次任务, // 其中,flushDelayOffsetInterval默认配置也为10秒 this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { // 持久化每个队列消费的偏移量 if (started.get()) ScheduleMessageService.this.persist(); } catch (Throwable e) { log.error("scheduleAtFixedRate flush exception", e); } } }, 10000, this.defaultMessageStore .getMessageStoreConfig().getFlushDelayOffsetInterval()); } }
遍历所有延迟级别,根据延迟级别获得对应队列的偏移量,如果偏移量不存在,则设置为0。然后为每个延迟级别创建定时任务,第一次启动任务延迟为1秒,第二次及以后的启动任务延迟才是延迟级别相应的延迟时间。
然后,又创建了一个定时任务,用于持久化每个队列消费的偏移量。持久化的频率由flushDelayOffsetInterval属性进行配置,默认为10秒。
定时任务
ScheduleMessageService的 start
方法执行之后,每个延迟级别都创建自己的定时任务,这里的定时任务的具体实现就在DeliverDelayedMessageTimerTask类之中,它核心代码是executeOnTimeup方法之中,我们来看一下主要部分:
// 根据主题和队列Id获取消息队列 ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue( TopicValidator.RMQ_SYS_SCHEDULE_TOPIC , delayLevel2QueueId(delayLevel));
如果没有获取到对应的消息队列,则在DELAY_FOR_A_WHILE(默认为100)毫秒后再执行任务。如果获取到了,就继续执行下面操作:
// 根据消费偏移量从消息队列中获取所有有效消息 SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
如果没有获取到有效消息,则在DELAY_FOR_A_WHILE(默认为100)毫秒后再执行任务。如果获取到了,就继续执行下面操作:
// 遍历所有消息 for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { // 获取消息的物理偏移量 long offsetPy = bufferCQ.getByteBuffer().getLong(); // 获取消息的物理长度 int sizePy = bufferCQ.getByteBuffer().getInt(); long tagsCode = bufferCQ.getByteBuffer().getLong(); // 省略部分代码... long now = System.currentTimeMillis(); // 计算消息应该被消费的时间 long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode); // 计算下一条消息的偏移量 nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE) long countdown = deliverTimestamp - now; // 省略部分代码... }
如果当前消息不到消费的时间,则在 countdown
毫秒后再执行任务。如果到消费的时间,就继续执行下面操作:
// 根据消息的物理偏移量和大小获取消息 MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset( offsetPy, sizePy);
如果获取到消息,则继续执行下面操作:
// 重新构建新的消息,包括: // 1.清除消息的延迟级别 // 2.恢复真正的消息主题和队列Id MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) { log.error("[BUG] the real topic of schedule msg is {}," + " discard the msg. msg={}", msgInner.getTopic(), msgInner); continue; } // 重新把消息发送到真正的消息队列上 PutMessageResult putMessageResult = ScheduleMessageService.this.writeMessageStore .putMessage(msgInner);
清除了消息的延迟级别,并且恢复了真正的消息主题和队列Id,重新把消息发送到真正的消息队列上以后,消费者就可以立即消费了。
总结
经过以上对源码的分析,可以总结出延迟消息的实现步骤:
如果消息的延迟级别大于0,则表示该消息为延迟消息,修改该消息的主题为SCHEDULE_TOPIC_XXXX,队列Id为延迟级别减1。消息进入SCHEDULE_TOPIC_XXXX的队列中。定时任务根据上次拉取的偏移量不断从队列中取出所有消息。根据消息的物理偏移量和大小再次获取消息。根据消息属性重新创建消息,清除延迟级别,恢复原主题和队列Id。重新发送消息到原主题的队列中,供消费者进行消费。
到此这篇关于Java中RocketMQ的延迟消息详解的文章就介绍到这了,更多相关RocketMQ的延迟消息内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
springcloud集成nacos 使用lb 无效问题解决方案
这篇文章主要介绍了解决springcloud集成nacos 使用lb 无效,通过查看spring-cloud-starter-gateway jar中的自动配置类的源码,得知,该jar包中是不支持负载均衡的,需要引入spring-cloud-starter-loadbalancer 来支持,需要的朋友可以参考下2023-04-04SpringBoot项目的logback日志配置(包括打印mybatis的sql语句)
这篇文章主要介绍了SpringBoot项目的logback日志配置(包括打印mybatis的sql语句),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧2020-09-09
最新评论