RocketMQ消息发送与消息类别详解

 更新时间:2023年09月08日 09:46:47   作者:小钟不想敲代码  
这篇文章主要介绍了RocketMQ消息发送与消息类别详解,事务消息的生产者执行本地事务,并根据事务执行的结果选择是否提交或回滚事务,
如果事务执行成功并选择提交事务,则产生注册成功消息,进入下一步,需要的朋友可以参考下

一、消息发送

1.1 单生产者单消费者消息发送(OneToOne)

1、新建maven项目recketmqtest

2、导入RocketMQ客户端坐标

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.8.0</version>
</dependency>

3、生产者

package com.liming.base;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
/**
 * @author 黎明
 * @version 1.0
 * 生产者
 * @date 2023/5/21 9:08
 */
public class Producer {
    public static void main(String[] args) throws Exception {
        /**
         1. 谁来发?
         2. 发给谁?
         3. 怎么发?
         4. 发什么?
         5. 发的结果是什么?
         6. 打扫战场
         **/
        // 1、创建一个发送消息的对象Producer
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        // 2、设定发送的命名服务器地址
        producer.setNamesrvAddr("localhost:9876");
        // 3.1 启动发送的服务
        producer.start();
        // 4、创建要发送的消息对象
        Message message = new Message("topic1", "tag1","hello recketmq".getBytes());
        // 3.2 发送消息
        SendResult sendResult = producer.send(message);
        System.out.println("返回结果:" + sendResult);
        // 5、关闭连接
        producer.shutdown();
    }
}

4、消费者

package com.liming.base;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
 * @author 黎明
 * @version 1.0
 * 消费者
 * @date 2023/5/21 9:17
 */
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        //1.创建一个接收消息的对象Consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.设定接收的命名服务器地址
        consumer.setNamesrvAddr("localhost:9876");
        //3.设置接收消息对应的topic,对应的sub标签为任意
        consumer.subscribe("topic1", "*");
        //3.开启监听,用于接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //遍历消息
                for (MessageExt msg : list) {
                    System.out.println("收到的消息:" + msg);
                    byte[] body = msg.getBody();
                    System.out.println(new String(body));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //4.启动接收消息的服务
        consumer.start();
        System.out.println("接收消息服务已经开启!");
        //5 不要关闭消费者!
    }
}

1.2 单生产者多消费者消息发送(OneToMany)

生产者

//1.创建一个发送消息的对象Producer
 DefaultMQProducer producer = new DefaultMQProducer("group1");
 //2.设定发送的命名服务器地址
 producer.setNamesrvAddr("localhost:9876");
 //3.1启动发送的服务
 producer.start();
 for (int i = 0; i < 10; i++) {
     //4.创建要发送的消息对象,指定topic,指定内容body
     Message msg = new Message("topic1", ("hello rocketmq"+i).getBytes();
     //3.2发送消息
     SendResult result = producer.send(msg);
     System.out.println("返回结果:" + result);
 }
 //5.关闭连接
 producer.shutdown();

消费者(负载均衡模式:默认模式)

 //1.创建一个接收消息的对象Consumer
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
 //2.设定接收的命名服务器地址
 consumer.setNamesrvAddr("localhost:9876");
 //3.设置接收消息对应的topic,对应的sub标签为任意
 consumer.subscribe("topic1","*");
 //设置当前消费者的消费模式(默认模式:负载均衡)
 consumer.setMessageModel(MessageModel.CLUSTERING);
 //3.开启监听,用于接收消息
 consumer.registerMessageListener(new MessageListenerConcurrently() {
     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
         //遍历消息
         for (MessageExt msg : list) {
             System.out.println("收到消息:"+msg);
             System.out.println("消息是:"+new String(msg.getBody()));
         }
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }
 });
 //4.启动接收消息的服务
 consumer.start();
 System.out.println("接受消息服务已经开启!");
 //5 不要关闭消费者!

消费者(广播模式)

