分布式消息队列RocketMQ概念详解

 更新时间:2023年05月09日 14:19:17   作者:山河亦问安  
RocketMQ 是阿里开源的分布式消息中间件,跟其它中间件相比,RocketMQ 的特点是纯JAVA实现,是一套提供了消息生产,存储,消费全过程API的软件系统,本文详细介绍了分布式消息队列RocketMQ概念,需要的朋友可以参考下

1.MQ概述

1.1 RocketMQ简介

RocketMQ 是阿里开源的分布式消息中间件,跟其它中间件相比,RocketMQ 的特点是纯JAVA实现,是一套提供了消息生产,存储,消费全过程API的软件系统。

1.2 MQ用途

限流削峰

MQ可以将系统的超量请求暂存其中,以便系统后期可以慢慢进行处理,从而避免了请求的丢失或系统被压垮。

 异步解耦

上游系统对下游系统的调用若为同步调用,则会大大降低系统的吞吐量与并发度,且系统耦合度太高、而异步调用则会解决这些问题。所以两层之间若要实现由同步到异步的转化,一般性做法就是,在这两层间添加一个MQ层。

 数据收集

分布式系统会产生海量级数据流,如:业务日志、监控数据、用户行为等。针对这些数据流进行实时或批量采集汇总,然后对这些数据流进行大数据分析,这是当前互联网平台的必备技术。通过MQ完成此类数据收集是最好的选择。

1.3 常见MQ产品

RabbitMQ
RabbitMQ是使用ErLang语言开发的一款MQ产品。其吞吐量较Kafka与RocketMQ要低,且由于其不是Java语言开发,所以公司内部对其实现定制化开发难度较大。
Kafka
Kafka是使用Scala/Java语言开发的一款MQ产品。其最大的特点就是高吞吐量,常用于大数据领域的实时计算、日志采集等场景。其没有遵循任何常见的MQ协议,而是使用自研协议。
RocketMQ
RocketMQ是使用Java语言开发的一款MQ产品。经过数年阿里双11的考验,性能与稳定性非常高。其没有遵循任何常见的MQ协议,而是使用自研协议。

对比

2.RocketMQ 基本概念

2.1 消息

消息是指,消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。单个消息所占空间不会很大。

RocketMQ中每个消息拥有唯一的MessageId,且可以携带具有业务标识的Key,以方便对消息的查询。不过需要注意的是,MessageId有两个:在生产者send()消息时会自动生成一个MessageId(msgId),当消息到达Broker后,Broker也会自动生成一个MessageId(offsetMsgId)。msgId、offsetMsgId与key都称为消息标识。 

msgId:由producer端生成,其生成规则为: producerIp + 进程pid + MessageClientIDSetter类的ClassLoader的hashCode + 当前时间 + AutomicInteger自增计数器 
offsetMsgId:由broker端生成,其生成规则为:brokerIp + 物理分区的offset(Queue中的偏移量) 
key:由用户指定的业务相关的唯一标识

2.2 主题

Topic表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。 一个生产者可以同时发送多种Topic的消息;而一个消费者只对某种特定的Topic感兴趣,即只可以订阅和消费一种Topic的消息。 

2.3 标签

标签为消息设置的标签,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。 标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。 Topic是消息的一级分类,Tag是消息的二级分类。Topic相当于货物,Tag相当于上海山东等地区。

2.4 队列

存储消息的物理实体。 一个Topic中可以包含多个Queue,每个Queue中存放的就是该Topic的消息。 一个Topic的Queue也被称为一个Topic中消息的分区(Partition)。 一个Topic的Queue中的消息只能被一个消费者组中的一个消费者消费。 一个Queue中的消息不允许同一个消费者组中的多个消费者同时消费。

分片不同于分区。在RocketMQ中,分片指的是存放相应Topic的Broker。每个分片中会创建出相应数量的分区,即Queue,每个Queue的大小都是相同的。

2.5 Producer

消息生产者,负责生产消息。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。  例如:用户提交的请求写入到MQ的过程,就是消息生产的过程,在这里用户就是生产者 。

RocketMQ中的消息生产者都是以生产者组(Producer Group)的形式出现的。生产者组是同一类生产者的集合,这类Producer发送相同Topic类型的消息。一个生产者组可以同时发送多个主题的消息。如果主题中有多个队列,生产者组只有一个生产者,生产者会采取轮询的方式进行发送消息。

生产者代码如下:

