SpringBoot中连接多个RabbitMQ的方法详解
1. 前 言
在 SpringBoot 中整合单个 RabbitMQ 使用,是很简单的,只需要引入依赖,然后在配置里面配置好 MQ 的连接地址、账号、密码等信息,然后使用即可。
但如果 MQ 的连接地址是多个,那这种连接方式就不奏效了。
前段时间,我开发的一个项目就遇到了这样的问题。那个项目,好几个关联方,每个关联方用的 MQ 的地址都不相同,也就意味着我这边要连接几个 RabbbitMQ 地址。SpringBoot 连接多个 RabbitMQ,怎么搞?
使用默认的连接方式是行不通的,我已经试过,而要实现 SpringBoot 连接多个 RabbitMQ,只能自定义重写一些东西,分别配置才可以,下面一起来走一下试试。
2. 重 写
首先要明确的是,下面的两个类是需要重写的:
- RabbitTemplate:往队列里面丢消息时,需要用到
- RabbitAdmin:声明队列、声明交换机、绑定队列和交换机用到
这里,我定义两个关联方,一个是 one,一个是 two,分别重写与它们的连接工厂。
2.1 重写与关联方one的连接工厂
package com.yuhuofei.mq.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; /** * @author yuhuofei * @version 1.0 * @description 重写与关联方one的连接工厂 * @date 2022/10/3 16:57 */ @Slf4j @Configuration public class OneMQConfig { @Value("${one.spring.rabbitmq.host}") private String host; @Value("${one.spring.rabbitmq.port}") private int port; @Value("${one.spring.rabbitmq.username}") private String username; @Value("${one.spring.rabbitmq.password}") private String password; @Value("${one.spring.rabbitmq.virtual-host}") private String virtualHost; /** * 定义与one的连接工厂 */ @Bean(name = "oneConnectionFactory") @Primary public ConnectionFactory oneConnectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); return connectionFactory; } @Bean(name = "oneRabbitTemplate") @Primary public RabbitTemplate oneRabbitTemplate(@Qualifier("oneConnectionFactory") ConnectionFactory connectionFactory) { RabbitTemplate oneRabbitTemplate = new RabbitTemplate(connectionFactory); oneRabbitTemplate.setMandatory(true); oneRabbitTemplate.setConnectionFactory(connectionFactory); oneRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * 确认消息送到交换机(Exchange)回调 * @param correlationData * @param ack * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("确认消息送到交换机(Exchange)结果:"); log.info("相关数据:{}", correlationData); boolean ret = false; if (ack) { log.info("消息发送到交换机成功, 消息 = {}", correlationData.getId()); //下面可自定义业务逻辑处理,如入库保存信息等 } else { log.error("消息发送到交换机失败! 消息: {}}; 错误原因:cause: {}", correlationData.getId(), cause); //下面可自定义业务逻辑处理,如入库保存信息等 } } }); oneRabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * 只要消息没有投递给指定的队列 就触发这个失败回调 * @param message 投递失败的消息详细信息 * @param replyCode 回复的状态码 * @param replyText 回复的文本内容 * @param exchange 当时这个消息发给那个交换机 * @param routingKey 当时这个消息用那个路由键 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { //获取消息id String messageId = message.getMessageProperties().getMessageId(); // 内容 String result = null; try { result = new String(message.getBody(), "UTF-8"); } catch (Exception e) { log.error("消息发送失败{}", e); } log.error("消息发送失败, 消息ID = {}; 消息内容 = {}", messageId, result); //下面可自定义业务逻辑处理,如入库保存信息等 } }); return oneRabbitTemplate; } @Bean(name = "oneFactory") @Primary public SimpleRabbitListenerContainerFactory oneFactory(@Qualifier("oneConnectionFactory") ConnectionFactory connectionFactory, SimpleRabbitListenerContainerFactoryConfigurer configurer) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); configurer.configure(factory, connectionFactory); return factory; } @Bean(name = "oneRabbitAdmin") @Primary public RabbitAdmin oneRabbitAdmin(@Qualifier("oneConnectionFactory") ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } }
2.2 重写与关联方two的连接工厂
package com.yuhuofei.mq.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author yuhuofei * @version 1.0 * @description 重写与关联方two的连接工厂 * @date 2022/10/3 17:52 */ @Slf4j @Configuration public class TwoMQConfig { @Value("${two.spring.rabbitmq.host}") private String host; @Value("${two.spring.rabbitmq.port}") private int port; @Value("${two.spring.rabbitmq.username}") private String username; @Value("${two.spring.rabbitmq.password}") private String password; @Value("${two.spring.rabbitmq.virtualHost}") private String virtualHost; /** * 定义与two的连接工厂 */ @Bean(name = "twoConnectionFactory") public ConnectionFactory twoConnectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); return connectionFactory; } @Bean(name = "twoRabbitTemplate") public RabbitTemplate twoRabbitTemplate(@Qualifier("twoConnectionFactory") ConnectionFactory connectionFactory) { RabbitTemplate twoRabbitTemplate = new RabbitTemplate(connectionFactory); twoRabbitTemplate.setMandatory(true); twoRabbitTemplate.setConnectionFactory(connectionFactory); twoRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * 确认消息送到交换机(Exchange)回调 * @param correlationData * @param ack * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("确认消息送到交换机(Exchange)结果:"); log.info("相关数据:{}", correlationData); boolean ret = false; if (ack) { log.info("消息发送到交换机成功, 消息 = {}", correlationData.getId()); //下面可自定义业务逻辑处理,如入库保存信息等 } else { log.error("消息发送到交换机失败! 消息: {}}; 错误原因:cause: {}", correlationData.getId(), cause); //下面可自定义业务逻辑处理,如入库保存信息等 } } }); twoRabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * 只要消息没有投递给指定的队列 就触发这个失败回调 * @param message 投递失败的消息详细信息 * @param replyCode 回复的状态码 * @param replyText 回复的文本内容 * @param exchange 当时这个消息发给那个交换机 * @param routingKey 当时这个消息用那个路由键 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { //获取消息id String messageId = message.getMessageProperties().getMessageId(); // 内容 String result = null; try { result = new String(message.getBody(), "UTF-8"); } catch (Exception e) { log.error("消息发送失败{}", e); } log.error("消息发送失败, 消息ID = {}; 消息内容 = {}", messageId, result); //下面可自定义业务逻辑处理,如入库保存信息等 } }); return twoRabbitTemplate; } @Bean(name = "twoFactory") public SimpleRabbitListenerContainerFactory twoFactory(@Qualifier("twoConnectionFactory") ConnectionFactory connectionFactory, SimpleRabbitListenerContainerFactoryConfigurer configurer) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); configurer.configure(factory, connectionFactory); return factory; } @Bean(name = "twoRabbitAdmin") public RabbitAdmin twoRabbitAdmin(@Qualifier("twoConnectionFactory") ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } }
2.3 创建队列及交换机并绑定
package com.yuhuofei.mq.config; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; import javax.annotation.Resource; /** * @author yuhuofei * @version 1.0 * @description 创建队列、交换机并绑定 * @date 2022/10/3 18:15 */ public class QueueConfig { @Resource(name = "oneRabbitAdmin") private RabbitAdmin oneRabbitAdmin; @Resource(name = "twoRabbitAdmin") private RabbitAdmin twoRabbitAdmin; @Value("${one.out.queue}") private String oneOutQueue; @Value("${one.out.queue}") private String oneRoutingKey; @Value("${two.output.queue}") private String twoOutQueue; @Value("${two.output.queue}") private String twoRoutingKey; @Value("${one.topic.exchange.name}") private String oneTopicExchange; @Value("${two.topic.exchange.name}") private String twoTopicExchange; @PostConstruct public void oneRabbitInit() { //声明交换机 oneRabbitAdmin.declareExchange(new TopicExchange(oneTopicExchange, true, false)); //声明队列 oneRabbitAdmin.declareQueue(new Queue(oneOutQueue, true, false, false)); //绑定队列及交换机 oneRabbitAdmin.declareBinding(BindingBuilder.bind(new Queue(oneOutQueue, true, false, false)) .to(new TopicExchange(oneTopicExchange, true, false)) .with(oneRoutingKey)); } @PostConstruct public void twoRabbitInit() { //声明交换机 twoRabbitAdmin.declareExchange(new TopicExchange(twoTopicExchange, true, false)); //声明队列 twoRabbitAdmin.declareQueue(new Queue(twoOutQueue, true)); //绑定队列及交换机 twoRabbitAdmin.declareBinding(BindingBuilder.bind(new Queue(twoOutQueue, true, false, false)) .to(new TopicExchange(twoTopicExchange, true, false)) .with(twoRoutingKey)); } }
2.4 配置信息
这里的配置信息,需要与各自的关联方约定好再配置
# 与关联方one的MQ配置 one.spring.rabbitmq.host=one.mq.com one.spring.rabbitmq.port=5672 one.spring.rabbitmq.username=xxxxx one.spring.rabbitmq.password=xxxxx one.spring.rabbitmq.virtual-host=/xxxxx one.out.queue=xxxaa.ssssd.cffs.xxxx one.topic.exchange.name=oneTopExchange # 与关联方two的MQ配置 two.spring.rabbitmq.host=two.mq.com two.spring.rabbitmq.port=5672 two.spring.rabbitmq.username=aaaaaaa two.spring.rabbitmq.password=aaaaaaa two.spring.rabbitmq.virtualHost=/aaaaaaa two.out.queue=ddddd.sssss.hhhhh.eeee two.topic.exchange.name=twoTopExchange
2.5 注意点
在连接多个 MQ 的情况下,需要在某个连接加上 @Primary 注解(见 2.1 中的代码),表示主连接,默认使用这个连接,如果不加,服务会起不来
3. 使 用
3.1 作为消费者
由于在前面的 2.3 中,声明了队列及交换机,并进行了绑定,那么作为消费者,监听相应的队列,获取关联方发送的消息进行处理即可。这里用监听关联方 one 的出队列做展示,two 的类似。
需要注意的地方是,在监听队列时,需要指定 ContainerFactory。
package com.yuhuofei.mq.service; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import java.nio.charset.StandardCharsets; /** * @author yuhuofei * @version 1.0 * @description 监听关联方one的消息 * @date 2022/10/3 18:38 */ @Slf4j @Service public class OneReceive { @RabbitListener(queues = "${one.out.queue}", containerFactory = "oneFactory") public void listenOne(Message message, Channel channel) { //获取MQ返回的数据 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); String data = new String(message.getBody(), StandardCharsets.UTF_8); log.info("MQ返回的数据:{}", data); //下面进行业务逻辑处理 } }
3.2 作为生产者
使用之前重写的 RabbitTemplate ,向各个关联方指定的队列发送消息。
package com.yuhuofei.mq.service; import com.google.gson.JsonObject; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.stereotype.Service; import javax.annotation.Resource; /** * @author yuhuofei * @version 1.0 * @description 向关联方的队列发送消息 * @date 2022/10/3 18:47 */ @Slf4j @Service public class SendMessage { @Resource(name = "oneRabbitTemplate") private RabbitTemplate oneRabbitTemplate; @Resource(name = "twoRabbitTemplate") private RabbitTemplate twoRabbitTemplate; public void sendToOneMessage(String messageId, OneMessageConverter message) { String exchange = message.getExchange(); String routingKey = message.getRoutingKey(); JsonObject data = message.getData(); MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("application/json"); Message info = new Message(data.toString().getBytes(), messageProperties); info.getMessageProperties().setMessageId(messageId); oneRabbitTemplate.convertAndSend(exchange, routingKey, info, new CorrelationData(messageId)); } public void sendToTwoMessage(String messageId, TwoMessageConverter message) { String exchange = message.getExchange(); String routingKey = message.getRoutingKey(); JsonObject data = message.getData(); MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("application/json"); Message info = new Message(data.toString().getBytes(), messageProperties); info.getMessageProperties().setMessageId(messageId); twoRabbitTemplate.convertAndSend(exchange, routingKey, info, new CorrelationData(messageId)); } }
到此这篇关于SpringBoot中连接多个RabbitMQ的方法详解的文章就介绍到这了,更多相关SpringBoot多个RabbitMQ内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
springboot读取resource配置文件生成容器对象的示例代码
这篇文章主要介绍了springboot读取resource配置文件生成容器对象的示例代码,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下2023-07-07SpringBoot之配置logging日志及在控制台中输出过程
这篇文章主要介绍了SpringBoot之配置logging日志及在控制台中输出过程,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2023-06-06Springboot详解整合SpringSecurity实现全过程
Spring Security基于Spring开发,项目中如果使用Springboot作为基础,配合Spring Security做权限更加方便,而Shiro需要和Spring进行整合开发。因此作为spring全家桶中的Spring Security在java领域很常用2022-07-07如何使用spring ResponseEntity处理http响应
这篇文章主要介绍了如何使用spring ResponseEntity处理http响应的操作,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2021-07-07
最新评论