RocketMQ中的消费模式和消费策略详解

 更新时间:2023年10月11日 10:05:18   作者:fedorafrog  
这篇文章主要介绍了RocketMQ中的消费模式和消费策略详解,RocketMQ 是基于发布订阅模型的消息中间件,所谓的发布订阅就是说,consumer 订阅了 broker 上的某个 topic,当 producer 发布消息到 broker 上的该 topic 时,consumer 就能收到该条消息,需要的朋友可以参考下

前言

首先明确一点,RocketMQ 是基于发布订阅模型的消息中间件。所谓的发布订阅就是说,consumer 订阅了 broker 上的某个 topic,当 producer 发布消息到 broker 上的该 topic 时,consumer 就能收到该条消息。

之前我们讲过 consumer group 的概念,即消费同一类消息的多个 consumer 实例组成一个消费者组,也可以称为一个 consumer 集群,这些 consumer 实例使用同一个 group name。需要注意一点,除了使用同一个 group name,订阅的 tag 也必须是一样的,只有符合这两个条件的 consumer 实例才能组成 consumer 集群。

1. 消费模式

1.1 集群消费

当 consumer 使用集群消费时,每条消息只会被 consumer 集群内的任意一个 consumer 实例消费一次。举个例子,当一个 consumer 集群内有 3 个consumer 实例(假设为consumer 1、consumer 2、consumer 3)时,一条消息投递过来,只会被consumer 1、consumer 2、consumer 3中的一个消费。

同时记住一点,使用集群消费的时候,consumer 的消费进度是存储在 broker 上,consumer 自身是不存储消费进度的。消息进度存储在 broker 上的好处在于,当你 consumer 集群是扩大或者缩小时,由于消费进度统一在broker上,消息重复的概率会被大大降低了。

注意:在集群消费模式下,并不能保证每一次消息失败重投都投递到同一个 consumer 实例。

1.2 广播消费

当 consumer 使用广播消费时,每条消息都会被 consumer 集群内所有的 consumer 实例消费一次,也就是说每条消息至少被每一个 consumer 实例消费一次。举个例子,当一个 consumer 集群内有 3 个 consumer 实例(假设为 consumer 1、consumer 2、consumer 3)时,一条消息投递过来,会被 consumer 1、consumer 2、consumer 3都消费一次。

与集群消费不同的是,consumer 的消费进度是存储在各个 consumer 实例上,这就容易造成消息重复。还有很重要的一点,对于广播消费来说,是不会进行消费失败重投的,所以在 consumer 端消费逻辑处理时,需要额外关注消费失败的情况。

虽然广播消费能保证集群内每个 consumer 实例都能消费消息,但是消费进度的维护、不具备消息重投的机制大大影响了实际的使用。因此,在实际使用中,更推荐使用集群消费,因为集群消费不仅拥有消费进度存储的可靠性,还具有消息重投的机制。而且,我们通过集群消费也可以达到广播消费的效果。

1.3 使用集群消费模拟广播消费

如果业务上确实需要使用广播消费,那么我们可以通过创建多个 consumer 实例,每个 consumer 实例属于不同的 consumer group,但是它们都订阅同一个 topic。举个例子,我们创建 3 个 consumer 实例,consumer 1(属于consumer group 1)、consumer 2(属于 consumer group 2)、consumer 3(属于consumer group 3),它们都订阅了 topic A ,那么当 producer 发送一条消息到 topic A 上时,由于 3 个consumer 属于不同的 consumer group,所以 3 个consumer都能收到消息,也就达到了广播消费的效果了。 除此之外,每个 consumer 实例的消费逻辑可以一样也可以不一样,每个consumer group还可以根据需要增加 consumer 实例,比起广播消费来说更加灵活。

2. 消费策略

Consumer在拉取消息之前需要对TopicMessage进行负载操作,负载操作由一个定时器来完成单位,定时间隔默认20s