导入依赖

       <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>

 生产者代码

  public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer order = new DefaultMQProducer("order");
        order.setNamesrvAddr("localhost:9876");
        order.start();
        Message message = new Message("myTopic", "myTag", ("test").getBytes());
        SendResult result = order.send(message);
        System.out.println(result);
        order.shutdown();
    }

2.6 Consumer

消息消费者,负责消费消息。一个消息消费者会从Broker服务器中获取到消息,并对消息进行相关业务处理。  例如:系统从MQ中读取到请求,并对请求进行处理的过程就是消息消费的过程,在这里系统就是消费者。  

RocketMQ中的消息消费者都是以消费者组(Consumer Group)的形式出现的。消费者组是同一类消费者的集合,这类Consumer消费的是同一个Topic类型的消息。 消费者组使得在消息消费方面,实现负载均衡(将一个Topic中的不同的Queue平均分配给同一个Consumer Group的不同的Consumer,注意,并不是将消息负载均衡)和容错(一个Consmer挂了,该Consumer Group中的其它Consumer可以接着消费原Consumer消费的Queue)的目标变得非常容易。

消费者代码

  public static void main(String[] args) throws MQClientException {
 
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("myTopic","*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.println("收到的消息"+list);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
 
    }

负载均衡策略

queue 个数大于 Consumer个数, 那么 Consumer 会平均分配 queue,不够平均,会根据clientId排序来拿取余数
queue个数小于Consumer个数,那么会有Consumer闲置,就是浪费掉了,其余Consumer平均分配到queue

消费者组中Consumer的数量应该小于等于订阅Topic的Queue数量。如果超出Queue数量,则多出的Consumer将不能消费消息。

2.7 NameServer

NameServer是一个Broker与Topic路由的注册中心,支持Broker的动态注册与发现。 
主要包括两个功能: 
Broker管理:接受Broker集群的注册信息并且保存下来作为路由信息的基本数据;提供心跳检测机制,检查Broker是否还存活。

路由信息管理:每个NameServer中都保存着Broker集群的整个路由信息和用于客户端查询的队列信息。Producer和Conumser通过NameServer可以获取整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer可以获取整个Broker集群的路由信息,从而进行消息的投递和消费。 

路由注册 

Name Server既然是注册中心,那么是如何完成注册的呢? NameServer通常也是以集群的方式部署,不过,NameServer是无状态的,即NameServer集群中的各个节点间是无差异的,各节点间相互不进行信息通讯。 那各节点中的数据是如何进行数据同步的呢?在Broker节点启动时,轮询NameServer列表,与每个NameServer节点建立长连接,发起注册请求。在NameServer内部维护着⼀个Broker列表,用来动态存储Broker的信息。  

Broker节点为了证明自己是活着的,为了维护与NameServer间的长连接,会将最新的信息以心跳包的方式上报给NameServer,每30秒发送一次心跳。心跳包中包含 BrokerId、Broker地址(IP+Port)、Broker名称、Broker所属集群名称等等。NameServer在接收到心跳包后,会更新心跳时间戳,记录这个Broker的最新存活时间。 

路由剔除 
由于Broker关机、宕机或网络抖动等原因,NameServer没有收到Broker的心跳,NameServer可能会将其从Broker列表中剔除。 NameServer中有⼀个定时任务,每隔10秒就会扫描⼀次Broker表,查看每一个Broker的最新心跳时间戳距离当前时间是否超过120秒,如果超过,则会判定Broker失效,然后将其从Broker列表中剔除。 

路由发现 
RocketMQ的路由发现采用的是Pull模型。当Topic路由信息出现变化时,NameServer不会主动推送给客户端,而是客户端定时拉取Topic最新的路由。 默认客户端每30秒会拉取一次最新的路由。

2.8 Broker

Broker充当着消息中转角色,负责存储消息、转发消息。
Broker在RocketMQ系统中负责接收并存储从生产者发送来的消息,同时为消费者的拉取请求作准备。Broker同时也存储着消息相关的元数据,包括消费者组消费进度偏移offset、主题、队列等。

模块如下图:

Remoting Module:整个Broker的实体,负责处理来自clients端的请求。而这个Broker实体则由以下模块构成。

Client Manager:客户端管理器。负责接收、解析客户端(Producer/Consumer)请求,管理客户端。例如,维护Consumer的Topic订阅信息

Store Service:存储服务。提供方便简单的API接口,处理消息存储到物理硬盘和消息查询功能。

HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。

Index Service:索引服务。根据特定的Message key,对投递到Broker的消息进行索引服务,同时也提供根据Message Key对消息进行快速查询的功能。

2.9 RocketMQ 工作流程

工作流程如下图:

1)启动NameServer,NameServer启动后开始监听端口,等待Broker、Producer、Consumer连接。

