RocketMQ消息发送与消息类别详解
如果事务执行成功并选择提交事务,则产生注册成功消息,进入下一步,需要的朋友可以参考下
一、消息发送
1.1 单生产者单消费者消息发送(OneToOne)
1、新建maven项目recketmqtest
2、导入RocketMQ客户端坐标
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.8.0</version> </dependency>
3、生产者
package com.liming.base; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; /** * @author 黎明 * @version 1.0 * 生产者 * @date 2023/5/21 9:08 */ public class Producer { public static void main(String[] args) throws Exception { /** 1. 谁来发? 2. 发给谁? 3. 怎么发? 4. 发什么? 5. 发的结果是什么? 6. 打扫战场 **/ // 1、创建一个发送消息的对象Producer DefaultMQProducer producer = new DefaultMQProducer("group1"); // 2、设定发送的命名服务器地址 producer.setNamesrvAddr("localhost:9876"); // 3.1 启动发送的服务 producer.start(); // 4、创建要发送的消息对象 Message message = new Message("topic1", "tag1","hello recketmq".getBytes()); // 3.2 发送消息 SendResult sendResult = producer.send(message); System.out.println("返回结果:" + sendResult); // 5、关闭连接 producer.shutdown(); } }
4、消费者
package com.liming.base; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * @author 黎明 * @version 1.0 * 消费者 * @date 2023/5/21 9:17 */ public class Consumer { public static void main(String[] args) throws MQClientException { //1.创建一个接收消息的对象Consumer DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.设定接收的命名服务器地址 consumer.setNamesrvAddr("localhost:9876"); //3.设置接收消息对应的topic,对应的sub标签为任意 consumer.subscribe("topic1", "*"); //3.开启监听,用于接收消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { //遍历消息 for (MessageExt msg : list) { System.out.println("收到的消息:" + msg); byte[] body = msg.getBody(); System.out.println(new String(body)); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //4.启动接收消息的服务 consumer.start(); System.out.println("接收消息服务已经开启!"); //5 不要关闭消费者! } }
1.2 单生产者多消费者消息发送(OneToMany)
生产者
//1.创建一个发送消息的对象Producer DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.设定发送的命名服务器地址 producer.setNamesrvAddr("localhost:9876"); //3.1启动发送的服务 producer.start(); for (int i = 0; i < 10; i++) { //4.创建要发送的消息对象,指定topic,指定内容body Message msg = new Message("topic1", ("hello rocketmq"+i).getBytes(); //3.2发送消息 SendResult result = producer.send(msg); System.out.println("返回结果:" + result); } //5.关闭连接 producer.shutdown();
消费者(负载均衡模式:默认模式)
//1.创建一个接收消息的对象Consumer DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.设定接收的命名服务器地址 consumer.setNamesrvAddr("localhost:9876"); //3.设置接收消息对应的topic,对应的sub标签为任意 consumer.subscribe("topic1","*"); //设置当前消费者的消费模式(默认模式:负载均衡) consumer.setMessageModel(MessageModel.CLUSTERING); //3.开启监听,用于接收消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { //遍历消息 for (MessageExt msg : list) { System.out.println("收到消息:"+msg); System.out.println("消息是:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //4.启动接收消息的服务 consumer.start(); System.out.println("接受消息服务已经开启!"); //5 不要关闭消费者!
消费者(广播模式)
//1.创建一个接收消息的对象Consumer DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.设定接收的命名服务器地址 consumer.setNamesrvAddr("localhost:9876"); //3.设置接收消息对应的topic,对应的sub标签为任意 consumer.subscribe("topic1","*"); //设置当前消费者的消费模式(默认模式:负载均衡) //consumer.setMessageModel(MessageModel.CLUSTERING); //设置当前消费者的消费模式(广播模式) consumer.setMessageModel(MessageModel.BROADCASTING); //3.开启监听,用于接收消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { //遍历消息 for (MessageExt msg : list) { System.out.println("收到消息:"+msg); System.out.println("消息是:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //4.启动接收消息的服务 consumer.start(); System.out.println("接受消息服务已经开启!"); //5 不要关闭消费者!
1.3 多生产者多消费者消息发送(ManyToMany)
多生产者产生的消息可以被同一个消费者消费,也可以被多个消费者消费
二、消息类别
2.1 同步消息
特征:即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功)
代码实现(生产者中):
SendResult result = producer.send(msg);
2.2 异步消息
特征:即时性较弱,但需要有回执的消息,例如订单中的某些信息
代码实现(生产者中):
//1.创建一个发送消息的对象Producer DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.设定发送的命名服务器地址 producer.setNamesrvAddr("localhost:9876"); //3.1启动发送的服务 producer.start(); for (int i = 0; i < 10; i++) { //4.创建要发送的消息对象,指定topic,指定内容body Message msg = new Message("topic1", ("hello rocketmq"+i).getBytes("UTF-8")); //3.2 同步消息 //SendResult result = producer.send(msg); //System.out.println("返回结果:" + result); //异步消息 producer.send(msg, new SendCallback() { //表示成功返回结果 @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } //表示发送消息失败 @Override public void onException(Throwable throwable) { System.out.println(throwable); } }); System.out.println("消息"+i+"发完了,做业务逻辑去了!"); } //休眠10秒 TimeUnit.SECONDS.sleep(10); //5.关闭连接 producer.shutdown();
2.3 单向消息
特征:不需要有回执的消息,例如日志类消息
代码实现(生产者中):
producer.sendOneway(msg);
2.4 延时消息
消息发送时并不直接发送到消息服务器,而是根据设定的等待时间到达,起到延时到达的缓冲作用
Message msg = new Message("topic3",("延时消息:hello rocketmq "+i).getBytes("UTF-8")); //设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel) msg.setDelayTimeLevel(3); SendResult result = producer.send(msg); System.out.println("返回结果:"+result);
目前支持的消息时间:
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
2.5 批量消息
批量发送消息能显著提高传递小消息的性能.
发送批量消息:
List<Message> msgList = new ArrayList<Message>(); Message msg1 = new Message("topic1", ("hello rocketmq1").getBytes("UTF-8")); Message msg2 = new Message("topic1", ("hello rocketmq2").getBytes("UTF-8")); Message msg3 = new Message("topic1", ("hello rocketmq3").getBytes("UTF-8")); msgList.add(msg1); msgList.add(msg2); msgList.add(msg3); SendResult result = producer.send(msgList);
注意限制:
- 这些批量消息应该有相同的topic
- 相同的waitStoreMsgOK
- 不能是延时消息
- 消息内容总长度不超过4M
三、消息过滤
3.1 分类过滤
按照tag过滤信息
生产者:
Message msg = new Message("topic6","tag2",("消息过滤按照tag:hello rocketmq 2").getBytes("UTF-8"));
消费者:
//接收消息的时候,除了制定topic,还可以指定接收的tag,*代表任意tag consumer.subscribe("topic6","tag1 || tag2");
3.2 语法过滤(属性过滤/语法过滤/SQL过滤)
基本语法:
- 数值比较,比如:>,>=,<,<=,BETWEEN,=;
- 字符比较,比如:=,<>,IN;
- IS NULL 或者 IS NOT NULL;
- 逻辑符号 AND,OR,NOT;
常量支持类型为:
- 数值,比如:123,3.1415;
- 字符,比如:‘abc’,必须用单引号包裹起来;
- NULL,特殊的常量
- 布尔值,TRUE 或 FALSE
生产者:
//为消息添加属性 msg.putUserProperty("vip","1"); msg.putUserProperty("age","20");
消费者:
//使用消息选择器来过滤对应的属性,语法格式为类SQL语法 consumer.subscribe("topic7", MessageSelector.bySql("age >= 18")); consumer.subscribe("topic6", MessageSelector.bySql("name = 'litiedan'"));
注意:SQL过滤需要依赖服务器的功能支持,在broker.conf配置文件中添加对应的功能项,并开启对应功能
enablePropertyFilter=true
重启broker
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
或者直接cmd中输入(D:\software\rocketmq-all-4.8.0-bin-release\bin)
mqadmin.cmd updateBrokerConfig -blocalhost:10911 -kenablePropertyFilter -vtrue
四、事务消息
1、正常事务过程(灰色线条)
2、事务补偿过程(蓝色线条)
事务消息状态
- 提交状态:允许进入队列,此消息与非事务消息无区别
- 回滚状态:不允许进入队列,此消息等同于未发送过
- 中间状态:完成了half消息的发送,未对MQ进行二次状态确认
注意:事务消息仅与生产者有关,与消费者无关
代码实现:
提交状态
//事务消息使用的生产者是TransactionMQProducer TransactionMQProducer producer = new TransactionMQProducer("group1"); producer.setNamesrvAddr("localhost:9876"); //添加本地事务对应的监听 producer.setTransactionListener(new TransactionListener() { //正常事务过程 public LocalTransactionState executeLocalTransaction(Message message, Object o) { return LocalTransactionState.COMMIT_MESSAGE; } //事务补偿过程 public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { return null; } }); producer.start(); Message msg = new Message("topic8",("事务消息:hello rocketmq ").getBytes("UTF-8")); SendResult result = producer.sendMessageInTransaction(msg,null); System.out.println("返回结果:"+result); producer.shutdown();
回滚状态
producer.setTransactionListener(new TransactionListener() { //正常事务 @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { return LocalTransactionState.ROLLBACK_MESSAGE; } //事务补偿 @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { return null; } });
中间状态
public static void main(String[] args) throws Exception { TransactionMQProducer producer=new TransactionMQProducer("group1"); producer.setNamesrvAddr("localhost:9876"); producer.setTransactionListener(new TransactionListener() { //正常事务 @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { return LocalTransactionState.UNKNOW; } //事务补偿 正常执行UNKNOW才会触发 @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { System.out.println("事务补偿"); return LocalTransactionState.COMMIT_MESSAGE; } }); producer.start(); Message msg = new Message("topic13", "hello rocketmq".getBytes("UTF-8")); SendResult result = producer.sendMessageInTransaction(msg, null); System.out.println("返回结果:" + result); //事务补偿生产者一定要一直启动着 //producer.shutdown(); }
到此这篇关于RocketMQ消息发送与消息类别详解的文章就介绍到这了,更多相关RocketMQ消息类别内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
使用Springboot 打jar包实现分离依赖lib和配置
这篇文章主要介绍了使用Springboot 打jar包实现分离依赖lib和配置方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2022-02-02IDEA中@Autowired自动注入MyBatis Mapper报红警告的几种解决方法
这篇文章主要介绍了IDEA中@Autowired自动注入MyBatis Mapper报红警告的几种解决方法2024-02-02
最新评论