简单来说就是将Topic下的MessageQueue分配给这些Consumer,至于怎么分,就是通过这些负载策略定义的算法规则来划分。

2.1 AllocateMessageQueueAveragely

平均负载策略,RocketMQ默认使用的就是这种方式,如果某个Consumer集群,订阅了某个Topic,Topic下面的这些MessageQueue会被平均分配给集群中的Consumer,为了帮助大家理解,我画了个图

我来讲下这个图代表的意思,假设topic为 testMsg,该testMsg下面有4个MessageQueue,然后这些Consumer组成了一个集群(都监听了该Topic并且消费groupId是一样的),首先会给Consumer和MessageQueue进行排序,谁是老大,谁先拿MessageQueue, 平均分配分为两种情况

2.1.1 MessageQueue数量大于Consumer数量

如果队列数量不是消费者数量的整数倍,跟上图中2个Consumer和4个Consumer的情况一样,先分每个Consumer应得的数量,拿2个Consumer来举个例子,C1 和C2 各自分到了2个MessageQueue,C1排序时在C2前面,所以C1先把 Q0 和Q1拿走,C2再拿2个,也就是Q2和Q3。

如果队列数量不是消费者的整数倍,跟上图3个Consumer和5个Consumer的情况一样,5个Comsumer的比较特殊,我们过会再讲,我们拿3个Consumer的情况来举例,4个消息队列,每个Consumer能分到1个,还剩下1个,弱肉强食嘛,剩下的当然给排在前面的大哥C1啦,最后分下来,C1分到2个队列,C2和C3只分到一个,分完数量以后,也是按照顺序来拿,C1拿到了Q0和Q1,然后C2就只能从Q2开始拿,C3只能拿剩下的Q3啦

2.1.2 MessageQueue数量小于Consumer数量

这种情况平均下来,每个人1个Consumer都分不到一个,也就是我们上图中的5个Comsumer的情况老规矩,按照排序顺序,每人先拿1个队列,由于C5排在最后面,队列全被别人拿走了,C5就一直分不到消息队列,除非前面的某个Consumer挂了,20s之后,在队列重新负载的时候就能拿到MessageQueue。

具体算法:

//consumer的排序后的
int index = cidAll.indexOf(currentCID);
//取模
int mod = mqAll.size() % cidAll.size();
//如果队列数小于消费者数量,则将分到队列数设置为1,如果余数大于当前消费者的index,则
//能分到的队列数+1,否则就是平均值
int averageSize =
  mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                                       + 1 : mqAll.size() / cidAll.size());
//consumer获取第一个MessageQueue的索引
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
// 如果消费者大于队列数,rang会是负数,循环也就不会执行 
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
  result.add(mqAll.get((startIndex + i) % mqAll.size()));
}

2.2 AllocateMessageQueueAveragelyByCircle

环形平均分配,这个和平均分配唯一的区别就是,再分队列的时候,平均队列是将属于自己的MessageQueue全部拿走,而环形平均则是,一人拿一个,拿到的Queue不是连续的。我也画了张图来帮助大家理解

这种环形平均分配和平均分配,每个Consumer拿到的MessageQueue数量是不变的,我们就拿3个Consumer的情况举个例子,

也是对Consumer和MessageQueue排序,先确定每个Consumer能拿到的MessageQueue数量,C1能分到2个,C2和C3只能分到1个

C1先拿1个,然后C2拿一个,C3拿一个,C1再拿一个。也就是图上3个Consumer画的这个情况。

另外,如果Consumer的数量大于消息队列的数量,处理方式和平均分配时一样的。

//当前consumer排序后的索引 
int index = cidAll.indexOf(currentCID);  
//index会是consumer第一个拿到的消息队列索引
for (int i = index; i < mqAll.size(); i++) {
  //这里采用了取模的方式
  if (i % cidAll.size() == index) { 
    result.add(mqAll.get(i));  
  }
}

2.3 AllocateMessageQueueByConfig

