SpringBoot3集成RocketMq场景分析

 更新时间:2023年08月17日 09:35:56   作者:知了一笑  
RocketMQ因其架构简单、业务功能丰富、具备极强可扩展性等特点被广泛应用,比如金融业务、互联网、大数据、物联网等领域的业务场景,这篇文章主要介绍了SpringBoot3集成RocketMq,需要的朋友可以参考下

RocketMQ因其架构简单、业务功能丰富、具备极强可扩展性等特点被广泛应用,比如金融业务、互联网、大数据、物联网等领域的业务场景;

标签:RocketMq5.Dashboard;

一、简介

RocketMQ因其架构简单、业务功能丰富、具备极强可扩展性等特点被广泛应用,比如金融业务、互联网、大数据、物联网等领域的业务场景;

二、环境部署

1、编译打包

1、下载5.0版本源码包
rocketmq-all-5.0.0-source-release.zip
2、解压后进入目录,编译打包
mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U

2、修改配置

在distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/runserver.sh

distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/runbroker.sh

3、服务启动

1、该目录下
distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/
2、启动NameServer
sh mqnamesrv
输出日志
The Name Server boot success. serializeType=JSON
3、启动Broker+Proxy
sh mqbroker -n localhost:9876 --enable-proxy
输出日志
rocketmq-proxy startup successfully
4、关闭服务
sh mqshutdown namesrv
Send shutdown request to mqnamesrv(18636) OK
sh mqshutdown broker
Send shutdown request to mqbroker with proxy enable OK(18647)

4、控制台安装

1、下载master源码包
rocketmq-dashboard-master
2、解压后进入目录,编译打包
mvn clean package -Dmaven.test.skip=true
3、启动服务
java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar
4、输出日志
INFO main - Tomcat started on port(s): 8080 (http) with context path ''
5、访问服务:localhost:8080

三、工程搭建

1、工程结构

2、依赖管理

rocketmq-starter 组件中,实际上依赖的是 rocketmq-client 组件的 5.0 版本,由于两个新版框架间的兼容问题,需要添加相关配置解决该问题;

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>${rocketmq-starter.version}</version>
</dependency>

3、配置文件

配置RocketMq服务地址,消息生产者和消费者;

rocketmq:
  name-server: 127.0.0.1:9876
  # 生产者
  producer:
    group: boot_group_1
    # 消息发送超时时间
    send-message-timeout: 3000
    # 消息最大长度4M
    max-message-size: 4096
    # 消息发送失败重试次数
    retry-times-when-send-failed: 3
    # 异步消息发送失败重试次数
    retry-times-when-send-async-failed: 2
  # 消费者
  consumer:
    group: boot_group_1
    # 每次提取的最大消息数
    pull-batch-size: 5

4、配置类

在配置类中主要定义两个Bean的加载,即 RocketMQTemplate DefaultMQProducer ,主要是提供消息发送的能力,即生产消息;

@Configuration
public class RocketMqConfig {
    @Value("${rocketmq.name-server}")
    private String nameServer;
    @Value("${rocketmq.producer.group}")
    private String producerGroup;
    @Value("${rocketmq.producer.send-message-timeout}")
    private Integer sendMsgTimeout;
    @Value("${rocketmq.producer.max-message-size}")
    private Integer maxMessageSize;
    @Value("${rocketmq.producer.retry-times-when-send-failed}")
    private Integer retryTimesWhenSendFailed ;
    @Value("${rocketmq.producer.retry-times-when-send-async-failed}")
    private Integer retryTimesWhenSendAsyncFailed ;
    @Bean
    public RocketMQTemplate rocketMqTemplate(){
        RocketMQTemplate rocketMqTemplate = new RocketMQTemplate();
        rocketMqTemplate.setProducer(defaultMqProducer());
        return rocketMqTemplate;
    }
    @Bean
    public DefaultMQProducer defaultMqProducer() {
        DefaultMQProducer producer = new DefaultMQProducer();
        producer.setNamesrvAddr(this.nameServer);
        producer.setProducerGroup(this.producerGroup);
        producer.setSendMsgTimeout(this.sendMsgTimeout);
        producer.setMaxMessageSize(this.maxMessageSize);
        producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
        producer.setRetryTimesWhenSendAsyncFailed(this.retryTimesWhenSendAsyncFailed);
        return producer;
    }
}

四、基础用法

1、消息生产

编写一个生产者接口类,分别使用 RocketMQTemplate DefaultMQProducer 实现消息发送的功能,然后可以通过 Dashboard 控制面板查看消息详情;

