RocketMQ普通消息实战演练详解

 更新时间:2022年08月22日 14:59:31   作者:奔跑的毛球  
这篇文章主要为大家介绍了RocketMQ普通消息实战演练详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

引言

之前研究了RocketMQ的源码,在这里将各种消息发送与消费的demo进行举例,方便以后使用的时候CV。

相关的配置,安装和启动在这篇文章有相关讲解  https://www.jb51.net/article/260237.htm

普通消息同步发送

同步消息是指发送出消息后,同步等待,直到接收到Broker发送成功的响应才会继续发送下一个消息。这个方式可以确保消息发送到Broker成功,一些重要的消息可以使用此方式,比如重要的通知。

public static void main(String[] args) throws Exception {
    //实例化消息生产者对象
    DefaultMQProducer producer = new DefaultMQProducer("group_luke");
    //设置NameSever地址
    producer.setNamesrvAddr("127.0.0.1:9876");
    //启动Producer实例
    producer.start();
    for (int i = 0; i < 10; i++) {
        Message msg = new Message("topic_luke", "tag", ("这是第"+i+"条消息。").getBytes(StandardCharsets.UTF_8));
        //同步发送方式
        SendResult send = producer.send(msg);
        //确认返回
        System.out.println(send);
    }
    //关闭producer
    producer.shutdown();
}

普通消息异步发送

异步消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理。

public static void main(String[] args) throws Exception {
    //实例化消息生产者对象
    DefaultMQProducer producer = new DefaultMQProducer("group_luke");
    //设置NameSever地址
    producer.setNamesrvAddr("127.0.0.1:9876");
    //启动Producer实例
    producer.start();
    for (int i = 0; i < 10; i++) {
        Message msg = new Message("topic_luke", "tag", ("这是第"+i+"条消息。").getBytes(StandardCharsets.UTF_8));
        //SendCallback会接收异步返回结果的回调
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
            }
            @Override
            public void onException(Throwable throwable) {
                throwable.printStackTrace();
            }
        });
    }
    //若是过早关闭producer,会抛出The producer service state not OK, SHUTDOWN_ALREADY的错
    Thread.sleep(10000);
    //关闭producer
    producer.shutdown();
}

普通消息单向发送

单项发送不关心发送的结果,只发送请求不等待应答。发送消息耗时极短。

public static void main(String[] args) throws Exception {
    //实例化消息生产者对象
    DefaultMQProducer producer = new DefaultMQProducer("group_luke");
    //设置NameSever地址
    producer.setNamesrvAddr("127.0.0.1:9876");
    //启动Producer实例
    producer.start();
    for (int i = 0; i < 10; i++) {
        Message msg = new Message("topic_luke", "tag", ("这是第"+i+"条消息。").getBytes(StandardCharsets.UTF_8));
        //同步发送方式
        producer.sendOneway(msg);
    }
    //关闭producer
    producer.shutdown();
}

集群消费模式

消费者采用负载均衡的方式消费消息,同一个Group下的多个Consumer共同消费Queue里的Message,每个Consumer处理的消息不同。

一个Consumer Group中的各个Consumer实例分共同消费消息,即一条消息只会投递到一个Group下面的一个实例,并且只消费一遍。

例如某个Topic有3个队列,其中一个Consumer Group 有 3 个实例,那么每个实例只消费其中的1个队列。集群消费模式是消费者默认的消费方式。

public static void main(String[] args) throws Exception {
    //实例化消息消费者
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke");
    //指定nameserver地址
    consumer.setNamesrvAddr("127.0.0.1:9876");
    //订阅topic,"*"表示所有tag
    consumer.subscribe("topic_luke","*");
    consumer.setMessageModel(MessageModel.CLUSTERING);
    // 注册回调实现类来处理从broker拉取回来的消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @SneakyThrows
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                System.out.println(new String(msg.getBody()));
            }
            // 标记该消息已经被成功消费
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // 启动消费者实例
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

广播消费模式