用户自定义配置,用户在创建Consumer的时候,可以设置要使用的负载策略,如果我们设置为AllocateMessageQueueByConfig方式时,我们可以自己指定需要监听的MessageQueues,它维护了一个List messageQueueList,我们可以往这里面塞目标的MessageQueues,这个策略了解一下就行,用的不多,具体设置代码如下:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
//订阅topic
consumer.subscribe("testMsg","*");
//注册消息监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
  @Override
  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    // do job
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  }
});
//用户自定义queue策略
AllocateMessageQueueByConfig allocateMessageQueueByConfig = new AllocateMessageQueueByConfig();
//指定MessageQueue
allocateMessageQueueByConfig.setMessageQueueList(Arrays.asList(new MessageQueue("testMsg","broker-a",0)));
//设置consumer的负载策略
consumer.setAllocateMessageQueueStrategy(allocateMessageQueueByConfig);
//启动consumer
consumer.start();

2.4 AllocateMessageQueueByMachineRoom

? 机房负载策略,其实这个策略就是当前Consumer只负载处在指定的机房内的MessageQueue,还有brokerName的命名必须要按要求的格式来设置:  机房名@brokerName

我们先看下具体的使用

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
//订阅topic
consumer.subscribe("testMsg","*");
//注册消息监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
  @Override
  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    // do job
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  }
});
AllocateMessageQueueByMachineRoom allocateMachineRoom = new AllocateMessageQueueByMachineRoom();
//指定机房名称  machine_room1、machine_room2
allocateMachineRoom.setConsumeridcs(new HashSet<>(Arrays.asList("machine_room1","machine_room2")));
//设置consumer的负载策略
consumer.setAllocateMessageQueueStrategy(allocateMachineRoom);
//启动consumer
consumer.start();

总结一下源码的意思:

  • 首先赛选出当前Topic处在指定机房的队列
  • 赛选出队列后,按照平均负载策略进行具体的分配(算法极其相似)

哈哈,发现总结的真简洁呀,别慌,来张图给你润润喉

其实这个策略就是对MessageQueue进行了过滤,过滤完了以后,后续操作就按照平均负载策略来进行具体负载操作。这个算法和平均负载的算法得到的结果是一样的,我感觉这两个策略应该是两个人写的,不然不会写两套不同的算法来实现一个功能。也不知道是不是我自己太差了,理解不到大佬的思路。

2.5 AllocateMachineRoomNearby

这个策略我个人感觉是AllocateMessageQueueByMachineRoom的改进版本,因为这个策略的处理方式要比AllocateMessageQueueByMachineRoom更加灵活,还考虑到了那些同机房只有MessageQueue却没有Consumer的情况,下面我们来具体讲这个策略。使用该策略需要自己定义一个类,来区分每个broker处于哪个机房,该策略RocketMQ有个测试单元,我稍微改造了一下,就是把这个类提出来了。

public class MyMachineResolver implements AllocateMachineRoomNearby.MachineRoomResolver {
    /**
     * 判断当前broker处于哪个机房
     * @param messageQueue
     * @return
     */
    @Override
    public String brokerDeployIn(MessageQueue messageQueue) {
        return messageQueue.getBrokerName().split("-")[0];
    }
    /**
     * 判断consumer处于哪个机房
     * @param clientID
     * @return
     */
    @Override
    public String consumerDeployIn(String clientID) {
        return clientID.split("-")[0];
    }
}

我们从代码中,可以看出来需要在设置brokerName和Consumer的Id的时候需要加上机房名称,eg:hz_aliyun_room1-broker-a、hz_aliyun_root1-Client1。我们先看一下在代码里面怎么使用这个同机房分配策略

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
//订阅topic
consumer.subscribe("testMsg","*");
//注册消息监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
  @Override
  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    // do job
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  }
});
//用户同机房分配策略  
consumer.setAllocateMessageQueueStrategy(new AllocateMachineRoomNearby(new AllocateMessageQueueAveragely()
                ,new MyMachineResolver()));
