SpringBoot+RabbitMQ实现消息可靠传输详解

 更新时间:2022年05月25日 08:35:44   作者:小码code  
消息的可靠传输是面试必问的问题之一,保证消息的可靠传输主要在生产端开启 comfirm 模式,RabbitMQ 开启持久化,消费端关闭自动 ack 模式。本文将详解SpringBoot整合RabbitMQ如何实现消息可靠传输,需要的可以参考一下

环境配置

SpringBoot 整合 RabbitMQ 实现消息的发送。

1.添加 maven 依赖

       <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.添加 application.yml 配置文件

spring:
  rabbitmq:
    host: 192.168.3.19
    port: 5672
    username: admin
    password: xxxx

3.配置交换机、队列以及绑定

    @Bean
    public DirectExchange myExchange() {
        DirectExchange directExchange = new DirectExchange("myExchange");
        return directExchange;
    }

    @Bean
    public Queue myQueue() {
        Queue queue = new Queue("myQueue");
        return queue;
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(myQueue()).to(myExchange()).with("myRoutingKey");
    }

4.生产发送消息

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/send")
    public String send(String message) {
        rabbitTemplate.convertAndSend("myExchange","myRoutingKey",message);
        System.out.println("【发送消息】" + message)
        return "【send message】" + message;
    }

5.消费者接收消息

    @RabbitListener(queuesToDeclare = @Queue("myQueue"))
    public void process(String msg, Channel channel, Message message) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date date = new Date();
        String time = sdf.format(date);
        System.out.println("【接收信息】" + msg + " 当前时间" + time);

6.调用生产端发送消息 hello,控制台输出:

【发送消息】hello
【接收信息】hello 当前时间2022-05-12 10:21:14

说明消息已经被成功接收。

消息丢失分析

一条消息的从生产到消费,消息丢失可能发生在以下几个阶段:

  • 生产端丢失: 生产者无法传输到 RabbitMQ
  • 存储端丢失: RabbitMQ 存储自身挂了
  • 消费端丢失:存储由于网络问题,无法发送到消费端,或者消费挂了,无法发送正常消费

RabbitMQ 从生产端、储存端、消费端都对可靠性传输做很好的支持。

生产阶段

生产阶段通过请求确认机制,来确保消息的可靠传输。当发送消息到 RabbitMQ 服务器 之后,RabbitMQ 收到消息之后,给发送返回一个请求确认,表示RabbitMQ 服务器已成功的接收到了消息。

配置application.yml

spring:
  rabbitmq:
    # 消息确认机制 生产者 -> 交换机
    publisher-confirms: true
    # 消息返回机制  交换机 -> 队列
    publisher-returns: true

配置

@Configuration
@Slf4j
public class RabbitConfig {

    @Autowired
    private ConnectionFactory connectionFactory;

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("【correlationData】:" + correlationData);
                log.info("【ack】" + ack);
                log.info("【cause】" + cause);
                if (ack) {
                    log.info("【发送成功】");
                } else {
                    log.info("【发送失败】correlationData:" + correlationData + " cause:" + cause);
                }
            }
        });
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.warn("【消息发送失败】");
                log.info("【message】" + message);
                log.info("【replyCode】" + replyCode);
            }
        });

        return rabbitTemplate;
    }
}

消息从 生产者 到 交换机, 有confirmCallback 确认模式。发送消息成功后消息会调用方法confirm(CorrelationData correlationData, boolean ack, String cause),根据 ack 判断消息是否发送成功。

消息从 交换机 到 队列,有returnCallback 退回模式。

发送消息 product message 控制台输出如下:

【发送消息】product message
【接收信息】product message 当前时间2022-05-12 11:27:56
【correlationData】:null
【ack】true
【cause】null
【发送成功】

生产端模拟消息丢失

这里有两个方案:

  • 发送消息后立马关闭 broke,后者把网络关闭,但是broker关闭之后控制台一直就会报错,发送消息也报500错误。
  • 发送不存在的交换机:
// myExchange 修改成 myExchangexxxxx
rabbitTemplate.convertAndSend("myExchangexxxxx","myRoutingKey",message);

结果:

【correlationData】:null
【ack】false
【cause】channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'myExchangexxxxx' in vhost '/', class-id=60, method-id=40)
【发送失败】

当发送失败可以对消息进行重试

交换机正确,发送不存在的队列:

交换机接收到消息,返回成功通知,控制台输出:

【correlationData】:CorrelationData [id=7d468b47-b422-4523-b2a2-06b14aef073c]
【ack】true
【cause】null
【发送成功】

