Rabbit消息重试机制问题记录

 更新时间:2024年08月09日 10:21:25   作者:陈亦康  
消息重试机制就是在消息处理失败之后重新发送,主要时为了解决消息发送过程可能会出现的问题,例如 网络故障、服务临时不可用 等,这篇文章主要介绍了Rabbit消息重试机制问题记录,需要的朋友可以参考下

消息重试机制

概述

消息重试机制就是在消息处理失败之后重新发送,主要时为了解决消息发送过程可能会出现的问题,例如 网络故障、服务临时不可用 等.

Ps:如果时程序逻辑引起的错误,那么即使重试多少次都是没有用的,但是可以通过配置重试次数来解决.

实现方式一:基于消息手动确认机制,返回 nack 实现

配置文件

spring:
  application:
    name: rabbitmq
  rabbitmq:
    host: env-base
    port: 5672
    username: root
    password: 1111
    listener:
      simple:
        acknowledge-mode: manual # 手动确认

交换机、队列、绑定

    @Bean("ackExchange")
    fun ackExchange() = DirectExchange(MQConst.ACK_EXCHANGE)
    @Bean("ackQueue")
    fun ackQueue() = Queue(MQConst.ACK_QUEUE)
    @Bean
    fun ackBinding(
        @Qualifier("ackExchange") exchange: DirectExchange,
        @Qualifier("ackQueue") queue: Queue,
    ): Binding {
        return BindingBuilder
            .bind(queue)
            .to(exchange)
            .with(MQConst.ACK_BINDING)
    }

生产者接口

@RestController
@RequestMapping("/mq3")
class MQ3Api(
   val rabbitTemplate: RabbitTemplate
) {
    @RequestMapping("/ack")
    fun ack(): String {
        rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1")
        return "ok"
    }
}

消费者

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset
@Component
class AckListener {
    @RabbitListener(queues = [MQConst.ACK_QUEUE])
    fun handMessage(
        message: Message,
        channel: Channel,
    ) {
        val deliveryTag = message.messageProperties.deliveryTag
        try {
            println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, $deliveryTag")
            val a = 1 / 0
            channel.basicAck(deliveryTag, false)
        } catch (e: Exception) {
            //通过返回 nack,并设置 requeue 为 ture 实现消息重新入队,并进行重试
            channel.basicNack(deliveryTag, false, true) 
        }
    }
}

演示和结论

deliverTag 自增的原因: 引发异常后,会返回 nack,并且参数 requeue = true,表示重新入队,然后进行重试,将队列中的消息再次发送给生产者,因此 deliverTag 会自增.

缺点: 如果是由于程序逻辑异常引起的重试,那么无论重试多少次都没用,并且不断重试会导致负载飙升,性能下降.

实现方式二:基于重试配置实现 配置文件

spring:
  application:
    name: rabbitmq
  rabbitmq:
    host: env-base
    port: 5672
    username: root
    password: 1111
    listener:
      simple:
        acknowledge-mode: auto # 开启重试机制,这里必须是 auto,否则不生效!
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 5000ms # 失败等待时常
          max-attempts: 5 # 最大重试次数(包括第一次消费)

Ps:开启重试机制,acknowledge-mode 必须指定为 auto,否则不生效!

交换机、队列、绑定

    @Bean("ackExchange")
    fun ackExchange() = DirectExchange(MQConst.ACK_EXCHANGE)
    @Bean("ackQueue")
    fun ackQueue() = Queue(MQConst.ACK_QUEUE)
    @Bean
    fun ackBinding(
        @Qualifier("ackExchange") exchange: DirectExchange,
        @Qualifier("ackQueue") queue: Queue,
    ): Binding {
        return BindingBuilder
            .bind(queue)
            .to(exchange)
            .with(MQConst.ACK_BINDING)
    }

生产者接口

    @RequestMapping("/ack")
    fun ack(): String {
        rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1")
        return "ok"
    }

消费者

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset
@Component
class AckListener {
    @RabbitListener(queues = [MQConst.ACK_QUEUE])
    fun handMessage(
        message: Message,
        channel: Channel,
    ) {
        println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, ${message.messageProperties.deliveryTag}")
        val a = 1 / 0
    }
}

演示和结论

deliverTag 不自增的原因: 因为是消息已经发出去了,即使失败了也不会重回队列,而是直接重新发一遍消息.

好处: 不仅可以控制重试次数(防止类似于上面讲到的确认应答引起的无限重试),还可以控制每次重试的间隔时间(防止负载飙升).

到此这篇关于Rabbit高级特性 - 消息重试机制的文章就介绍到这了,更多相关Rabbit消息重试机制内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • SpringBoot接收各种各样参数的示例详解

    SpringBoot接收各种各样参数的示例详解

    参数映射准确来说是springmvc来帮我们干的活,但是由于springboot太过火爆,简化了springmvc相关配置文件,以至于很多人会误认为是springboot的功能,本文将给大家介绍SpringBoot接收各种各样参数,文中有详细的代码讲解,需要的朋友可以参考下
    2024-04-04
  • 实战指南:Java编写Flink SQL解决难题

    实战指南:Java编写Flink SQL解决难题

    想知道如何利用Java编写Flink SQL解决难题吗?本指南将为您揭示最实用的技巧和策略,让您轻松应对挑战,跟着我们一起探索,让Java和Flink SQL成为您问题解决的得力助手!
    2023-12-12
  • Java中如何将符号分隔的文本文件txt转换为excel

    Java中如何将符号分隔的文本文件txt转换为excel

    这篇文章主要介绍了Java中如何将符号分隔的文本文件txt转换为excel,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-09-09
  • Spring boot中使用ElasticSearch的方法详解

    Spring boot中使用ElasticSearch的方法详解

    这篇文章主要给大家介绍了关于Spring boot中使用ElasticSearch的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-01-01
  • Java实现DES加解密算法解析

    Java实现DES加解密算法解析

    这篇文章主要介绍了Java实现DES加解密算法解析,结合完整实例形式分析了DES加密的相关原理,需要的朋友可以参考下。
    2016-10-10
  • java.net.SocketTimeoutException: Read timed out异常的解决

    java.net.SocketTimeoutException: Read timed o

    本文主要介绍了java.net.SocketTimeoutException: Read timed out异常的解决,可能是因为网络延迟、服务器响应慢或连接不稳定等原因造成的,下面就一起来介绍一下,感兴趣的可以了解一下
    2024-05-05
  • SpringBoot整合SSO(single sign on)单点登录

    SpringBoot整合SSO(single sign on)单点登录

    这篇文章主要介绍了SpringBoot整合SSO(single sign on)单点登录,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-06-06
  • spring boot 使用 @Scheduled 注解和 TaskScheduler 接口实现定时任务

    spring boot 使用 @Scheduled 注解和 TaskScheduler 接口实现定时任务

    这篇文章主要介绍了spring boot 使用 @Scheduled 注解和 TaskScheduler 接口实现定时任务,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-06-06
  • IDEA使用JDBC安装配置jar包连接MySQL数据库

    IDEA使用JDBC安装配置jar包连接MySQL数据库

    这篇文章介绍了IDEA使用JDBC安装配置jar包连接MySQL数据库的方法,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-01-01
  • Java中锁的分类与使用方法

    Java中锁的分类与使用方法

    这篇文章主要给大家介绍了关于Java中锁分类与使用的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-04-04

最新评论