一文总结RabbitMQ中的消息确认机制

 更新时间:2023年06月20日 11:34:12   作者:土豆鱼_  
RabbitMQ消息确认机制指的是在消息传递过程中,发送方发送消息后,接收方需要对消息进行确认,以确保消息被正确地接收和处理,本文为大家整理了RabbitMQ中的消息确认机制,需要的可以参考一下

RabbitMQ的消息确认机制

RabbitMQ消息确认机制指的是在消息传递过程中,发送方发送消息后,接收方需要对消息进行确认,以确保消息被正确地接收和处理。RabbitMQ的消息确认机制分为两种:

  • 生产者确认机制:生产者发送消息后,需要等待RabbitMQ服务器的确认消息,以确保消息已经被成功地发送到RabbitMQ服务器。如果RabbitMQ服务器没有收到消息或者消息发送失败,生产者会收到一个确认消息,从而可以进行重发或者其他处理。
  • 消费者确认机制:消费者接收到消息后,需要向RabbitMQ服务器发送确认消息,以告诉服务器已经成功地接收并处理了该消息。如果消费者没有发送确认消息,RabbitMQ服务器会认为该消息没有被正确地处理,从而会将该消息重新发送给其他消费者进行处理。

在RabbitMQ中,消息确认机制是通过ACK机制来实现的。ACK代表Acknowledgement,即确认消息。当消息发送方发送消息后,接收方需要向消息发送方发送ACK消息,以表示已经成功地接收和处理了该消息。如果消息发送方没有收到ACK消息,就会认为该消息没有被正确地处理,从而进行重发或者其他处理。

总之,RabbitMQ的消息确认机制可以保证消息的可靠性,从而提高系统的稳定性和可靠性。

消息可靠抵达-ConfirmCallback

RabbitMQ的消息确认机制确保了消息的可靠抵达,其中ConfirmCallback是其中一种实现方式

ConfirmCallback是一个回调函数,用于在消息被确认时进行回调,以确保消息已经被正确地发送到RabbitMQ Broker并被处理。当生产者发送消息时,可以通过调用channel的confirmSelect()方法将channel设置为confirm模式,然后通过添加ConfirmCallback回调函数来处理消息确认。

当消息被发送到Broker后,如果Broker成功地将消息路由到目标队列,则会调用ConfirmCallback回调函数的handleAck()方法,表示消息已被确认。如果Broker无法将消息路由到目标队列,则会调用handleNack()方法,表示消息未被确认。

使用ConfirmCallback可以确保消息已经被正确地发送到RabbitMQ Broker并被处理,从而避免了消息丢失或重复发送的情况。同时,ConfirmCallback还可以在消息未被确认时进行重试或记录日志等操作,以确保消息的可靠性和稳定性。

ConfirmCallback使用说明: 在配置文件中配置:spring.rabbitmq.publisher-confirms=true 在创建 connectionFactory 的时候设置 PublisherConfirms(true) 选项,开启 confirmcallback 。 CorrelationData:用来表示当前消息唯一性。 消息只要被 broker 接收到就会执行 confirmCallback,如果是 cluster 模式,需要所有broker 接收到才会调用 confirmCallback。 被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递 到目标 queue 里。所以需要用到接下来的 returnCallback 。

消息可靠抵达-ReturnCallback

RabbitMQ的ReturnCallback机制是为了解决消息无法路由到指定队列的问题。当发送的消息无法被路由到指定队列时,RabbitMQ会将消息返回给生产者,这时候如果生产者设置了ReturnCallback回调函数,就可以在回调函数中处理这种情况。

ReturnCallback机制的使用场景一般是在消息发送时,指定了mandatory参数为true,表示如果消息无法被路由到指定队列,则将消息返回给生产者。如果mandatory参数为false,则消息会被直接丢弃。

当生产者设置了ReturnCallback回调函数后,RabbitMQ在将消息返回给生产者时,会触发该回调函数。在ReturnCallback回调函数中,可以处理消息无法路由的情况,例如重发消息、记录日志等。

下面是一个使用ReturnCallback机制的示例代码:

channel.addReturnListener(new ReturnCallback() {
    @Override
    public void handle(ReturnedMessage returnedMessage) {
        String message = new String(returnedMessage.getBody());
        System.out.println("Message returned: " + message);
    }
});
channel.basicPublish(exchangeName, routingKey, true, null, message.getBytes());

在上面的代码中,我们通过addReturnListener方法设置了ReturnCallback回调函数,当消息无法路由到指定队列时,会触发该回调函数。在回调函数中,我们将返回的消息打印出来,以便处理。

需要注意的是,ReturnCallback机制只有在消息被发送到交换机后,才会触发。如果消息发送的交换机不存在,或者路由键不符合任何绑定规则,消息会被直接丢弃,不会触发ReturnCallback回调函数

在配置文件中配置:

spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

confrim 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。在有 些业务场景下,我们需要保证消息一定要投递到目标 queue 里,此时就需要用到return 退回模式。

这样如果未能投递到目标 queue 里将调用 returnCallback ,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据

RabbitMQ自动确认和手动确认

RabbitMQ消息确认机制是一种保证消息可靠抵达的机制。在RabbitMQ中,消息确认机制分为两种模式:自动确认模式和手动确认模式

在自动确认模式下,当消费者收到消息并将其处理完毕后,RabbitMQ会自动将该消息标记为已确认,然后将其从队列中删除。这种模式比较简单,但是存在消息丢失的风险,因为如果消费者在处理消息的过程中出现异常,消息就会被丢失

