springboot实现rabbitmq消息确认的示例代码
概述
RabbitMQ的消息确认有两种。 一种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。 第二种是消费接收确认。这种是确认消费者是否成功消费了队列中的消息。
一、运行效果
二、实现过程
①、引入rabbitmq包
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
②、修改application.properties配置
spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest # 发送者开启 confirm 确认机制 spring.rabbitmq.publisher-confirms=true # 发送者开启 return 确认机制 spring.rabbitmq.publisher-returns=true #################################################### # 设置消费端手动 ack spring.rabbitmq.listener.simple.acknowledge-mode=manual # 是否支持重试 spring.rabbitmq.listener.simple.retry.enabled=true
③、定义exchange和queue,并将queue绑定在exchange上
package com.mm.springbootrabbitmqconfirmdemo.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { @Bean(name = "confirmQueue") public Queue confirmQueue(){ return new Queue("confirmQueue",true,false,false); } @Bean(name = "confirmExchange") public FanoutExchange confirmExchange(){ return new FanoutExchange("confirmExchange"); } @Bean public Binding confirmFanoutExchangeAndQueue(@Qualifier("confirmExchange") FanoutExchange confirmExchange, @Qualifier("confirmQueue") Queue confirmQueue){ return BindingBuilder.bind(confirmQueue).to(confirmExchange); } }
④、消息发送确认
发送消息确认:用来确认生产者 producer
将消息发送到 broker
,broker
上的交换机 exchange
再投递给队列 queue
的过程中,消息是否成功投递。
消息从 producer
到 rabbitmq broker
有一个 confirmCallback
确认模式。
消息从 exchange
到 queue
投递失败有一个 returnCallback
退回模式。
我们可以利用这两个Callback
来确保消息的100%送达。
1、 ConfirmCallback确认模式
消息只要被 rabbitmq broker
接收到就会触发 confirmCallback
回调 。
package com.mm.springbootrabbitmqconfirmdemo.service; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; @Slf4j @Component public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause){ if (!ack) { log.error("消息发送异常!"); } else { log.info("发送者爸爸已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause); } } }
实现接口 ConfirmCallback
,重写其confirm()
方法,方法内有三个参数correlationData
、ack
、cause
。
correlationData
:对象内部只有一个id
属性,用来表示当前消息的唯一性。ack
:消息投递到broker
的状态,true
表示成功。cause
:表示投递失败的原因。
但消息被 broker
接收到只能表示已经到达 MQ服务器,并不能保证消息一定会被投递到目标 queue
里。所以接下来需要用到 returnCallback
。
2、 ReturnCallback 退回模式
如果消息未能投递到目标 queue
里将触发回调 returnCallback
,一旦向 queue
投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。
com.mm.springbootrabbitmqconfirmdemo.service; lombok.extern.slf4j.; org.springframework.amqp.core.Message; org.springframework.amqp.rabbit.core.RabbitTemplate; org.springframework.stereotype.; ReturnCallbackService RabbitTemplate.ReturnCallback returnedMessageMessage message, replyCode, String replyText, String exchange, String routingKey.info, replyCode, replyText, exchange, routingKey;
实现接口ReturnCallback
,重写 returnedMessage()
方法,方法有五个参数message
(消息体)、replyCode
(响应code)、replyText
(响应内容)、exchange
(交换机)、routingKey
(队列)。
下边是具体的消息发送,在rabbitTemplate
中设置 Confirm
和 Return
回调,我们通过setDeliveryMode()
对消息做持久化处理,为了后续测试创建一个 CorrelationData
对象,添加一个id
为10000000000
。
⑤、消息发送确认
消息接收确认要比消息发送确认简单一点,因为只有一个消息回执(ack
)的过程。使用@RabbitHandler
注解标注的方法要增加 channel
(信道)、message
两个参数。
@Slf4j @Component @RabbitListener(queues = "confirm_test_queue") public class ReceiverMessage1 { @RabbitHandler public void processHandler(String msg, Channel channel, Message message) throws IOException { try { log.info("小富收到消息:{}", msg); //TODO 具体业务 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { if (message.getMessageProperties().getRedelivered()) { log.error("消息已重复处理失败,拒绝再次接收..."); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息 } else { log.error("消息即将再次返回队列处理..."); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } } }
消费消息有三种回执方法,我们来分析一下每种方法的含义。
1、basicAck
basicAck
:表示成功确认,使用此回执方法后,消息会被rabbitmq broker
删除。
void basicAck(long deliveryTag, boolean multiple)
deliveryTag
:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag
都会增加。手动消息确认模式下,我们可以对指定deliveryTag
的消息进行ack
、nack
、reject
等操作。
multiple
:是否批量确认,值为 true
则会一次性 ack
所有小于当前消息 deliveryTag
的消息。
举个栗子: 假设我先发送三条消息deliveryTag
分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag
为8,multiple
设置为 true,会将5、6、7、8的消息全部进行确认。
2、basicNack
basicNack
:表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
deliveryTag
:表示消息投递序号。
multiple
:是否批量确认。
requeue
:值为 true
消息将重新入队列。
3、basicReject
basicReject
:拒绝消息,与basicNack
区别在于不能进行批量操作,其他用法很相似。
void basicReject(long deliveryTag, boolean requeue)
deliveryTag
:表示消息投递序号。
requeue
:值为 true
消息将重新入队列。
三、项目结构图
四、补充
1、别忘确认消息
这是一个非常没技术含量的坑,但却是非常容易犯错的地方。
开启消息确认机制,消费消息别忘了channel.basicAck
,否则消息会一直存在,导致重复消费。
2、消息无限投递
在我最开始接触消息确认机制的时候,消费端代码就像下边这样写的,思路很简单:处理完业务逻辑后确认消息, int a = 1 / 0
发生异常后将消息重新投入队列。
@RabbitHandler public void processHandler(String msg, Channel channel, Message message) throws IOException { try { log.info("消费者 2 号收到:{}", msg); int a = 1 / 0; channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } }
3、重复消费
如何保证 MQ 的消费是幂等性,这个需要根据具体业务而定,可以借助MySQL
、或者redis
将消息持久化,通过再消息中的唯一性属性校验。
可以看到使用了 RabbitMQ
以后,我们的业务链路明显变长了,虽然做到了系统间的解耦,但可能造成消息丢失的场景也增加了。例如:
消息生产者 - > rabbitmq服务器(消息发送失败)
rabbitmq服务器自身故障导致消息丢失
消息消费者 - > rabbitmq服务(消费消息失败)
到此这篇关于springboot实现rabbitmq消息确认的示例代码的文章就介绍到这了,更多相关springboot rabbitmq消息确认内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
详解关于mybatis-plus中Service和Mapper的分析
这篇文章主要介绍了详解关于mybatis-plus中Service和Mapper的分析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧2020-09-09
最新评论