2)启动Broker时,Broker会与所有的NameServer建立并保持长连接,然后每50秒向NameServer定时发送心跳包。

3)发送消息前,可以先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,当然,在创建Topic时也会将Topic与Broker的关系写入到NameServer中。不过,这步是可选的,也可以在发送消息时自动创建Topic。

4) Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取路由信息,即当前发送的Topic消息的Queue与Broker的地址(IP+Port)的映射关系。然后根据算法策略从队选择一个Queue,与队列所在的Broker建立长连接从而向Broker发消息。当然,在获取到路由信息后,Producer会首先将路由信息缓存到本地,再每30秒从NameServer更新一次路由信息。

5)Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取其所订阅Topic的路由信息,然后根据算法策略从路由信息中获取到其所要消费的Queue,然后直接跟Broker建立长连接,开始消费其中的消息。Consumer在获取到路由信息后,同样也会每30秒从NameServer更新一次路由信息。不过不同于Producer的是,Consumer还会向Broker发送心跳,以确保Broker的存活状态。

以上就是分布式消息队列RocketMQ概念详解的详细内容,更多关于RocketMQ概念的资料请关注脚本之家其它相关文章!

相关文章

  • Java 中Json中既有对象又有数组的参数如何转化成对象(推荐)

    Java 中Json中既有对象又有数组的参数如何转化成对象(推荐)

    Gson库是一个功能强大、易于使用的Java序列化/反序列化库,它提供了丰富的API来支持Java对象和JSON之间的转换,这篇文章主要介绍了Java 中Json中既有对象又有数组的参数如何转化成对象,需要的朋友可以参考下
    2024-07-07
  • Java各种锁在工作中使用场景和细节经验总结

    Java各种锁在工作中使用场景和细节经验总结

    本章主要说一说锁在工作中的使用场景,主要以 synchronized 和 CountDownLatch 为例,会分别描述一下这两种锁的使用场景和姿势
    2022-03-03
  • Java集合之Map接口的实现类精解

    Java集合之Map接口的实现类精解

    Map提供了一种映射关系,其中的元素是以键值对(key-value)的形式存储的,能够实现根据key快速查找value;Map中的键值对以Entry类型的对象实例形式存在;键(key值)不可重复,value值可以重复,一个value值可以和很多key值形成对应关系,每个建最多只能映射到一个值
    2021-09-09
  • Java实现时间动态显示方法汇总

    Java实现时间动态显示方法汇总

    这篇文章主要介绍了Java实现时间动态显示方法汇总,很实用的功能,需要的朋友可以参考下
    2014-08-08
  • 一天时间用Java写了个飞机大战游戏,朋友直呼高手

    一天时间用Java写了个飞机大战游戏,朋友直呼高手

    前两天我发现论坛有两篇飞机大战的文章异常火爆,但都是python写的,竟然不是我大Java,说实话作为老java选手,我心里是有那么一些失落的,今天特地整理了这篇文章,需要的朋友可以参考下
    2021-05-05
  • springboot项目中mybatis-plus@Mapper注入失败问题

    springboot项目中mybatis-plus@Mapper注入失败问题

    这篇文章主要介绍了springboot项目中mybatis-plus@Mapper注入失败问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-07-07
  • 改变JAVA窗体属性的操作方法

    改变JAVA窗体属性的操作方法

    在本篇内容里小编给大家详细分析了关于改变JAVA窗体属性的操作方法和步骤,需要的朋友们学习下。
    2018-12-12
  • SpringBoot MainApplication类文件的位置详解

    SpringBoot MainApplication类文件的位置详解

    这篇文章主要介绍了SpringBoot MainApplication类文件的位置详解,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-01-01
  • 剑指Offer之Java算法习题精讲数组与字符串题

    剑指Offer之Java算法习题精讲数组与字符串题

    跟着思路走,之后从简单题入手,反复去看,做过之后可能会忘记,之后再做一次,记不住就反复做,反复寻求思路和规律,慢慢积累就会发现质的变化
    2022-03-03
  • java如何多线程批量更新10万级的数据

    java如何多线程批量更新10万级的数据

    在处理大数据量的批量更新时,直接使用mybatis的updateBatch可能导致效率低下甚至OOM,通过每次处理5000条数据的方式虽然安全但效率低,更优的解决方案是使用多线程处理,将数据分批并多线程执行,有效提高了处理速度并保证了系统稳定性
    2024-10-10

最新评论