//启动consumer
consumer.start();

我们可以看到,我们创建这个同机房分配策略的时候,还加了一个平均分配的策略进去,它本身就是一个策略,为啥还要传另一个策略。(该策略只会讲MessageQueue和Consumer按机房进行分组,分组以后具体的负载,就是通过我们传的另外一个负载策略来分配的)我们到源码里面去看,后面会解释到

//消息队列按机房分组
Map<String/*machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>();
for (MessageQueue mq : mqAll) {
  //这里调用我们自己定义的类方法,得到broker的机房的名称
  String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq);
  //机房不为空,将broker放到分组中
  if (StringUtils.isNoneEmpty(brokerMachineRoom)) {
    if (mr2Mq.get(brokerMachineRoom) == null) {
      mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>());
    }
    mr2Mq.get(brokerMachineRoom).add(mq);
  } else {
    throw new IllegalArgumentException("Machine room is null for mq " + mq);
  }
}
//consumer按机房分组
Map<String/*machine room */, List<String/*clientId*/>> mr2c = new TreeMap<String, List<String>>();
for (String cid : cidAll) {
  //这里调用我们自己定义的类方法,得到broker的机房的名称
  String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid);
  if (StringUtils.isNoneEmpty(consumerMachineRoom)) {
    if (mr2c.get(consumerMachineRoom) == null) {
      mr2c.put(consumerMachineRoom, new ArrayList<String>());
    }
    mr2c.get(consumerMachineRoom).add(cid);
  } else {
    throw new IllegalArgumentException("Machine room is null for consumer id " + cid);
  }
}
//当前consumer分到的所有MessageQueue
List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();
//1.给当前consumer分当前机房的那些MessageQeueue
String currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID);
//得到当前机房的MessageQueue
List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom);
//得到当前机房的Consumer
List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom);
if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) {
  //得到当前机房所有MessageQueue和Consumers后根据指定的策略再负载
  allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom));
}
//2.如果该MessageQueue的机房 没有同机房的consumer,将这些MessageQueue按配置好的备用策略分配给所有的consumer
for (String machineRoom : mr2Mq.keySet()) {
  if (!mr2c.containsKey(machineRoom)) { 
    //添加分配到的游离态MessageQueue
    allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mr2Mq.get(machineRoom), cidAll));
  }
}

总结一下源码的意思

  1. 分别给MessageQueue和Consumer按机房分组
  2. 得到当前Consumer所在机房的所有Consumer和MessageQueue
  3. 通过设置的负载策略,再进行具体的负载,得到当前Consumer分到的MessageQueue
  4. 如果存在MessageQueue的某个机房中,没有和MessageQueue同机房的Consumer,将这些MessageQueue按配置的负载策略分配给集群中所有的Consumer去负载
  5. 最终该Consumer分到的MessageQueue会包含同机房分配到的和部分游离态分配的

这里我也画个图来解释一下,

先同机房的Consumer和MessageQueue进行负载,这里按照平均负载来分(我们创建机房就近策略使用的是平均负载),然后将游离态的通过设置的负载策略来分。

2.6 AllocateMessageQueueConsistentHash

一致性哈希策略,这里我简单介绍一下一致性哈希,这里不好我先给个图,我们再来解释

一致性哈希有一个哈希环的概念,哈希环由数值 0到2^32-1 组成,不管内容多长的字符,经过哈希计算都能得到一个等长的数字,最后都会落在哈希环上的某个点,哈希环上的点都是虚拟的,比如我们这里使用Consumer的Id来进行哈希计算,得到的这几个是物理的点,然后把得到的点存到TreeMap里面,然后将所有的MessageQueue依次进行同样的哈希计算,得到距离MessageQueue顺时针方向最近的那个Consumer点,这个就是MessageQeueu最终归属的那个Consumer。

我们看下源码:

