SpringBoot3集成RocketMq场景分析
RocketMQ因其架构简单、业务功能丰富、具备极强可扩展性等特点被广泛应用,比如金融业务、互联网、大数据、物联网等领域的业务场景;
标签:RocketMq5.Dashboard;
一、简介
RocketMQ因其架构简单、业务功能丰富、具备极强可扩展性等特点被广泛应用,比如金融业务、互联网、大数据、物联网等领域的业务场景;
二、环境部署
1、编译打包
1、下载5.0版本源码包 rocketmq-all-5.0.0-source-release.zip 2、解压后进入目录,编译打包 mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U
2、修改配置
在distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/runserver.sh
distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/runbroker.sh
3、服务启动
1、该目录下 distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/ 2、启动NameServer sh mqnamesrv 输出日志 The Name Server boot success. serializeType=JSON 3、启动Broker+Proxy sh mqbroker -n localhost:9876 --enable-proxy 输出日志 rocketmq-proxy startup successfully 4、关闭服务 sh mqshutdown namesrv Send shutdown request to mqnamesrv(18636) OK sh mqshutdown broker Send shutdown request to mqbroker with proxy enable OK(18647)
4、控制台安装
1、下载master源码包 rocketmq-dashboard-master 2、解压后进入目录,编译打包 mvn clean package -Dmaven.test.skip=true 3、启动服务 java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar 4、输出日志 INFO main - Tomcat started on port(s): 8080 (http) with context path '' 5、访问服务:localhost:8080
三、工程搭建
1、工程结构
2、依赖管理
在 rocketmq-starter
组件中,实际上依赖的是 rocketmq-client
组件的 5.0
版本,由于两个新版框架间的兼容问题,需要添加相关配置解决该问题;
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>${rocketmq-starter.version}</version> </dependency>
3、配置文件
配置RocketMq服务地址,消息生产者和消费者;
rocketmq: name-server: 127.0.0.1:9876 # 生产者 producer: group: boot_group_1 # 消息发送超时时间 send-message-timeout: 3000 # 消息最大长度4M max-message-size: 4096 # 消息发送失败重试次数 retry-times-when-send-failed: 3 # 异步消息发送失败重试次数 retry-times-when-send-async-failed: 2 # 消费者 consumer: group: boot_group_1 # 每次提取的最大消息数 pull-batch-size: 5
4、配置类
在配置类中主要定义两个Bean的加载,即 RocketMQTemplate
和 DefaultMQProducer
,主要是提供消息发送的能力,即生产消息;
@Configuration public class RocketMqConfig { @Value("${rocketmq.name-server}") private String nameServer; @Value("${rocketmq.producer.group}") private String producerGroup; @Value("${rocketmq.producer.send-message-timeout}") private Integer sendMsgTimeout; @Value("${rocketmq.producer.max-message-size}") private Integer maxMessageSize; @Value("${rocketmq.producer.retry-times-when-send-failed}") private Integer retryTimesWhenSendFailed ; @Value("${rocketmq.producer.retry-times-when-send-async-failed}") private Integer retryTimesWhenSendAsyncFailed ; @Bean public RocketMQTemplate rocketMqTemplate(){ RocketMQTemplate rocketMqTemplate = new RocketMQTemplate(); rocketMqTemplate.setProducer(defaultMqProducer()); return rocketMqTemplate; } @Bean public DefaultMQProducer defaultMqProducer() { DefaultMQProducer producer = new DefaultMQProducer(); producer.setNamesrvAddr(this.nameServer); producer.setProducerGroup(this.producerGroup); producer.setSendMsgTimeout(this.sendMsgTimeout); producer.setMaxMessageSize(this.maxMessageSize); producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed); producer.setRetryTimesWhenSendAsyncFailed(this.retryTimesWhenSendAsyncFailed); return producer; } }
四、基础用法
1、消息生产
编写一个生产者接口类,分别使用 RocketMQTemplate
和 DefaultMQProducer
实现消息发送的功能,然后可以通过 Dashboard
控制面板查看消息详情;
@RestController public class ProducerWeb { private static final Logger log = LoggerFactory.getLogger(ProducerWeb.class); @Autowired private RocketMQTemplate rocketMqTemplate; @GetMapping("/send/msg1") public String sendMsg1 (){ try { // 构建消息主体 JsonMapper jsonMapper = new JsonMapper(); String msgBody = jsonMapper.writeValueAsString(new MqMsg(1,"boot_mq_msg")); // 发送消息 rocketMqTemplate.convertAndSend("boot-mq-topic",msgBody); } catch (Exception e) { e.printStackTrace(); } return "OK" ; } @Autowired private DefaultMQProducer defaultMqProducer ; @GetMapping("/send/msg2") public String sendMsg2 (){ try { // 构建消息主体 JsonMapper jsonMapper = new JsonMapper(); String msgBody = jsonMapper.writeValueAsString(new MqMsg(2,"boot_mq_msg")); // 构建消息对象 Message message = new Message(); message.setTopic("boot-mq-topic"); message.setTags("boot-mq-tag"); message.setKeys("boot-mq-key"); message.setBody(msgBody.getBytes()); // 发送消息,打印日志 SendResult sendResult = defaultMqProducer.send(message); log.info("msgId:{},sendStatus:{}",sendResult.getMsgId(),sendResult.getSendStatus()); } catch (Exception e) { e.printStackTrace(); } return "OK" ; } }
2、消息消费
编写消息监听类,实现 RocketMQListener
接口,通过 RocketMQMessageListener
注解控制监听的具体信息;
@Service @RocketMQMessageListener(consumerGroup = "boot_group_1",topic = "boot-mq-topic") public class ConsumerListener implements RocketMQListener<String> { private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class); @Override public void onMessage(String message) { log.info("\n=====\n message:{} \n=====\n",message); } }
五、参考源码
文档仓库: https://gitee.com/cicadasmile/butte-java-note
源码仓库: https://gitee.com/cicadasmile/butte-spring-parent
Gitee主页: https://gitee.com/cicadasmile/butte-java-note
到此这篇关于SpringBoot3集成RocketMq的文章就介绍到这了,更多相关SpringBoot3集成RocketMq内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
解决SpringBoot整合MybatisPlus分模块管理遇到的bug
这篇文章主要介绍了解决SpringBoot整合MybatisPlus分模块管理遇到的bug,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2021-07-07使用springBoot中的info等级通过druid打印sql
这篇文章主要介绍了使用springBoot中的info等级通过druid打印sql,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2021-09-09带有@Transactional和@Async的循环依赖问题的解决
这篇文章主要介绍了带有@Transactional和@Async的循环依赖问题的解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧2020-04-04
最新评论