在手动确认模式下,当消费者收到消息并将其处理完毕后,需要向RabbitMQ发送一个确认消息,告诉RabbitMQ该消息已经被处理完毕,可以从队列中删除。如果消费者在处理消息的过程中出现异常,可以选择不发送确认消息,这样消息就不会被从队列中删除,可以重新被其他消费者获取

手动确认模式可以保证消息不会被丢失,但是需要消费者编写额外的代码来处理确认消息的发送。此外,手动确认模式还可以设置确认模式为批量确认模式,即一次性确认多个消息,可以提高消息处理的效率

总结:

1.自动ACK模式

在自动ACK模式下,消息一旦被消费者接收到,就会自动被确认。这种模式下,消息一旦被发送到消费者,就会从队列中删除,无论消费者是否已经成功消费该消息。

2.手动ACK模式

在手动ACK模式下,消费者必须手动发送ACK消息来确认消息已经被成功消费。如果消费者没有发送ACK消息,RabbitMQ服务器就会认为该消息还没有被消费,会将该消息重新发送给其他消费者。这种模式下,消费者可以选择性地确认消息,只有当消费者成功消费一条消息并确认后,该消息才会从队列中删除。

手动ACK模式可以保证消息的可靠性,但是需要消费者在处理消息时进行额外的处理,消费者需要在处理消息后发送ACK消息,否则消息会被重新发送。

在 RabbitMQ 中,消费者可以通过设置 channel.basicAck(deliveryTag, multiple) 方法来发送 ack 消息。其中,deliveryTag 表示消息的唯一标识符,multiple 表示是否批量确认。如果 multiple 为 true,表示要确认该 deliveryTag 及其之前的所有消息;如果 multiple 为 false,表示只确认该 deliveryTag 指定的一条消息。

需要注意的是,如果消费者在处理消息时发生了异常,消息并没有被成功处理,那么不应该发送 ack 消息,而应该将消息重新放回队列中,让其他消费者来处理。

RabbitMQ处理消息方法

RabbitMQ提供了一些基本的方法来处理消息,这些方法包括:

  • basic.publish: 发布消息到指定的交换机上。
  • basic.consume: 消费消息,启动一个消费者来监听指定队列上的消息。
  • basic.ack: 确认消息已经被消费,告诉RabbitMQ可以删除该消息。
  • basic.nack: 否认消息已经被消费,告诉RabbitMQ需要重新发送该消息。
  • basic.reject: 拒绝消息,告诉RabbitMQ不需要再次发送该消息。
  • basic.get: 获取指定队列上的一条消息。
  • basic.cancel: 取消消费者的消费,停止监听指定队列上的消息。

这些方法都是基于AMQP协议定义的,可以使用RabbitMQ提供的客户端库或者自己实现AMQP协议来调用这些方法。

到此这篇关于一文总结RabbitMQ中的消息确认机制的文章就介绍到这了,更多相关RabbitMQ消息确认机制内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java将本地项目部署到Linux服务器的实践

    Java将本地项目部署到Linux服务器的实践

    本文主要介绍了Java将本地项目部署到Linux服务器的实践,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧<BR>
    2022-06-06
  • Spring Boot非Web项目运行配置的方法教程

    Spring Boot非Web项目运行配置的方法教程

    这篇文章主要介绍了Spring Boot非Web项目运行配置的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用Spring Boot具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧
    2019-09-09
  • SpringBoot2.0解决Long型数据转换成json格式时丢失精度问题

    SpringBoot2.0解决Long型数据转换成json格式时丢失精度问题

    这篇文章主要介绍了SpringBoot2.0解决Long型数据转换成json格式时丢失精度问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-06-06
  • springboot中请求路径配置在配置文件中详解

    springboot中请求路径配置在配置文件中详解

    这篇文章主要介绍了springboot中请求路径配置在配置文件中,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-01-01
  • Java8实现优雅的获取重复字符与次数

    Java8实现优雅的获取重复字符与次数

    这篇文章主要为大家详细介绍了在Java8中,我们应该如何进行字符及字符串的去重,以及计算去重次数,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下
    2023-12-12
  • SpringBoot实现日志链路追踪的项目实践

    SpringBoot实现日志链路追踪的项目实践

    在分布式系统中,由于请求的处理过程可能会跨越多个服务,因此,对请求的追踪变得尤为重要,本文主要介绍了SpringBoot实现日志链路追踪的项目实践,感兴趣的可以了解一下
    2024-03-03
  • Hadoop源码分析二安装配置过程详解

    Hadoop源码分析二安装配置过程详解

    本篇是Hadoop源码分析系列文章第二篇,主要介绍Hadoop安装配置的详细过程,后续本系列文章会持续更新,有需要的朋友可以借鉴参考下
    2021-09-09
  • 引入SpringCloud-gateway报错的解决方案

    引入SpringCloud-gateway报错的解决方案

    这篇文章主要介绍了引入SpringCloud-gateway报错的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-07-07
  • Spring Data JPA 映射VO/DTO对象方式

    Spring Data JPA 映射VO/DTO对象方式

    这篇文章主要介绍了Spring Data JPA 映射VO/DTO对象方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-11-11
  • java对接微信小程序详细流程(登录&获取用户信息)

    java对接微信小程序详细流程(登录&获取用户信息)

    这篇文章主要给大家介绍了关于java对接微信小程序(登录&获取用户信息)的相关资料,我们在开发微信小程序时经常需要获取用户微信用户名以及头像信息,微信提供了专门的接口API用于返回这些信息,需要的朋友可以参考下
    2023-08-08

最新评论