RabbitMQ 如何解决消息幂等性的问题

 更新时间:2021年07月05日 10:00:00   作者:王小白_Ada  
这篇文章主要介绍了RabbitMQ 如何解决消息幂等性的问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

前言

关于MQ消费者的幂等性问题,在于MQ的重试机制,因为网络原因或客户端延迟消费导致重复消费。使用MQ重试机制需要注意的事项以及如何解决消费者幂等性问题以下将逐一讲解。

1. RabbitMQ自动重试机制

消费者在消费消息的时候,如果消费者业务逻辑出现程序异常,这个时候我们如何处理?

使用重试机制,RabbitMQ默认开启重试机制。

实现原理:

  • @RabbitHandler注解 底层使用Aop拦截,如果程序(消费者)没有抛出异常,自动提交事务
  • 如果Aop使用异常通知拦截获取到异常后,自动实现补偿机制,消息缓存在RabbitMQ服务器端

注意:

  • 默认会一直重试到消费者不抛异常为止,这样显然不好。我们需要修改重试机制策略,如间隔3s重试一次)

配置:

spring:
  rabbitmq:
    # 连接地址
    host: 127.0.0.1
    # 端口号
    port: 5672
    # 账号
    username: guest
    # 密码
    password: guest
    # 地址(类似于数据库的概念)
    virtual-host: /admin_vhost
    # 消费者监听相关配置
    listener:
      simple:
        retry:
          # 开启消费者(程序出现异常)重试机制,默认开启并一直重试
          enabled: true
          # 最大重试次数
          max-attempts: 5
          # 重试间隔时间(毫秒)
          initial-interval: 3000

2. 如何合理选择重试机制?

情况1: 消费者获取到消息后,调用第三方接口,但接口暂时无法访问,是否需要重试? 需要重试,可能是因为网络原因短暂不能访问

情况2: 消费者获取到消息后,抛出数据转换异常,是否需要重试? 不需要重试,因为属于程序bug需要重新发布版本

总结:对于情况2,如果消费者代码抛出异常是需要发布新版本才能解决的问题,那么不需要重试,重试也无济于事。应该采用日志记录+定时任务job进行健康检查+人工进行补偿

3. 调用第三方接口自动实现补偿机制

我们知道了,RabbitMQ在消费者消费发生异常时,会自动进行补偿机制,所以我们(消费者)在调用第三方接口时,可以根据返回结果判断是否成功:

  • 成功:正常消费
  • 失败:手动抛处一个异常,这时RabbitMQ自动给我们做重试 (补偿)。

4. 如何解决消费者幂等性问题

防止重复消费 (MQ重试机制需要注意的问题)

产生原因:网络延迟传输中,消费者出现异常或者消费者延迟消费,会造成进行MQ重试补偿,在重试过程中,可能会造成重复消费。

面试题:MQ中消费者如何保证幂等性问题,不被重复消费?

在这里插入图片描述

伪代码:

生产者核心代码:

请求头设置消息id(messageId)

@Component
public class FanoutProducer {
 @Autowired
 private AmqpTemplate amqpTemplate;

 public void send(String queueName) {
  String msg = "my_fanout_msg:" + System.currentTimeMillis();
  //请求头设置消息id(messageId)
  Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON)
    .setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build();
  System.out.println(msg + ":" + msg);
  amqpTemplate.convertAndSend(queueName, message);
 }
}

消费者核心代码:

@RabbitListener(queues = "fanout_email_queue")
 public void process(Message message) throws Exception {
  // 获取消息Id
  String messageId = message.getMessageProperties().getMessageId();
  String msg = new String(message.getBody(), "UTF-8");
  //② 判断唯一Id是否被消费,消息消费成功后将id和状态保存在日志表中,我们从(①步骤)表中获取并判断messageId的状态即可
  //从redis中获取messageId的value
  String value = redisUtils.get(messageId)+"";
  if(value.equals("1") ){ //表示已经消费
   return; //结束
  }
  System.out.println("邮件消费者获取生产者消息" + "messageId:" + messageId + ",消息内容:" + msg);
  JSONObject jsonObject = JSONObject.parseObject(msg);
  // 获取email参数
  String email = jsonObject.getString("email");
  // 请求地址
  String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email;
  JSONObject result = HttpClientUtils.httpGet(emailUrl);
  if (result == null) {
   // 因为网络原因,造成无法访问,继续重试
   throw new Exception("调用接口失败!");
  }
  System.out.println("执行结束....");
  //① 执行到这里已经消费成功,我们可以修改messageId的状态,并存入日志表(可以存到redis中,key为消息Id、value为状态)
 }

5. SpringBoot整合RabbitMQ应答模式(ACK)

1.修改配置simple下添加 acknowledge-mode: manual:

