Rabbit消息重试机制问题记录
消息重试机制
概述
消息重试机制就是在消息处理失败之后重新发送,主要时为了解决消息发送过程可能会出现的问题,例如 网络故障、服务临时不可用 等.
Ps:如果时程序逻辑引起的错误,那么即使重试多少次都是没有用的,但是可以通过配置重试次数来解决.
实现方式一:基于消息手动确认机制,返回 nack 实现
配置文件
spring: application: name: rabbitmq rabbitmq: host: env-base port: 5672 username: root password: 1111 listener: simple: acknowledge-mode: manual # 手动确认
交换机、队列、绑定
@Bean("ackExchange") fun ackExchange() = DirectExchange(MQConst.ACK_EXCHANGE) @Bean("ackQueue") fun ackQueue() = Queue(MQConst.ACK_QUEUE) @Bean fun ackBinding( @Qualifier("ackExchange") exchange: DirectExchange, @Qualifier("ackQueue") queue: Queue, ): Binding { return BindingBuilder .bind(queue) .to(exchange) .with(MQConst.ACK_BINDING) }
生产者接口
@RestController @RequestMapping("/mq3") class MQ3Api( val rabbitTemplate: RabbitTemplate ) { @RequestMapping("/ack") fun ack(): String { rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1") return "ok" } }
消费者
import com.cyk.rabbitmq.constants.MQConst import com.rabbitmq.client.Channel //注意这里的依赖 import org.springframework.amqp.core.Message //注意这里的依赖 import org.springframework.amqp.rabbit.annotation.RabbitListener import org.springframework.stereotype.Component import java.nio.charset.Charset @Component class AckListener { @RabbitListener(queues = [MQConst.ACK_QUEUE]) fun handMessage( message: Message, channel: Channel, ) { val deliveryTag = message.messageProperties.deliveryTag try { println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, $deliveryTag") val a = 1 / 0 channel.basicAck(deliveryTag, false) } catch (e: Exception) { //通过返回 nack,并设置 requeue 为 ture 实现消息重新入队,并进行重试 channel.basicNack(deliveryTag, false, true) } } }
演示和结论
deliverTag 自增的原因: 引发异常后,会返回 nack,并且参数 requeue = true,表示重新入队,然后进行重试,将队列中的消息再次发送给生产者,因此 deliverTag 会自增.
缺点: 如果是由于程序逻辑异常引起的重试
,那么无论重试多少次都没用,并且不断重试会导致负载飙升,性能下降
.
实现方式二:基于重试配置实现 配置文件
spring: application: name: rabbitmq rabbitmq: host: env-base port: 5672 username: root password: 1111 listener: simple: acknowledge-mode: auto # 开启重试机制,这里必须是 auto,否则不生效! retry: enabled: true # 开启消费者失败重试 initial-interval: 5000ms # 失败等待时常 max-attempts: 5 # 最大重试次数(包括第一次消费)
Ps:开启重试机制,acknowledge-mode 必须指定为 auto,否则不生效!
交换机、队列、绑定
@Bean("ackExchange") fun ackExchange() = DirectExchange(MQConst.ACK_EXCHANGE) @Bean("ackQueue") fun ackQueue() = Queue(MQConst.ACK_QUEUE) @Bean fun ackBinding( @Qualifier("ackExchange") exchange: DirectExchange, @Qualifier("ackQueue") queue: Queue, ): Binding { return BindingBuilder .bind(queue) .to(exchange) .with(MQConst.ACK_BINDING) }
生产者接口
@RequestMapping("/ack") fun ack(): String { rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1") return "ok" }
消费者
import com.cyk.rabbitmq.constants.MQConst import com.rabbitmq.client.Channel //注意这里的依赖 import org.springframework.amqp.core.Message //注意这里的依赖 import org.springframework.amqp.rabbit.annotation.RabbitListener import org.springframework.stereotype.Component import java.nio.charset.Charset @Component class AckListener { @RabbitListener(queues = [MQConst.ACK_QUEUE]) fun handMessage( message: Message, channel: Channel, ) { println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, ${message.messageProperties.deliveryTag}") val a = 1 / 0 } }
演示和结论
deliverTag 不自增的原因: 因为是消息已经发出去了,即使失败了也不会重回队列,而是直接重新发一遍消息.
好处: 不仅可以控制重试次数(防止类似于上面讲到的确认应答引起的无限重试),还可以控制每次重试的间隔时间(防止负载飙升).
到此这篇关于Rabbit高级特性 - 消息重试机制的文章就介绍到这了,更多相关Rabbit消息重试机制内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
Spring boot中使用ElasticSearch的方法详解
这篇文章主要给大家介绍了关于Spring boot中使用ElasticSearch的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧2019-01-01java.net.SocketTimeoutException: Read timed o
本文主要介绍了java.net.SocketTimeoutException: Read timed out异常的解决,可能是因为网络延迟、服务器响应慢或连接不稳定等原因造成的,下面就一起来介绍一下,感兴趣的可以了解一下2024-05-05SpringBoot整合SSO(single sign on)单点登录
这篇文章主要介绍了SpringBoot整合SSO(single sign on)单点登录,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧2020-06-06spring boot 使用 @Scheduled 注解和 TaskScheduler 接口实现定时任务
这篇文章主要介绍了spring boot 使用 @Scheduled 注解和 TaskScheduler 接口实现定时任务,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下2023-06-06
最新评论