交换机没有找到队列,返回失败信息:

【消息发送失败】
【message】product message
【replyCode】312

RabbitMQ

开启队列持久化,创建的队列和交换机默认配置是持久化的。首先把队列和交换机设置正确,修改消费监听的队列,使得消息存放在队列里

修改队列的持久化,修改成非持久化:

    @Bean
    public Queue myQueue() {
        Queue queue = new Queue("myQueue",false);
        return queue;
    }

发送消息之后,消息存放在队列中,然后重启 RabbitMQ,消息不存在了。
设置队列持久化:

    @Bean
    public Queue myQueue() {
        Queue queue = new Queue("myQueue",true);
        return queue;
    }

重启之后,队列的消息还存在。

消费端

消费端默认开始 ack 自动确认模式,当队列消息被消费者接收,不管有没有被消费端消息,都自动删除队列中的消息。所以为了确保消费端能成功消费消息,将自动模式改成手动确认模式:

修改application.yml 文件

spring:
  rabbitmq:
    # 手动消息确认
    listener:
      simple:
        acknowledge-mode: manual

消费接收消息之后需要手动确认:

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    @RabbitListener(queuesToDeclare = @Queue("myQueue"))
    public void process(String msg, Channel channel, Message message) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date date = new Date();
        String time = sdf.format(date);
        System.out.println("【接收信息】" + msg + " 当前时间" + time);
        System.out.println(message.getMessageProperties().getDeliveryTag());
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

如果不添加:

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

发送两条消息

消息被接收后,没有确认,重新放到队列中:

重启项目,之后,队列的消息会发送到消费者,但是没有 ack 确认,还是继续会放回队列中。

加上 channel.basicAck 之后,再重启项目

队列消息就被删除了

basicAck 方法最后一个参数 multiple 表示是删除之前的队列。

multiple 设置成 true,把后面的队列都清理掉了

到此这篇关于SpringBoot+RabbitMQ实现消息可靠传输详解的文章就介绍到这了,更多相关SpringBoot RabbitMQ消息可靠传输内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 使用java代码代替xml实现SSM教程

    使用java代码代替xml实现SSM教程

    这篇文章主要介绍了使用java代码代替xml实现SSM教程,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-12-12
  • Java常用工具类汇总 附示例代码

    Java常用工具类汇总 附示例代码

    这篇文章主要介绍了Java常用工具类汇总 附示例代码,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-04-04
  • 全面解析Java中的注解与注释

    全面解析Java中的注解与注释

    这篇文章主要介绍了Java中的注解与注释,简单来说注解以@符号开头而注释被包含在/***/符号中,各自具体的作用则来看本文详解,需要的朋友可以参考下
    2016-05-05
  • 详解SpringBoot中的index首页的访问、自定义Favicon图标

    详解SpringBoot中的index首页的访问、自定义Favicon图标

    这篇文章主要介绍了SpringBoot中的index首页的访问、自定义Favicon图标,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-08-08
  • Java注解之Retention、Documented、Inherited介绍

    Java注解之Retention、Documented、Inherited介绍

    这篇文章主要介绍了Java注解之Retention、Documented、Inherited注解介绍,本文内容和相关文章是系列文章,需要的朋友可以参考下
    2014-09-09
  • IDEA集成JProfiler的图文详解

    IDEA集成JProfiler的图文详解

    本文详细介绍了JProfiler的下载、安装和使用过程,首先需要在官网下载对应操作系统的安装包并进行安装,然后填写个人信息进行注册并获取许可证密钥,感兴趣的朋友一起看看吧
    2024-10-10
  • Spring Boot定时器创建及使用解析

    Spring Boot定时器创建及使用解析

    这篇文章主要介绍了Spring Boot定时器创建及使用解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-07-07
  • java环境变量配置和adb的配置教程详解

    java环境变量配置和adb的配置教程详解

    这篇文章主要介绍了java环境变量配置和adb的配置教程,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-05-05
  • IDEA连接mysql保姆级教学教程

    IDEA连接mysql保姆级教学教程

    学习使用IDEA的时候,需要连接Database,下面这篇文章主要给大家介绍了关于IDEA连接mysql的保姆级教学教程,文中通过图文介绍的非常详细,需要的朋友可以参考下
    2023-03-03
  • Springboot如何同时装配两个相同类型数据库

    Springboot如何同时装配两个相同类型数据库

    这篇文章主要介绍了Springboot如何同时装配两个相同类型数据库,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-11-11

最新评论