@RestController
public class ProducerWeb {
    private static final Logger log = LoggerFactory.getLogger(ProducerWeb.class);
    @Autowired
    private RocketMQTemplate rocketMqTemplate;
    @GetMapping("/send/msg1")
    public String sendMsg1 (){
        try {
            // 构建消息主体
            JsonMapper jsonMapper = new JsonMapper();
            String msgBody = jsonMapper.writeValueAsString(new MqMsg(1,"boot_mq_msg"));
            // 发送消息
            rocketMqTemplate.convertAndSend("boot-mq-topic",msgBody);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "OK" ;
    }
    @Autowired
    private DefaultMQProducer defaultMqProducer ;
    @GetMapping("/send/msg2")
    public String sendMsg2 (){
        try {
            // 构建消息主体
            JsonMapper jsonMapper = new JsonMapper();
            String msgBody = jsonMapper.writeValueAsString(new MqMsg(2,"boot_mq_msg"));
            // 构建消息对象
            Message message = new Message();
            message.setTopic("boot-mq-topic");
            message.setTags("boot-mq-tag");
            message.setKeys("boot-mq-key");
            message.setBody(msgBody.getBytes());
            // 发送消息,打印日志
            SendResult sendResult = defaultMqProducer.send(message);
            log.info("msgId:{},sendStatus:{}",sendResult.getMsgId(),sendResult.getSendStatus());
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "OK" ;
    }
}

2、消息消费

编写消息监听类,实现 RocketMQListener 接口,通过 RocketMQMessageListener 注解控制监听的具体信息;

@Service
@RocketMQMessageListener(consumerGroup = "boot_group_1",topic = "boot-mq-topic")
public class ConsumerListener implements RocketMQListener<String> {
    private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);
    @Override
    public void onMessage(String message) {
        log.info("\n=====\n message:{} \n=====\n",message);
    }
}

五、参考源码

文档仓库: https://gitee.com/cicadasmile/butte-java-note

源码仓库: https://gitee.com/cicadasmile/butte-spring-parent

Gitee主页: https://gitee.com/cicadasmile/butte-java-note

到此这篇关于SpringBoot3集成RocketMq的文章就介绍到这了,更多相关SpringBoot3集成RocketMq内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 解决SpringBoot整合MybatisPlus分模块管理遇到的bug

    解决SpringBoot整合MybatisPlus分模块管理遇到的bug

    这篇文章主要介绍了解决SpringBoot整合MybatisPlus分模块管理遇到的bug,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-07-07
  • JavaCV实现人脸检测功能

    JavaCV实现人脸检测功能

    这篇文章主要为大家详细介绍了JavaCV实现人脸检测功能,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-02-02
  • Java在枚举类型中增加自定义方法详解

    Java在枚举类型中增加自定义方法详解

    这篇文章主要介绍了Java在枚举类型中增加自定义方法详解,对于枚举类型来说,除了无法继承它以外,基本可以将它看作一个普通的类,这意味着你可以在里面增加自定义的方法,甚至可以增加一个 main() 方法,需要的朋友可以参考下
    2023-11-11
  • 详解Java中native方法的使用

    详解Java中native方法的使用

    native是与C++联合开发的时候用的!使用native关键字说明这个方法是原生函数,也就是这个方法是用C/C++语言实现的,并且被编译成了DLL,由java去调用。本文给大家介绍java 中native方法使用,感兴趣的朋友一起看看吧
    2020-09-09
  • Java判断ip是否为IPV4或IPV6地址的多种方式

    Java判断ip是否为IPV4或IPV6地址的多种方式

    本文主要介绍了Java判断ip是否为IPV4或IPV6地址的多种方式,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-03-03
  • 使用springBoot中的info等级通过druid打印sql

    使用springBoot中的info等级通过druid打印sql

    这篇文章主要介绍了使用springBoot中的info等级通过druid打印sql,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09
  • Java通过正则表达式捕获组中的文本

    Java通过正则表达式捕获组中的文本

    这篇文章主要给大家介绍了关于利用Java如何通过正则表达式捕获组中文本的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用Java具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧下
    2019-09-09
  • 解析MyBatisPlus解决逻辑删除与唯一索引的兼容问题

    解析MyBatisPlus解决逻辑删除与唯一索引的兼容问题

    这篇文章主要介绍了MyBatisPlus解决逻辑删除与唯一索引的兼容问题,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-04-04
  • 带有@Transactional和@Async的循环依赖问题的解决

    带有@Transactional和@Async的循环依赖问题的解决

    这篇文章主要介绍了带有@Transactional和@Async的循环依赖问题的解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-04-04
  • Java PriorityQueue数据结构接口原理及用法

    Java PriorityQueue数据结构接口原理及用法

    这篇文章主要介绍了Java PriorityQueue数据结构接口原理及用法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-10-10

最新评论