//1.创建一个接收消息的对象Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.设定接收的命名服务器地址
consumer.setNamesrvAddr("localhost:9876");
//3.设置接收消息对应的topic,对应的sub标签为任意
consumer.subscribe("topic1","*");
//设置当前消费者的消费模式(默认模式:负载均衡)
//consumer.setMessageModel(MessageModel.CLUSTERING);
//设置当前消费者的消费模式(广播模式)
consumer.setMessageModel(MessageModel.BROADCASTING);
//3.开启监听,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        //遍历消息
        for (MessageExt msg : list) {
            System.out.println("收到消息:"+msg);
            System.out.println("消息是:"+new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
//4.启动接收消息的服务
consumer.start();
System.out.println("接受消息服务已经开启!");
//5 不要关闭消费者!

1.3 多生产者多消费者消息发送(ManyToMany)

多生产者产生的消息可以被同一个消费者消费,也可以被多个消费者消费

二、消息类别

2.1 同步消息

特征:即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功)

在这里插入图片描述

代码实现(生产者中):

SendResult result = producer.send(msg);

2.2 异步消息

特征:即时性较弱,但需要有回执的消息,例如订单中的某些信息

在这里插入图片描述

代码实现(生产者中):

//1.创建一个发送消息的对象Producer
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.设定发送的命名服务器地址
producer.setNamesrvAddr("localhost:9876");
//3.1启动发送的服务
producer.start();
for (int i = 0; i < 10; i++) {
    //4.创建要发送的消息对象,指定topic,指定内容body
    Message msg = new Message("topic1", ("hello rocketmq"+i).getBytes("UTF-8"));
    //3.2 同步消息
    //SendResult result = producer.send(msg);
    //System.out.println("返回结果:" + result);
    //异步消息
    producer.send(msg, new SendCallback() {
        //表示成功返回结果
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println(sendResult);
        }
        //表示发送消息失败
        @Override
        public void onException(Throwable throwable) {
            System.out.println(throwable);
        }
    });
    System.out.println("消息"+i+"发完了,做业务逻辑去了!");
}
//休眠10秒
TimeUnit.SECONDS.sleep(10);
//5.关闭连接
producer.shutdown();

2.3 单向消息

特征:不需要有回执的消息,例如日志类消息

在这里插入图片描述

代码实现(生产者中):

producer.sendOneway(msg);

2.4 延时消息

消息发送时并不直接发送到消息服务器,而是根据设定的等待时间到达,起到延时到达的缓冲作用

Message msg = new Message("topic3",("延时消息:hello rocketmq "+i).getBytes("UTF-8"));
//设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
msg.setDelayTimeLevel(3);
SendResult result = producer.send(msg);
System.out.println("返回结果:"+result);

目前支持的消息时间:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

2.5 批量消息

批量发送消息能显著提高传递小消息的性能.

发送批量消息:

List<Message> msgList = new ArrayList<Message>();
Message msg1 = new Message("topic1", ("hello rocketmq1").getBytes("UTF-8"));
Message msg2 = new Message("topic1", ("hello rocketmq2").getBytes("UTF-8"));
Message msg3 = new Message("topic1", ("hello rocketmq3").getBytes("UTF-8"));
msgList.add(msg1);
msgList.add(msg2);
msgList.add(msg3);
SendResult result = producer.send(msgList);

注意限制:

  • 这些批量消息应该有相同的topic
  • 相同的waitStoreMsgOK
  • 不能是延时消息
  • 消息内容总长度不超过4M

三、消息过滤

3.1 分类过滤

按照tag过滤信息

生产者:

Message msg = new Message("topic6","tag2",("消息过滤按照tag:hello rocketmq 2").getBytes("UTF-8"));

消费者:

//接收消息的时候,除了制定topic,还可以指定接收的tag,*代表任意tag
consumer.subscribe("topic6","tag1 || tag2");

3.2 语法过滤(属性过滤/语法过滤/SQL过滤)

基本语法:

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
  • 字符比较,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

常量支持类型为:

  • 数值,比如:123,3.1415;
  • 字符,比如:‘abc’,必须用单引号包裹起来;
  • NULL,特殊的常量
  • 布尔值,TRUE 或 FALSE

生产者:

//为消息添加属性
msg.putUserProperty("vip","1");
msg.putUserProperty("age","20");

消费者:

//使用消息选择器来过滤对应的属性,语法格式为类SQL语法
consumer.subscribe("topic7", MessageSelector.bySql("age >= 18"));
consumer.subscribe("topic6", MessageSelector.bySql("name = 'litiedan'"));

注意:SQL过滤需要依赖服务器的功能支持,在broker.conf配置文件中添加对应的功能项,并开启对应功能

enablePropertyFilter=true

重启broker

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

或者直接cmd中输入(D:\software\rocketmq-all-4.8.0-bin-release\bin)

mqadmin.cmd updateBrokerConfig -blocalhost:10911 -kenablePropertyFilter -vtrue

在这里插入图片描述

四、事务消息

1、正常事务过程(灰色线条)

2、事务补偿过程(蓝色线条)

在这里插入图片描述

事务消息状态

  • 提交状态:允许进入队列,此消息与非事务消息无区别
  • 回滚状态:不允许进入队列,此消息等同于未发送过
  • 中间状态:完成了half消息的发送,未对MQ进行二次状态确认

注意:事务消息仅与生产者有关,与消费者无关

代码实现:

提交状态