//将所有consumer变成节点 到时候经过hash计算 分布在hash环上
Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
for (String cid : cidAll) {
  cidNodes.add(new ClientNode(cid));
}
final ConsistentHashRouter<ClientNode> router; 
//构建哈希环
if (customHashFunction != null) {
  router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);
} else {
  //默认使用MD5进行Hash计算
  router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);
}
List<MessageQueue> results = new ArrayList<MessageQueue>();
for (MessageQueue mq : mqAll) {
  //对messageQueue进行hash计算,找到顺时针最近的consumer节点
  ClientNode clientNode = router.routeNode(mq.toString());
  //判断是否是当前consumer
  if (clientNode != null && currentCID.equals(clientNode.getKey())) {
    results.add(mq);
  }
}

到此这篇关于RocketMQ中的消费模式和消费策略详解的文章就介绍到这了,更多相关RocketMQ消费模式内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • spring cloud 使用Zuul 实现API网关服务问题

    spring cloud 使用Zuul 实现API网关服务问题

    这篇文章主要介绍了spring cloud 使用Zuul 实现API网关服务问题,非常不错,具有一定的参考借鉴价值,需要的朋友可以参考下
    2018-05-05
  • Java中的常用时间日期类总结(Date、DateFormat)

    Java中的常用时间日期类总结(Date、DateFormat)

    在Java开发中处理时间和日期是相当常见的任务,无论是计算日期差异、格式化日期显示、解析日期字符串还是进行日期计算,都需要一些时间和日期处理的技巧,这篇文章主要给大家介绍了关于Java中常用时间日期类(Date、DateFormat)的相关资料,需要的朋友可以参考下
    2024-08-08
  • java 输入一个数字,反转输出这个数字的值(实现方法)

    java 输入一个数字,反转输出这个数字的值(实现方法)

    下面小编就为大家带来一篇java 输入一个数字,反转输出这个数字的值(实现方法)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2016-10-10
  • Java中内部类使用方法实战案例分析

    Java中内部类使用方法实战案例分析

    这篇文章主要介绍了Java中内部类使用方法,结合具体案例形式分析了Java内部类原理、调用方法及相关使用注意事项,需要的朋友可以参考下
    2019-09-09
  • Spring实现类私有方法的几个问题(亲测通用解决方案)

    Spring实现类私有方法的几个问题(亲测通用解决方案)

    现实的业务场景中,可能需要对Spring的实现类的私有方法进行测试。本文给大家分享Spring实现类私有方法面临的几个问题及解决方案,感兴趣的朋友跟随小编一起看看吧
    2021-06-06
  • Springboot 读取 yml 配置文件里的参数值

    Springboot 读取 yml 配置文件里的参数值

    本文主要介绍了Springboot 读取 yml 配置文件里的参数值,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-12-12
  • Java创建与结束线程代码示例

    Java创建与结束线程代码示例

    这篇文章主要介绍了Java创建与结束线程代码示例,小编觉得挺不错的,这里分享给大家,供需要的朋友参考。
    2017-10-10
  • SpringBoot整合Guava Cache实现全局缓存的示例代码

    SpringBoot整合Guava Cache实现全局缓存的示例代码

    这篇文章主要介绍了SpringBoot整合Guava Cache实现全局缓存,Guava Cache是Google Guava库中的一个模块,提供了基于内存的本地缓存实现,文中介绍了SpringBoot整合使用Guava Cache的具体步骤,需要的朋友可以参考下
    2024-03-03
  • java如何完成输出语句实例详解

    java如何完成输出语句实例详解

    输入输出可以说是计算机的基本功能,下面这篇文章主要给大家介绍了关于java如何完成输出语句的相关资料,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2023-01-01
  • Spring Boot Maven Plugin打包异常解决方案

    Spring Boot Maven Plugin打包异常解决方案

    这篇文章主要介绍了Spring Boot Maven Plugin打包异常解决方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-11-11

最新评论