使用Java代码实现RocketMQ的生产与消费消息

 更新时间:2024年07月31日 08:43:23   作者:小威要向诸佬学习呀  
这篇文章介绍一下其他的小组件以及使用Java代码实现生产者对消息的生成,消费者消费消息等知识点,并通过代码示例介绍的非常详细,对大家的学习或工作有一定的帮助,需要的朋友可以参考下

RocketMQ其他组件

在RocketMQ中,除了生产者,消费者,还有一些其他的小组件,接下来逐一介绍一下他们。

监听器(Listener)

定义:监听器是消费者用于处理消息的组件。在PushConsumer(推)模式下,消费者客户端必须设置消费监听器,以便在接收到消息时执行相应的处理逻辑。(比如一会儿下面的代码)

偏移量(Offset)

定义:偏移量是指在消费消息时,记录消费者已经消费到的消息位置的值。每个消息都有一个唯一的偏移量值,它代表了消息在消息队列中的位置。

偏移量具有很大的作用:它能够保证消费者在重启或宕机后能够从上次消费的位置继续消费消息,避免重复消费或漏消费。

简而言之就是它能告诉消费者已经消费到哪一条消息!!

  • 集群模式:在集群消费模式下,消息队列的消费进度保存在Broker端。消费者每次消费完消息后,会将最新的消费进度同步到Broker,以便在消费者重启或者故障转移的时候能够从上一次消费的位置继续消费。
  • 广播模式:在广播消费模式下,消息队列的消费进度保存在消费者本地。因为广播模式下每条消息都会被所有消费者消费,所以不需要在Broker端保存消费进度。

所以,偏移量的的实现方式有两种:包括存储在本地文件(OffsetStore)和存储在Broker中这两种方式。(这样一看,清晰了吧)

命名服务器(NameServer)

定义:命名服务器是RocketMQ中的轻量级路由服务,存储生产者和消费者与Broker之间的路由信息。

它的作用:提供Broker的动态注册与发现服务,生产者和消费者通过NameServer查询Broker的路由信息,从而进行消息的投递和消费。

消息组成

  • Topic:消息主题,对不同的业务消息进行分类。
  • Tag:消息标签,进一步区分某个Topic下的消息分类。使用Tag可以实现对Topic中的消息进行过滤。消费者可以根据Tag来订阅自己感兴趣的消息,而不是接收Topic下的所有消息。
  • Message Body:消息体,消息的实际内容。
  • Keys:消息的键值,标识消息的唯一性。在RocketMQ中,每个消息都可以设置Keys字段,以便在需要的时候根据Keys来查询或者定位消息。
  • 属性:除了上面滴,RocketMQ的消息还可以包含一系列的属性信息,比如消息的发送时间、生产者信息等等。这些属性信息以键值对的形式存在,随着消息一起被存储和传输。

实现生产与消费消息

按之前的步骤搭建完成RocketMQ集群后...

首先我们创建一个空的maven工程,在pom.xml文件中添加RocketMQ的依赖(RocketMQ的依赖版本需要与虚拟机中的保持一致,这里选择和之前一样的4.7.1版本):

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>

生产消息

然后编写生产者生产消息的代码:

// 1.创建一个DefaultMQProducer实例,指定生产者组名 
DefaultMQProducer producer = new DefaultMQProducer("my-producer-group1");  
// 2.设置NameServer的地址
producer.setNamesrvAddr("192.168.220.135:9876");    
// 3.启动生产者实例  
producer.start();   
// 4.使用for循环发送10条消息  
for (int i = 0; i < 10; i++) {  
    // 创建一条消息,指定Topic为"MyTopic1",Tag标签为"TagA",消息体为"hello rocketmq"加上循环变量的值,同时把字符串转换为字节数组  
    Message message = new Message("MyTopic1","TagA",("hello rocketmq"+i).getBytes(StandardCharsets.UTF_8));       
    // 5.发送消息并接收发送结果  
    SendResult sendResult = producer.send(message);  
    // 打印发送结果,包括消息ID、发送状态等信息  
    System.out.println(sendResult);  
}  
// 6.发送完所有消息后,关闭生产者实例,释放资源  
producer.shutdown();

生产者生产消息和消费者消费消息这块的代码都相对较为简单,已经在代码块中加了注释,这里就不再赘述了。

这个时候就可以访问虚拟机+端口号来搜索到发送的消息详情了!

消费消息