//事务消息使用的生产者是TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("localhost:9876");
//添加本地事务对应的监听
producer.setTransactionListener(new TransactionListener() {
//正常事务过程
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
return LocalTransactionState.COMMIT_MESSAGE;
}
//事务补偿过程
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
return null;
}
});
producer.start();
Message msg = new Message("topic8",("事务消息:hello rocketmq ").getBytes("UTF-8"));
SendResult result = producer.sendMessageInTransaction(msg,null);
System.out.println("返回结果:"+result);
producer.shutdown();

回滚状态

producer.setTransactionListener(new TransactionListener() {
//正常事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    return LocalTransactionState.ROLLBACK_MESSAGE;
}
//事务补偿
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
    return null;
}
});

中间状态

public static void main(String[] args) throws Exception {
	  TransactionMQProducer producer=new TransactionMQProducer("group1");
	  producer.setNamesrvAddr("localhost:9876");
	  producer.setTransactionListener(new TransactionListener() {
	      //正常事务
	      @Override
	      public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
	          return LocalTransactionState.UNKNOW;
	      }
	      //事务补偿 正常执行UNKNOW才会触发
	      @Override
	      public LocalTransactionState checkLocalTransaction(MessageExt msg) {
	          System.out.println("事务补偿");
	          return LocalTransactionState.COMMIT_MESSAGE;
	      }
	  });
	  producer.start();
	  Message msg = new Message("topic13", "hello rocketmq".getBytes("UTF-8"));
	  SendResult result = producer.sendMessageInTransaction(msg, null);
	  System.out.println("返回结果:" + result);
	  //事务补偿生产者一定要一直启动着
	  //producer.shutdown();
}

到此这篇关于RocketMQ消息发送与消息类别详解的文章就介绍到这了,更多相关RocketMQ消息类别内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 查看Spring容器中bean的五种方法小结

    查看Spring容器中bean的五种方法小结

    近期在写Spring项目的时候,需要通过注解的形式去替代之前直接将Bean存放在Spring容器这种方式,以此来简化对于Bean对象的操作,这篇文章主要给大家介绍了关于如何查看Spring容器中bean的五种方法,需要的朋友可以参考下
    2024-05-05
  • idea编译报错-代码没问题IDEA编译不通过的处理方案

    idea编译报错-代码没问题IDEA编译不通过的处理方案

    这篇文章主要介绍了idea编译报错-代码没问题IDEA编译不通过的问题及解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-12-12
  • Java延迟队列原理与用法实例详解

    Java延迟队列原理与用法实例详解

    这篇文章主要介绍了Java延迟队列原理与用法,结合实例形式详细分析了延迟队列的概念、原理、功能及具体使用方法,需要的朋友可以参考下
    2018-09-09
  • 使用Springboot 打jar包实现分离依赖lib和配置

    使用Springboot 打jar包实现分离依赖lib和配置

    这篇文章主要介绍了使用Springboot 打jar包实现分离依赖lib和配置方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-02-02
  • 简单了解java自定义和自然排序

    简单了解java自定义和自然排序

    这篇文章主要介绍了简单了解java自定义和自然排序,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-07-07
  • IDEA中@Autowired自动注入MyBatis Mapper报红警告的几种解决方法

    IDEA中@Autowired自动注入MyBatis Mapper报红警告的几种解决方法

    这篇文章主要介绍了IDEA中@Autowired自动注入MyBatis Mapper报红警告的几种解决方法
    2024-02-02
  • java获取ip地址与网络接口的方法示例

    java获取ip地址与网络接口的方法示例

    这篇文章主要给大家介绍了关于利用java如何获取ip地址与网络接口的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧。
    2018-01-01
  • Java反转链表测试过程介绍

    Java反转链表测试过程介绍

    这篇文章主要介绍了Java反转链表测试过程,学习过数据结构的小伙伴们,对链表想来是并不陌生。本篇文章将为大家介绍几种在Java语言当中,实现链表反转的几种方法,以下是具体内容
    2023-04-04
  • Java简单工厂和工厂方法模式详细解析

    Java简单工厂和工厂方法模式详细解析

    这篇文章主要介绍了Java简单工厂和工厂方法模式详细解析,简单工厂模式属于类的创新型模式,又叫静态工厂方法模式是通过专门定义一个类来负责创建其他类的实例,被创建的实例通常都具有共同的父类,需要的朋友可以参考下
    2023-12-12
  • Springboot整合Activiti操作详解

    Springboot整合Activiti操作详解

    这篇文章主要给大家详细介绍了Springboot整合Activiti的操作流程,文中流程步骤和代码示例介绍的非常详细,具有一定的参考价值,需要的朋友可以参考下
    2023-07-07

最新评论