广播消费模式中把消息对一个Group下的各个Consumer实例都投递一遍。也就是说消息也会被 Group 中的每个Consumer都消费一次。

实际上,是一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。

public static void main(String[] args) throws Exception {
    //实例化消息消费者
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke");
    //指定nameserver地址
    consumer.setNamesrvAddr("127.0.0.1:9876");
    //订阅topic,"*"表示所有tag
    consumer.subscribe("topic_luke","*");
    consumer.setMessageModel(MessageModel.BROADCASTING);
    // 注册回调实现类来处理从broker拉取回来的消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @SneakyThrows
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                System.out.println(new String(msg.getBody()));
            }
            // 标记该消息已经被成功消费
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // 启动消费者实例
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

以上就是RocketMQ普通消息实战演练详解的详细内容,更多关于RocketMQ普通消息的资料请关注脚本之家其它相关文章!

相关文章

  • springboot如何重定向外部网页

    springboot如何重定向外部网页

    这篇文章主要介绍了springboot如何重定向外部网页,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-11-11
  • Java 实战项目之CRM客户管理系统的实现流程

    Java 实战项目之CRM客户管理系统的实现流程

    读万卷书不如行万里路,只学书上的理论是远远不够的,只有在实战中才能获得能力的提升,本篇文章手把手带你用java+SSM+jsp+mysql+maven实现一个CRM客户管理系统,大家可以在过程中查缺补漏,提升水平
    2021-11-11
  • Java类初始化顺序详解

    Java类初始化顺序详解

    这篇文章主要介绍了Java类初始化顺序详解,java语言在使用过程中最先开始就是初始化,在工作中如果遇到什么问题需 要定位往往到最后也可能是初始化的问题,因此掌握初始化的顺序很重要,需要的朋友可以参考下
    2023-08-08
  • 详解Java利用实现对称加密(DES、3DES、AES)

    详解Java利用实现对称加密(DES、3DES、AES)

    本篇文章主要介绍了Java利用实现对称加密(DES、3DES、AES),具有一定的参考价值,有兴趣的可以了解一下。
    2017-01-01
  • 解决zuulGateway网关添加路由异常熔断问题

    解决zuulGateway网关添加路由异常熔断问题

    这篇文章主要介绍了解决zuulGateway网关添加路由异常熔断问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-10-10
  • 2020.2 IntelliJ IDEA激活与IDEA2020.2破解详细教程

    2020.2 IntelliJ IDEA激活与IDEA2020.2破解详细教程

    这篇文章主要介绍了2020.2 IntelliJ IDEA激活与IDEA2020.2破解,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-08-08
  • Java的非对称加密(RSA、数字签名、数字证书)详解

    Java的非对称加密(RSA、数字签名、数字证书)详解

    这篇文章主要介绍了Java的非对称加密(RSA、数字签名、数字证书)详解,非对称加密:加密、解密使用不同的两把密钥,这两把密钥成对,一般通信开始时通过非对称加密将对称加密的密钥发送给另一方,然后双方通过对称加密来进行沟通,需要的朋友可以参考下
    2024-01-01
  • 使用Mybatis Generator结合Ant脚本快速自动生成Model、Mapper等文件的方法

    使用Mybatis Generator结合Ant脚本快速自动生成Model、Mapper等文件的方法

    这篇文章主要介绍了使用Mybatis Generator结合Ant脚本快速自动生成Model、Mapper等文件的方法的相关资料,需要的朋友可以参考下
    2016-06-06
  • Lombok中@EqualsAndHashCode注解的使用及说明

    Lombok中@EqualsAndHashCode注解的使用及说明

    这篇文章主要介绍了Lombok中@EqualsAndHashCode注解的使用及说明,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-03-03
  • TC 集群Seata1.6高可用架构源码解析

    TC 集群Seata1.6高可用架构源码解析

    这篇文章主要为大家介绍了TC 集群Seata1.6高可用架构源码解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-12-12

最新评论