关于SpringBoot整合RabbitMQ实现死信队列
概念介绍
什么是死信
死信可以理解成没有被正常消费的消息,在RabbitMQ中以下几种情况会被认定为死信:
- 消费者使用basic.reject或basic.nack(重新排队参数设置为false)对消息进行否定确认。
- 消息到达生存时间还未被消费。
- 队列超过长度限制,消息被丢弃。
这些消息会被发送到死信交换机并路由到死信队列中(在RabbitMQ中死信交换机和死信队列就是普通的交换机和队列)。其流转过程如下图
死信队列应用
- 作为消息可靠性的一个扩展。比如,在队列已满的情况下也不会丢失消息。
- 可以实现延迟消费功能。比如,订单15分钟内未支付。
注意事项:基于死信队列实现的延迟消费不适合时间过于复杂的场景。比如,一个队列中第一条消息TTL为10s,第二条消息TTL为5s,由于RabbitMQ只会监听第一条消息,所以本应第二条消息先达到TTL会在第一条消息的TTL之后。对于该现象有两种解决方案:
- 维护多个队列,每个队列维护一个TTL时间。
- 使用延迟交换机。这种方式需要下载插件支持
工程搭建
环境说明
- RabbitMQ环境
- Java版本:JDK1.8
- Maven版本:apache-maven-3.6.3
- 开发工具:IntelliJ IDEA
搭建步骤
1.创建SpringBoot项目。
2.pom.xml文件导入RabbitMQ依赖。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
3.application.yml文件添加RabbitMQ配置。
spring: # rabbitmq配置信息 RabbitProperties类 rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / # 开启confirm机制 publisher-confirm-type: correlated # 开启return机制 publisher-returns: true #全局配置,局部配置存在就以局部为准 listener: simple: acknowledge-mode: manual # 手动ACK
实现死信
准备Exchange&Queue
@Configuration public class RabbitMQConfig { /** * 正常队列 */ public static final String EXCHANGE = "boot-exchange"; public static final String QUEUE = "boot-queue"; public static final String ROUTING_KEY = "boot-rout"; /** * 死信队列 */ public static final String DEAD_EXCHANGE = "dead-exchange"; public static final String DEAD_QUEUE = "dead-queue"; public static final String DEAD_ROUTING_KEY = "dead-rout"; /** * 声明死信交换机 * * @return */ @Bean public Exchange deadExchange() { return ExchangeBuilder.directExchange(DEAD_EXCHANGE).build(); } /** * 声明死信队列 * * @return */ @Bean public Queue deadQueue() { return QueueBuilder.durable(DEAD_QUEUE).build(); } /** * 绑定死信的队列和交换机 * * @param deadExchange * @param deadQueue * @return */ @Bean public Binding deadBind(Exchange deadExchange, Queue deadQueue) { return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs(); } /** * 声明交换机,同channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT); * * @return */ @Bean public Exchange bootExchange() { return ExchangeBuilder.directExchange(EXCHANGE).build(); } /** * 声明队列,同channel.queueDeclare(QUEUE, true, false, false, null); * 绑定死信交换机及路由key * * @return */ @Bean public Queue bootQueue() { return QueueBuilder.durable(QUEUE) .deadLetterExchange(DEAD_EXCHANGE) .deadLetterRoutingKey(DEAD_ROUTING_KEY) //声明队列属性有更改时需要删除队列 //给队列设置消息时长 //.ttl(10000) //队列最大长度 .maxLength(1) .build(); } /** * 绑定队列和交换机,同 channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY); * * @param bootExchange * @param bootQueue * @return */ @Bean public Binding bootBind(Exchange bootExchange, Queue bootQueue) { return BindingBuilder.bind(bootQueue).to(bootExchange).with(ROUTING_KEY).noargs(); } }
监听死信队列
@RabbitListener(queues = RabbitMQConfig.DEAD_QUEUE) public void listener_dead(String msg, Channel channel, Message message) throws IOException { System.out.println("死信接收到消息" + msg); System.out.println("唯一标识:" + message.getMessageProperties().getCorrelationId()); System.out.println("messageID:" + message.getMessageProperties().getMessageId()); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }
方式一、消费者拒绝&否认
- 拒绝消息
@RabbitListener(queues = RabbitMQConfig.QUEUE) public void listener(String msg, Channel channel, Message message) throws IOException { System.out.println("接收到消息" + msg); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false) }
- 否认消息
@RabbitListener(queues = RabbitMQConfig.QUEUE) public void listener(String msg, Channel channel, Message message) throws IOException { System.out.println("接收到消息" + msg); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); }
方式二、超过消息TTL 发送消息时设置TTL
@SpringBootTest public class Publisher { @Autowired private RabbitTemplate template; /** * 5秒未被消费会路由到死信队列 */ @Test public void publish_expir() { template.convertAndSend(RabbitMQConfig.EXCHANGE, RabbitMQConfig.ROUTING_KEY, "hello expir dead", message -> { message.getMessageProperties().setExpiration("5000"); return message; }); } }
- 设置队列所有消息的TTL
更新RabbitMQConfig类中bootQueue() ,更新后需要删除队列,因为队列属性有更改。
@Bean public Queue bootQueue() { return QueueBuilder.durable(QUEUE) .deadLetterExchange(DEAD_EXCHANGE) .deadLetterRoutingKey(DEAD_ROUTING_KEY) //声明队列属性有更改时需要删除队列 //给队列设置消息时长 .ttl(10000) .build(); }
方式三、超过队列长度限制
设置队列长度限制,当队列长度超过设置的阈值,消息便会路由到死信队列。
@Bean public Queue bootQueue() { return QueueBuilder.durable(QUEUE) .deadLetterExchange(DEAD_EXCHANGE) .deadLetterRoutingKey(DEAD_ROUTING_KEY) //声明队列属性有更改时需要删除队列 .maxLength(1) .build(); }
代码仓库 点我
到此这篇关于关于SpringBoot整合RabbitMQ实现死信队列的文章就介绍到这了,更多相关RabbitMQ实现死信队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
springboot中rabbitmq实现消息可靠性机制详解
这篇文章主要介绍了springboot中rabbitmq实现消息可靠性机制详解,本文通过实例代码给大家介绍的非常详细,需要的朋友可以参考下2021-09-09
最新评论