spring:
  rabbitmq:
    # 连接地址
    host: 127.0.0.1
    # 端口号
    port: 5672
    # 账号
    username: guest
    # 密码
    password: guest
    # 地址(类似于数据库的概念)
    virtual-host: /admin_vhost
    # 消费者监听相关配置
    listener:
      simple:
        retry:
          # 开启消费者(程序出现异常)重试机制,默认开启并一直重试
          enabled: true
          # 最大重试次数
          max-attempts: 5
          # 重试间隔时间(毫秒)
          initial-interval: 3000
        # 开启手动ack
        acknowledge-mode: manual

2.消费者增加代码:

Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); 手动ack
channel.basicAck(deliveryTag, false);手动签收
//邮件队列
@Component
public class FanoutEamilConsumer {
 @RabbitListener(queues = "fanout_email_queue")
 public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
  System.out
    .println(Thread.currentThread().getName() + ",邮件消费者获取生产者消息msg:" + new String(message.getBody(), "UTF-8")
      + ",messageId:" + message.getMessageProperties().getMessageId());
  // 手动ack
  Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
  // 手动签收
  channel.basicAck(deliveryTag, false);
 }
}

RabbitMQ 如何保证幂等性,数据一致性

mq的作用主要是用来解耦,削峰,异步,

增加MQ,系统的复杂性也会增加很多,

也会带来其他的问题,比如MQ挂了怎么办,怎么保持数据的幂等性

幂等性问题通俗点讲就是保证数据不被重复消费,同时数据也不能少,

也就是数据一致性问题。

下面是MQ丢失的3种情况

rabbitmq-message-lose

1,生产者发送消息至MQ的数据丢失

解决方法:在生产者端开启comfirm 确认模式,你每次写的消息都会分配一个唯一的 id,

然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了

2,MQ收到消息,暂存内存中,还没消费,自己挂掉,数据会都丢失

解决方式:MQ设置为持久化。将内存数据持久化到磁盘中

3,消费者刚拿到消息,还没处理,挂掉了,MQ又以为消费者处理完

解决方式:用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动 ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。

rabbitmq-message-lose-solution

数据重复的问题简单的多,就是在消费端判断数据是否已经被消费过

  • 比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。
  • 比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。
  • 比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。
  • 比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • Spring Cloud Zuul路由网关服务过滤实现代码

    Spring Cloud Zuul路由网关服务过滤实现代码

    这篇文章主要介绍了Spring Cloud Zuul路由网关服务过滤实现代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-04-04
  • Java中使用同步回调和异步回调的示例详解

    Java中使用同步回调和异步回调的示例详解

    这篇文章主要介绍了Java中使用同步回调和异步回调的相关资料,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-04-04
  • Java如何将Excel数据导入到数据库

    Java如何将Excel数据导入到数据库

    这篇文章主要为大家详细介绍了Java将Excel数据导入到数据库的方法,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-10-10
  • SpringBoot中分页插件PageHelper的使用详解

    SpringBoot中分页插件PageHelper的使用详解

    分页查询是为了高效展示大量数据,通过分页将数据划分为多个部分逐页展示,原生方法需手动计算数据起始行,而使用PageHelper插件则简化这一过程,本文给大家介绍SpringBoot中分页插件PageHelper的使用,感兴趣的朋友一起看看吧
    2024-09-09
  • java实现利用String类的简单方法读取xml文件中某个标签中的内容

    java实现利用String类的简单方法读取xml文件中某个标签中的内容

    下面小编就为大家带来一篇java实现利用String类的简单方法读取xml文件中某个标签中的内容。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2016-12-12
  • SpringBoot自定义Banner使用详解

    SpringBoot自定义Banner使用详解

    这篇文章主要介绍了SpringBoot自定义Banner使用详解,启动 Spring Boot 时,几乎总是能在控制台上方看到如下横幅,这个也叫字符画、英文ASCII艺术字,这就是banner,我们来看一下如何使用,需要的朋友可以参考下
    2024-01-01
  • hibernate一对多关联映射学习小结

    hibernate一对多关联映射学习小结

    这篇文章主要介绍了hibernate一对多关联映射学习小结,需要的朋友可以参考下
    2017-09-09
  • java实现支付宝支付接口的调用

    java实现支付宝支付接口的调用

    本文主要介绍了java实现支付宝支付接口的调用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-07-07
  • Java创建型设计模式之工厂方法模式深入详解

    Java创建型设计模式之工厂方法模式深入详解

    工厂方法模式(FACTORY METHOD)是一种常用的类创建型设计模式,此模式的核心精神是封装类中变化的部分,提取其中个性化善变的部分为独立类,通过依赖注入以达到解耦、复用和方便后期维护拓展的目的。它的核心结构有四个角色,分别是抽象工厂、具体工厂、抽象产品、具体产品
    2022-09-09
  • java异常处理拦截器详情

    java异常处理拦截器详情

    这篇文章主要介绍了java异常处理拦截器,使用异常处理拦截器,可以不用写那么多try…catch…,下面就来学习关于java异常处理拦截器的详情内容吧,需要的朋友可以参考一下
    2021-10-10

最新评论