SpringBoot整合RabbitMQ消息队列的完整步骤
SpringBoot整合RabbitMQ
主要实现RabbitMQ以下三种消息队列:
- 简单消息队列(演示direct模式)
- 基于RabbitMQ特性的延时消息队列
- 基于RabbitMQ相关插件的延时消息队列
公共资源
1. 引入pom依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2. 配置yml文件
基于上篇《RabbitMQ安装与配置》实现的情况下,进行基础配置。
spring: rabbitmq: host: 121.5.168.31 port: 5672 # 默认可省略 virtual-host: /*** # 虚拟主机 username: *** # 用户名 password: *** # 用户密码 # 开启投递成功回调 P -> Exchange publisher-confirm-type: correlated # 开启投递消息到队列失败回调 Exchange -> Queue publisher-returns: true # 开启手动ACK确认模式 Queue -> C listener: simple: acknowledge-mode: manual # 代表手动ACK确认 # 一些基本参数的设置 concurrency: 3 prefetch: 15 retry: enabled: true max-attempts: 5 max-concurrency: 10
3. 公共Constants类
/** * @author Mr.Horse * @version 1.0 * @description: {description} * @date 2021/4/23 15:28 */ public class Constants { /** * 第一个配置Queue,Exchange,Key(非注解方式) */ public final static String HORSE_SIMPLE_QUEUE = "HORSE_SIMPLE_QUEUE"; public final static String HORSE_SIMPLE_EXCHANGE = "HORSE_SIMPLE_EXCHANGE"; public final static String HORSE_SIMPLE_KEY = "HORSE_SIMPLE_KEY"; /** * 第二个配置Queue,Exchange,Key(注解方式) */ public final static String HORSE_ANNOTATION_QUEUE = "HORSE_ANNOTATION_QUEUE"; public final static String HORSE_ANNOTATION_EXCHANGE = "HORSE_ANNOTATION_EXCHANGE"; public final static String HORSE_ANNOTATION_KEY = "HORSE_ANNOTATION_KEY"; //************************************延时消息队列配置信息************************** /** * 延时队列信息配置 */ public final static String HORSE_DELAY_EXCHANGE = "HORSE_DELAY_EXCHANGE"; public final static String HORSE_DELAY_QUEUE = "HORSE_DELAY_QUEUE"; public final static String HORSE_DELAY_KEY = "HORSE_DELAY_KEY"; /** * 死信队列 */ public final static String HORSE_DEAD_EXCHANGE = "HORSE_DEAD_EXCHANGE"; public final static String HORSE_DEAD_QUEUE = "HORSE_DEAD_QUEUE"; public final static String HORSE_DEAD_KEY = "HORSE_DEAD_KEY"; //**************************************延时消息队列配置信息(插件版)****************************** /** * 新延时队列信息配置 */ public final static String HORSE_PLUGIN_EXCHANGE = "HORSE_PLUGIN_EXCHANGE"; public final static String HORSE_PLUGIN_QUEUE = "HORSE_PLUGIN_QUEUE"; public final static String HORSE_PLUGIN_KEY = "HORSE_PLUGIN_KEY"; }
简单消息队列(direct模式)
4. RabbitTemplate模板配置
主要定义消息投递Exchange成功回调函数和消息从Exchange投递到消息队列失败的回调函数。
package com.topsun.rabbit; import com.sun.org.apache.xpath.internal.operations.Bool; import com.topsun.constants.Constants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author Mr.Horse * @version 1.0 * @description: {description} * @date 2021/4/23 14:17 */ @Configuration public class RabbitConfig { private static Logger logger = LoggerFactory.getLogger(RabbitConfig.class); @Autowired private CachingConnectionFactory connectionFactory; /** * @return */ @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); // 触发setReturnCallback回调必须设置mandatory=true,否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调 rabbitTemplate.setMandatory(Boolean.TRUE); // 设置序列化机制 rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); // 消息由投递到Exchange中时触发的回调 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> logger.info("消息发送到Exchange情况反馈:唯一标识:correlationData={},消息确认:ack={},原因:cause={}", correlationData, ack, cause) ); // 消息由Exchange发送到Queue时失败触发的回调 rabbitTemplate.setReturnsCallback((returnedMessage) -> { // 如果是插件形式实现的延时队列,则直接返回 // 原因: 因为发送方确实没有投递到队列上,只是在交换器上暂存,等过期时间到了 才会发往队列,从而实现延时队列的操作 if (Constants.HORSE_PLUGIN_EXCHANGE.equals(returnedMessage.getExchange())) { return; } logger.warn("消息由Exchange发送到Queue时失败:message={},replyCode={},replyText={},exchange={},rountingKey={}", returnedMessage.getMessage(), returnedMessage.getReplyText(), returnedMessage.getReplyText(), returnedMessage.getExchange(), returnedMessage.getRoutingKey()); }); return rabbitTemplate; } //*******************************************直接配置绑定关系***************************************** /** * 声明队列 * * @return */ @Bean public Queue horseQueue() { return new Queue(Constants.HORSE_SIMPLE_QUEUE, Boolean.TRUE); } /** * 声明指定模式交换机 * * @return */ @Bean public DirectExchange horseExchange() { return new DirectExchange(Constants.HORSE_SIMPLE_EXCHANGE, Boolean.TRUE, Boolean.FALSE); } /** * 绑定交换机,队列,路由Key * * @return */ @Bean public Binding horseBinding() { return BindingBuilder.bind(horseQueue()).to(horseExchange()).with(Constants.HORSE_SIMPLE_KEY); } }
5. 定义消息监听器
基于 @RabbitListenerzi注解,实现自定义消息监听器。主要有两种实现方式:
- 如果在配置类中声明了Queue、Excehange以及他们直接的绑定,这里直接指定队列进行消息监听
- 如果前面什么也没做,这里可以直接用注解的方式进行绑定实现消息监听
package com.topsun.rabbit; import com.rabbitmq.client.Channel; import com.topsun.constants.Constants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * @author Mr.Horse * @version 1.0 * @description: {description} * @date 2021/4/23 14:58 */ @Component public class MsgListener { private static Logger logger = LoggerFactory.getLogger(MsgListener.class); /** * 配置类中已经完成绑定,这里直接根据队列值接收 * * @param message * @param channel * @param msg */ @RabbitListenerzi(queues = Constants.HORSE_SIMPLE_QUEUE) public void customListener(Message message, Channel channel, String msg) { // 获取每条消息唯一标识(用于手动ACK确认) long tag = message.getMessageProperties().getDeliveryTag(); try { logger.info(" ==> customListener接收" + msg); // 手动ACK确认 channel.basicAck(tag, false); } catch (IOException e) { logger.error(" ==> 消息接收失败: {}", tag); } } /** * 根据注解的形式进行绑定接收 * * @param message * @param channel * @param msg */ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = Constants.HORSE_ANNOTATION_QUEUE, durable = "true"), exchange = @Exchange(value = Constants.HORSE_ANNOTATION_EXCHANGE, ignoreDeclarationExceptions = "true"), key = {Constants.HORSE_ANNOTATION_KEY} )) public void annotationListener(Message message, Channel channel, String msg) { // 获取每条消息唯一标识(用于手动ACK确认) long tag = message.getMessageProperties().getDeliveryTag(); try { logger.info(" ==> annotationListener接收" + msg); // 手动ACK确认 channel.basicAck(tag, false); } catch (IOException e) { logger.error(" ==> 消息接收失败: {}", tag); } } }
6. 测试接口
这里发送100条消息:
- 奇数条到非注解方式的消息监听器
- 偶数条到注解式消息监听器
@GetMapping("/rabbit") public void sendMsg() { for (int i = 1; i <= 100; i++) { String msg = "第" + i + "条消息"; logger.info("==> 发送" + msg); if (i % 2 == 1) { rabbitTemplate.convertAndSend(Constants.HORSE_SIMPLE_EXCHANGE, Constants.HORSE_SIMPLE_KEY, msg, new CorrelationData(String.valueOf(i))); } else { rabbitTemplate.convertAndSend(Constants.HORSE_ANNOTATION_EXCHANGE, Constants.HORSE_ANNOTATION_KEY, msg, new CorrelationData(String.valueOf(i))); } } }
结果:自行测试过,非常成功:smile::smile::smile:
延时消息队列
原理:生产者生产一条延时消息,根据需要延时时间的不同,利用不同的routingkey将消息路由到不同的延时队列,每个队列都设置了不同的TTL属性,并绑定在同一个死信交换机中,消息过期后,根据routingkey的不同,又会被路由到不同的死信队列中,消费者只需要监听对应的死信队列进行处理即可。
7. 配置绑定相关信息
/** * @author Mr.Horse * @version 1.0 * @description: {description} * @date 2021/4/24 14:22 */ @Configuration public class DelayRabbitConfig { private static Logger logger = LoggerFactory.getLogger(DelayRabbitConfig.class); /** * 声明延时队列交换机 * * @return */ @Bean public DirectExchange delayExchange() { return new DirectExchange(Constants.HORSE_DELAY_EXCHANGE, Boolean.TRUE, Boolean.FALSE); } /** * 声明死信队列交换机 * * @return */ @Bean public DirectExchange deadExchange() { return new DirectExchange(Constants.HORSE_DEAD_EXCHANGE, Boolean.TRUE, Boolean.FALSE); } /** * 声明延时队列 延时10s(单位:ms),并将延时队列绑定到对应的死信交换机和路由Key * * @return */ @Bean public Queue delayQueue() { Map<String, Object> args = new HashMap<>(3); // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", Constants.HORSE_DEAD_EXCHANGE); // x-dead-letter-routing-key 这里声明当前队列的死信路由key args.put("x-dead-letter-routing-key", Constants.HORSE_DEAD_KEY); // x-message-ttl 声明队列的TTL(过期时间) // 可以在这里直接写死,也可以进行动态的设置(推荐动态设置) // args.put("x-message-ttl", 10000); return QueueBuilder.durable(Constants.HORSE_DELAY_QUEUE).withArguments(args).build(); } /** * 声明死信队列 * * @return */ @Bean public Queue deadQueue() { return new Queue(Constants.HORSE_DEAD_QUEUE, Boolean.TRUE); } /** * 延时队列绑定管理 * * @return */ @Bean public Binding delayBinding() { return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(Constants.HORSE_DELAY_KEY); } /** * 死信队列绑定管理 * * @return */ @Bean public Binding deadBinding() { return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(Constants.HORSE_DEAD_KEY); } //**********************************延时消息队列配置信息(插件版)************************************ @Bean public Queue pluginQueue() { return new Queue(Constants.HORSE_PLUGIN_QUEUE); } /** * 设置延时队列的交换机,必须是 CustomExchange 类型交换机 * 参数必须,不能改变 * @return */ @Bean public CustomExchange customPluginExchange() { Map<String, Object> args = new HashMap<>(2); args.put("x-delayed-type", "direct"); return new CustomExchange(Constants.HORSE_PLUGIN_EXCHANGE, "x-delayed-message", Boolean.TRUE, Boolean.FALSE, args); } @Bean public Binding pluginBinding() { return BindingBuilder.bind(pluginQueue()).to(customPluginExchange()).with(Constants.HORSE_PLUGIN_KEY).noargs(); } }
8. 定义延时监听器
/** * @author Mr.Horse * @version 1.0 * @description: {description} * @date 2021/4/24 14:51 */ @Component public class DelayMsgListener { private static Logger logger = LoggerFactory.getLogger(DelayMsgListener.class); /** * 监听死信队列 * * @param message * @param channel * @param msg */ @RabbitListener(queues = Constants.HORSE_DEAD_QUEUE) public void consumeDeadListener(Message message, Channel channel, String msg) { long tag = message.getMessageProperties().getDeliveryTag(); try { logger.info(" ==> consumeDeadListener接收" + msg); // 手动ACK确认 channel.basicAck(tag, false); } catch (IOException e) { logger.error(" ==> 消息接收失败: {}", tag); } } /** * 监听延时队列(插件版) * * @param message * @param channel * @param msg */ @RabbitListener(queues = Constants.HORSE_PLUGIN_QUEUE) public void consumePluginListener(Message message, Channel channel, String msg) { long tag = message.getMessageProperties().getDeliveryTag(); try { logger.info(" ==> consumePluginListener" + msg); // 手动ACK确认 channel.basicAck(tag, false); } catch (IOException e) { logger.error(" ==> 消息接收失败: {}", tag); } } }
9. 测试接口
// 基于特性的延时队列 @GetMapping("/delay/rabbit") public void delayMsg(@RequestParam("expire") Long expire) { for (int i = 1; i <= 10; i++) { String msg = "第" + i + "条消息"; logger.info("==> 发送" + msg); // 这里可以动态的设置过期时间 rabbitTemplate.convertAndSend(Constants.HORSE_DELAY_EXCHANGE, Constants.HORSE_DELAY_KEY, msg, message -> { message.getMessageProperties().setExpiration(String.valueOf(expire)); return message; }, new CorrelationData(String.valueOf(i))); } } // 基于插件的延时队列 @GetMapping("/delay/plugin") public void delayPluginMsg(@RequestParam("expire") Integer expire) { for (int i = 1; i <= 10; i++) { String msg = "第" + i + "条消息"; logger.info("==> 发送" + msg); // 动态设置过期时间 rabbitTemplate.convertAndSend(Constants.HORSE_PLUGIN_EXCHANGE, Constants.HORSE_PLUGIN_KEY, msg, message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); message.getMessageProperties().setDelay(expire); return message; }, new CorrelationData(String.valueOf(i))); } }
结果:你懂的:scream_cat::scream_cat::scream_cat:
RabbitMQ的基础使用演示到此结束。
总结
到此这篇关于SpringBoot整合RabbitMQ消息队列的文章就介绍到这了,更多相关SpringBoot整合RabbitMQ消息队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
Springboot整合mybatisplus时,使用条件构造器排序报错问题及解决
这篇文章主要介绍了Springboot整合mybatisplus时,使用条件构造器排序报错问题及解决,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教2024-04-04Springboot2.x 使用 Log4j2 异步打印日志的实现
这篇文章主要介绍了Springboot2.x 使用 Log4j2 异步打印日志的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧2020-12-12SpringBoot+Mybatis-Plus实现mysql读写分离方案的示例代码
这篇文章主要介绍了SpringBoot+Mybatis-Plus实现mysql读写分离方案的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧2021-03-03
最新评论