RocketMQ的两种消费模式详解
1、添加依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.3</version> </dependency> <dependencies> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies>
2、消费模式
RocketMQ主要提供了两种消费模式:集群消费以及广播消费。我们只需要在定义消费者的时候通过setMessageModel(MessageModel.XXX)
// 设置消费模型,集群还是广播,默认为集群 CLUSTERING-集群,BROADCASTING-广播 mqPushConsumer.setMessageModel(MessageModel.CLUSTERING); mqPushConsumer.setMessageModel(MessageModel.BROADCASTING);
方法就可以指定是集群还是广播式消费,默认是集群消费模式,即每个Consumer Group中的Consumer均摊所有的消息。
3、集群消费
3.1 生产者
package com.shucha.deveiface.biz.mq.producer; import com.alibaba.fastjson.JSON; import com.shucha.deveiface.biz.model.User; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.util.Date; /** * @author tqf * @Description 生产者 * @Version 1.0 * @since 2022-07-12 14:50 */ public class MQProducer { public static void main(String[] args) throws MQClientException{ producerSendMessage(); } /** * 生产消息方法 * @throws MQClientException */ public static void producerSendMessage() throws MQClientException { // 创建DefaultMQProducer类并设定生产者名称 DefaultMQProducer mqProducer = new DefaultMQProducer("producer-group-test"); // 设置NameServer地址,如果是集群的话,使用分号;分隔开 mqProducer.setNamesrvAddr("127.0.0.1:9876"); // 消息最大长度 默认4M mqProducer.setMaxMessageSize(4096); // 发送消息超时时间,默认3000 mqProducer.setSendMsgTimeout(3000); // 发送消息失败重试次数,默认2 mqProducer.setRetryTimesWhenSendAsyncFailed(2); // 启动消息生产者 mqProducer.start(); try { // 循环十次,发送十条消息 for (int i = 1; i <= 10; i++) { User user = new User(); user.setId((long)i); user.setAge(i); user.setUserName("姓名"+i); user.setCreateTime(new Date()); // String msg = "这是第" + i + "条消息测试"; String msg = JSON.toJSONString(user); // 创建消息,并指定Topic(主题),Tag(标签)和消息内容 Message message = new Message("TOPIC_TEST", "", msg.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 发送同步消息到一个Broker,可以通过sendResult返回消息是否成功送达 SendResult sendResult = mqProducer.send(message); // mqProducer.sendOneway(message); // 消息id /*System.out.println(sendResult.getMsgId()); // 队列信息 System.out.println(sendResult.getMessageQueue()); // 发送结果 System.out.println(sendResult.getSendStatus()); // 下一个要消费的消息的偏移量 System.out.println(sendResult.getOffsetMsgId()); // 队列消息偏移量 System.out.println(sendResult.getQueueOffset());*/ System.out.println(sendResult); } } catch (Exception e) { e.printStackTrace(); System.out.println("生产消息异常!"); } // 如果不再发送消息,关闭Producer实例 mqProducer.shutdown(); } }
User用户测试实体类
package com.shucha.deveiface.biz.model; import com.fasterxml.jackson.annotation.JsonFormat; import com.sdy.common.utils.DateUtil; import io.swagger.annotations.ApiModelProperty; import lombok.Data; import java.util.Date; /** * @author tqf * @Description * @Version 1.0 * @since 2022-04-07 13:53 */ @Data public class User { /** * 主键ID */ private Long id; /** *用户名 */ private String userName; /** * 用户密码 */ private String passWord; /** * 年龄 */ private Integer age; /** * 性别(0-男,1-女,2-未知) */ private Integer sex; /** * 创建时间 */ @JsonFormat(pattern = DateUtil.DATETIME_FORMAT) private Date createTime; }
3.2 消费者A
package com.shucha.deveiface.biz.mq.consumer; import com.alibaba.fastjson.JSON; import com.shucha.deveiface.biz.constants.MqConstants; import com.shucha.deveiface.biz.model.User; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.nio.charset.Charset; import java.util.List; /** * @author tqf * @Description 消费者A * @Version 1.0 * @since 2022-07-12 14:37 */ public class ConsumerA { public static void main(String[] args) throws MQClientException { ConsumerA(); } public static void ConsumerA() throws MQClientException { // 创建DefaultMQPushConsumer类并设定消费者名称 DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer(MqConstants.ConsumerGroup.CONSUMER_GROUP1); // DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer(MqConstants.ConsumerGroup.CONSUMER_GROUP1); // 设置NameServer地址,如果是集群的话,使用分号;分隔开 mqPushConsumer.setNamesrvAddr("127.0.0.1:9876"); // pullConsumer.setNamesrvAddr("127.0.0.1:9876"); // 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 // 如果不是第一次启动,那么按照上次消费的位置继续消费 mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 设置消费模型,集群还是广播,默认为集群 CLUSTERING-集群 BROADCASTING-广播 mqPushConsumer.setMessageModel(MessageModel.CLUSTERING); // mqPushConsumer.setMessageModel(MessageModel.BROADCASTING); // 消费者最小线程量 mqPushConsumer.setConsumeThreadMin(5); // 消费者最大线程量 mqPushConsumer.setConsumeThreadMax(10); // 设置一次消费消息的条数,默认是1 mqPushConsumer.setConsumeMessageBatchMaxSize(1); // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息,如果订阅该主题下的所有tag,则使用* mqPushConsumer.subscribe("TOPIC_TEST", "*"); // 注册回调实现类来处理从broker拉取回来的消息 mqPushConsumer.registerMessageListener(new MessageListenerConcurrently() { // 监听类实现MessageListenerConcurrently接口即可,重写consumeMessage方法接收数据 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : msgList) { String msgBody = new String(msg.getBody(), Charset.forName(RemotingHelper.DEFAULT_CHARSET)); System.out.println("消费者A接收到消息:" + "===== " + msgBody); /*User user = JSON.parseObject(msgBody, User.class); System.out.println("消费者A接收到消息:" + "===== " + user.getId());*/ } /*MessageExt messageExt = msgList.get(0); String body = new String(messageExt.getBody(), StandardCharsets.UTF_8); System.out.println("消费者A接收到消息: " + messageExt.toString() + "---消息内容为:" + body);*/ // 标记该消息已经被成功消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者实例 mqPushConsumer.start(); System.out.println("ConsumerA Started."); } }
3.3 消费者B
package com.shucha.deveiface.biz.mq.consumer; import com.shucha.deveiface.biz.constants.MqConstants; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import java.util.List; /** * @author tqf * @Description 消费者B * @Version 1.0 * @since 2022-07-12 14:40 */ public class ConsumerB { public static void main(String[] args) throws MQClientException { ConsumerB(); } public static void ConsumerB() throws MQClientException { // 创建DefaultMQPushConsumer类并设定消费者名称 DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer(MqConstants.ConsumerGroup.CONSUMER_GROUP1); // 设置NameServer地址,如果是集群的话,使用分号;分隔开 mqPushConsumer.setNamesrvAddr("127.0.0.1:9876"); // 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 // 如果不是第一次启动,那么按照上次消费的位置继续消费 mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 设置消费模型,集群还是广播,默认为集群 CLUSTERING-集群 BROADCASTING-广播 // mqPushConsumer.setMessageModel(MessageModel.CLUSTERING); mqPushConsumer.setMessageModel(MessageModel.BROADCASTING); // 消费者最小线程量 mqPushConsumer.setConsumeThreadMin(5); // 消费者最大线程量 mqPushConsumer.setConsumeThreadMax(10); // 设置一次消费消息的条数,默认是1 mqPushConsumer.setConsumeMessageBatchMaxSize(1); // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息,如果订阅该主题下的所有tag,则使用* mqPushConsumer.subscribe("TOPIC_TEST", "*"); // 注册回调实现类来处理从broker拉取回来的消息 mqPushConsumer.registerMessageListener(new MessageListenerConcurrently() { // 监听类实现MessageListenerConcurrently接口即可,重写consumeMessage方法接收数据 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) { /* MessageExt messageExt = msgList.get(0); String body = new String(messageExt.getBody(), StandardCharsets.UTF_8); System.out.println("消费者B接收到消息: " + messageExt.toString() + "---消息内容为:" + body);*/ for (MessageExt msg : msgList) { System.out.println("消费者B接收到消息:" + "===== " + new String(msg.getBody())); // String msgBody = new String(msg.getBody(), Charset.forName(RemotingHelper.DEFAULT_CHARSET)); } // 标记该消息已经被成功消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者实例 mqPushConsumer.start(); System.out.println("ConsumerB Started."); } }
可以看到, 生产者发送了10条消息,ConsumerA与ConsumerB属于同一个消费者组,集群消费模式下每个消费者摊分消费所有消息。注意,两个消费者的ConsumerGroup组名需要一致,才算是同一个消费者组。
简单总结一下:
1、在Rocket集群消费模式下,(订阅)同一个主题(Topic)下的消息,对于不同的消费者组是一种“广播形式”,即每个消费者组的都会消费消息。
2、在Rocket集群消费模式下,(订阅)同一个主题(Topic)下的消息,对于相同的消费者组的消费者而言是一种集群模式,即同一个消费者组内的所有消费者均分消息并消费。
4、广播消费
一条消息被多个 Consumer 消费,即使这些 Consumer 属于同一个 Consumer Group,消息也会被 Consumer Group 中的每个 Consumer 都消费一次,广播消费中的 Consumer Group 概念可以认为在消息划分方面无意 义。
使用方法:setMessageModel(MessageModel.BROADCASTING)
将前面的消费者A和消费者B的集群模式代码设置为如下
mqPushConsumer.setMessageModel(MessageModel.BROADCASTING);
重新启动生成者和2个消费者
4.1 生产者数据
4.2 消费者A
4.3 消费者B
可以看到生产者发送了10条消息,ConsumerA与ConsumerB属于同一个消费者组,广播模式下每个消费者都会全量消费所有消息 。
- 集群消费:任何一条消息只需要被消费者集群中任意一个消费者处理。
- 广播消费:每条消息被推送给消费者集群中的所有注册消费者,保证消息被每个消费者至少消费一次。
到此这篇关于RocketMQ的两种消费模式详解的文章就介绍到这了,更多相关RocketMQ消费模式内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
Java实现超大Excel文件解析(XSSF,SXSSF,easyExcel)
这篇文章主要为大家详细介绍了如何利用Java语言实现超大Excel文件解析(XSSF,SXSSF,easyExcel)以及速度的对比,感兴趣的可以了解一下2022-07-07Java DecimalFormat 保留小数位及四舍五入的陷阱介绍
这篇文章主要介绍了Java DecimalFormat 保留小数位及四舍五入的陷阱,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2021-10-10Spring Cloud Gateway编码实现任意地址跳转的示例
本文主要介绍了Spring Cloud Gateway编码实现任意地址跳转的示例,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下2021-12-12SpringBoot中使用configtree读取树形文件目录中的配置详解
这篇文章主要介绍了SpringBoot中使用configtree读取树形文件目录中的配置详解,configtree通过spring.config.import + configtree:前缀的方式,加载以文件名为key、文件内容为value的配置属性,需要的朋友可以参考下2023-12-12
最新评论