// 1.和生产者一样,创建一个DefaultMQPushConsumer实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group1");  
// 2.设置NameServer的地址,消费者通过这个地址与NameServer进行通信,来获取Broker的地址信息  
consumer.setNamesrvAddr("192.168.220.135:9876");  
// 订阅一个或多个Topic,以及Tag来过滤需要消费的消息。我们订阅了"MyTopic1",使用"*"来匹配此Topic下的所有Tag  
consumer.subscribe("MyTopic1", "*");  
// 3.注册消息监听器,用于处理从Broker接收到的消息。使用MessageListenerConcurrently接口的实现,表示并行消费  
consumer.registerMessageListener(new MessageListenerConcurrently() {  
    @Override  
    // 4.当收到消息时,方法会被调用。
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {  
        // 5.遍历消息列表,并打印每条消息的内容(注意:这里直接打印msg对象不会得到预期的消息内容字符串)  
        for (MessageExt msg : msgs) {  
            // 所以我们打印msg.getBody()的内容,为了保留消息原样  
            System.out.println("已收到消息" + msg);  
        }  
        // 6.返回消费状态,这里表示消息已成功消费  
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
    }  
});  
// 7.启动消费者实例  
consumer.start();  
// 8.打印日志消费者已启动  
System.out.println("消费者已启动");

这里需要注意,msg包含了消息的详细信息,包括消息体、标签、属性等等。如果想打印消息内容,应该使用msg.getBody()方法获取消息体的字节数组,并且把它转换为字符串(如果消息体是文本的话)。

本篇文章到这里就结束了,后续会继续分享RocketMQ相关的知识,感谢各位小伙伴们的支持!

以上就是使用Java代码实现RocketMQ的生产与消费消息的详细内容,更多关于Java实现RocketMQ生产与消费的资料请关注脚本之家其它相关文章!

相关文章

  • SpringBoot 读取yml文件的多种方式汇总

    SpringBoot 读取yml文件的多种方式汇总

    这篇文章主要介绍了SpringBoot读取yml文件的几种方式,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-05-05
  • SpringBoot主键ID传到前端后精度丢失的问题解决

    SpringBoot主键ID传到前端后精度丢失的问题解决

    这篇文章主要通过示例为大家详细介绍一些SpringBoot如何解决雪花算法主键ID传到前端后精度丢失问题,文中的示例代码讲解详细,需要的可以参考一下
    2022-05-05
  • Mybatis-Plus开发提速器generator的使用

    Mybatis-Plus开发提速器generator的使用

    本文就介绍这款基于Mybatis-Plus的代码自助生成器,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-07-07
  • 基于Intellij Idea乱码的解决方法

    基于Intellij Idea乱码的解决方法

    下面小编就为大家分享一篇基于Intellij Idea乱码的解决方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-03-03
  • 如何查看JVM使用的默认的垃圾收集器

    如何查看JVM使用的默认的垃圾收集器

    这篇文章主要介绍了如何查看JVM使用的默认的垃圾收集器,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-10-10
  • springboot AutoConfigureAfter控制Bean的注入顺序方法详解

    springboot AutoConfigureAfter控制Bean的注入顺序方法详解

    这个文章主要介绍一下@AutoConfigureAfter在spring框架中的作用,在使用过程中,很多开发人员在使用它的时候都出现了问题,问题比较多的就是它们的注册顺序总不是我们预期的,下面介绍一下正常的使用方法,感兴趣的朋友一起看看吧
    2024-05-05
  • 浅析Java Web错误/异常处理页面

    浅析Java Web错误/异常处理页面

    这篇文章主要和大家一起对Java Web错误/异常处理页面进行分析研究,感兴趣的小伙伴们可以参考一下
    2016-03-03
  • java实现jdbc批量插入数据

    java实现jdbc批量插入数据

    这篇文章主要为大家详细介绍了java实现jdbc批量插入数据,三种JDBC批量插入编程方法进行比较,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2016-05-05
  • Java中ArrayList和SubList的坑面试题

    Java中ArrayList和SubList的坑面试题

    集合是Java开发日常开发中经常会使用到的,下面这篇文章主要给大家介绍了关于Java中ArrayList和SubList的坑面试题,需要的朋友可以参考下
    2022-05-05
  • SpringBoot拦截器实现登录拦截的方法示例

    SpringBoot拦截器实现登录拦截的方法示例

    这篇文章主要介绍了SpringBoot拦截器实现登录拦截的方法示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-09-09

最新评论