RocketMQ中消费者的消费进度管理

 更新时间:2023年10月11日 10:31:00   作者:fFee-ops  
这篇文章主要介绍了RocketMQ中消费者的消费进度管理,业务实现消费回调的时候,当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS ,RocketMQ才会认为这批消息(默认是1条)是消费完成的,需要的朋友可以参考下

RocketMQ消费进度管理

业务实现消费回调的时候,当且仅当此回调函数返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS ,RocketMQ才会认为这批消息(默认是1条)是消费完成的

如果这时候消息消费失败,例如数据库异常,余额不足扣款失败等一切业务认为消息需要重试的场景,只要返回 ConsumeConcurrentlyStatus.RECONSUME_LATER ,RocketMQ就会认为这批消息消费失败了。

为了保证消息是肯定被至少消费成功一次,RocketMQ会把这批消费失败的消息重发回Broker(topic不是原topic而是这个消费租的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默 认16次),就会投递到DLQ死信队列。应用可以监控死信队列来做人工干预。

从哪里开始消费

当新实例启动的时候,PushConsumer会拿到本消费组broker已经记录好的消费进度,如果这个消费进度在Broker并没有存储起来,证明这个是一个全新的消费组,这时候客户端有几个策略可以选择:

CONSUME_FROM_LAST_OFFSET //默认策略,从该队列最尾开始消费,即跳过历史消息
CONSUME_FROM_FIRST_OFFSET //从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
CONSUME_FROM_TIMESTAMP//从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前

消息ACK机制

RocketMQ是以consumer group+queue为单位是管理消费进度的,以一个consumer offset标记这个消费组在这条queue上的消费进度。

每次消息成功后,本地的消费进度会被更新,然后由定时器定时同步到broker(即,不是立刻同步到broker,有一段时间消费进度只会存在于本地,此时如果宕机,那么未提交的消费进度就会被重新消费),以此持久化消费进度。但是每次记录消费进度的时候,只会把一批消息中最小的offset值为消费进度值,如下图:

在这里插入图片描述

比如2消费失败,rocketmq跳过2消费到了8,8消费成功了,但是提交的时候只会提交【消费到了1】,因为2失败了,所以会提交最小成功点

重复消费问题

由于消费进度只是记录了一个下标,就可能出现拉取了100条消息如 2101-2200的消息,后面99条都消费结束了,只有2101消费一直没有结束的情况。

在这里插入图片描述

在这种情况下,RocketMQ为了保证消息肯定被消费成功,消费进度职能维持在2101,直到2101也消费结束了,本地的消费进度才能标记2200消费结束了(注:consumerOffset=2201)。 在这种设计下,就有消费大量重复的风险。如2101在还没有消费完成的时候消费实例突然退出(机器断电,或者被kill)。这条queue的消费进度还是维持在2101,当queue重新分配给新的实例的时候,新的实例从broker上拿到的消费进度还是维持在2101,这时候就会又从2101开始消费,2102-2200这批消息实际上已经被消费过还是会投递一次。

对于这个场景,RocketMQ暂时无能为力,所以业务必须要保证消息消费的幂等性,这也是RocketMQ官方多次强调的态度。

重复消费验证

查看当前消费进度

检查队列消费的当前进度。 查看RocketMQ 的config文件夹下的 consumerOffset.json

cat consumerOffset.json

在这里插入图片描述

通过consumerOffset.json我们可以知道当前 topicTest 主题的 rocket_test_consumer_group 组的 queue2 消费到偏移量为32

消费者发送消息

消费者发送消息,并查看各个队列消息的偏移量

在这里插入图片描述

我们发现队列2的偏移量最小为32 消费的时候最小偏移量不提交,其他都正常

//队列2的偏移量为32的数据在等待
if (ext.getQueueId() == 2 && ext.getQueueOffset() == 32) {
System.out.println("消息消费耗时较厂接收queueId:[" + ext.getQueueId() + "],偏移量
offset:[" + ext.getQueueOffset() + "]");
//等待 模拟假死状态
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

运行查看日志

在这里插入图片描述

我们发现只有队列2的偏移量为32的消息消费超时,其他都已经正常消费 我们再查看下consumerOffset.json

cat consumerOffset.json

在这里插入图片描述

我们发现因为rocketMQ 整个消费记录都没有被提交,所以下次消费会全部再次消费。 这里模拟出了整个消费进度都在本地,没来得及提交给broker。

还有一种情况就是,进度成功提交给broker了,queue0、1、3的消费进度都改变了。但是queue2的消费进度还是32,因为消费32的时候超时了,rocketmq只能提交最小成功offset!

再次消费

去掉延时代码继续消费

在这里插入图片描述

我们发现消息被重复消费了一遍

到此这篇关于RocketMQ中消费者的消费进度管理的文章就介绍到这了,更多相关RocketMQ消费进度管理内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 浅谈FileItem类的常用方法

    浅谈FileItem类的常用方法

    下面小编就为大家带来一篇浅谈FileItem类的常用方法。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2016-08-08
  • Spring整合Quartz开发代码实例

    Spring整合Quartz开发代码实例

    这篇文章主要介绍了Spring整合Quartz开发代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-04-04
  • mybatis resultType自带数据类型别名解读

    mybatis resultType自带数据类型别名解读

    MyBatis为了简化开发,通过org.apache.ibatis.type.TypeAliasRegistry为常见类定义了别名,这些别名包括基本数据类型及其数组、集合类型等,如string对应java.lang.String,int对应java.lang.Integer等,此外,还有特殊前缀的别名如_int对应int类型
    2024-10-10
  • spring、mybatis 配置方式详解(常用两种方式)

    spring、mybatis 配置方式详解(常用两种方式)

    这篇文章给大家总结了常用的两种spring、mybatis 配置方式,本文给大家介绍的非常详细,需要的朋友参考下吧
    2017-12-12
  • Java中字符串根据宽度(像素)换行的问题

    Java中字符串根据宽度(像素)换行的问题

    这篇文章主要介绍了Java中字符串根据宽度(像素)换行的问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09
  • SpringBoot+Redis BitMap实现签到与统计的项目实践

    SpringBoot+Redis BitMap实现签到与统计的项目实践

    最近项目里需要集成签到和统计功能,连续签到后会给用户发放一些优惠券和奖品,以此来吸引用户持续在该品台进行活跃,本文就详细的介绍一下如何实现,感兴趣的可以了解一下
    2023-09-09
  • Java设计模式之适配器模式的示例详解

    Java设计模式之适配器模式的示例详解

    适配器模式,即将某个类的接口转换成客户端期望的另一个接口的表示,主要目的是实现兼容性,让原本因为接口不匹配,没办法一起工作的两个类,可以协同工作。本文将通过示例详细介绍适配器模式,需要的可以参考一下
    2022-08-08
  • 关于 Java 的数据结构链表

    关于 Java 的数据结构链表

    这篇文章主要介绍了关于 Java 的数据结构链表的相关资料,需要的朋友可以参考下面文章内容
    2021-09-09
  • 浅谈mac下maven的安装配置与使用

    浅谈mac下maven的安装配置与使用

    这篇文章主要介绍了浅谈mac下maven的安装配置与使用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-07-07
  • 如何基于SpringMVC实现断点续传(HTTP)

    如何基于SpringMVC实现断点续传(HTTP)

    这篇文章主要介绍了如何基于SpringMVC实现断点续传(HTTP),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-01-01

最新评论