java rocketmq--消息的产生(普通消息)

 更新时间:2019年06月20日 17:04:44   作者:有爱jj  
这篇文章主要介绍了java rocketmq--消息的产生(普通消息),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,,需要的朋友可以参考下

前言

与消息发送紧密相关的几行代码:

1. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

2. producer.start();

3. Message msg = new Message(...)

4. SendResult sendResult = producer.send(msg);

5. producer.shutdown();

那这几行代码执行时,背后都做了什么?

一. 首先是DefaultMQProducer.start

@Override
public void start() throws MQClientException {
this.defaultMQProducerImpl.start();
}

调用了默认生成消息的实现类 -- DefaultMQProducerImpl

调用defaultMQProducerImpl.start()方法,DefaultMQProducerImpl.start()会初始化得到MQClientInstance实例对象,MQClientInstance实例对象调用它自己的start方法会 ,启动一些服务,如拉去消息服务PullMessageService.Start()、启动负载平衡服务RebalanceService.Start(),比如网络通信服务MQClientAPIImpl.Start()

另外,还会执行与生产消息相关的信息,如注册produceGroup、new一个TopicPublishInfo对象并以默认TopicKey为键值,构成键值对存入DefaultMQProducerImpl的topicPublishInfoTable中。

efaultMQProducerImpl.start()后,获取的MQClientInstance实例对象会调用sendHeartbeatToAllBroker()方法,不断向broker发送心跳包,yin'b可以使用下面一幅图大致描述DefaultMQProducerImpl.start()过程:

上图中的三个部分中涉及的内容:

1.1 初始化MQClientInstance

一个客户端只能产生一个MQClientInstance实例对象,产生方式使用了工厂模式与单例模式。MQClientInstance.start()方法启动一些服务,源码如下:

public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}

1.2 注册producer

该过程会将这个当前producer对象注册到MQClientInstance实例对象的的producerTable中。一个jvm(一个客户端)中一个producerGroup只能有一个实例,MQClientInstance操作producerTable大概有如下几个方法:

  • -- selectProducer
  • -- updateTopicRouteInfoFromNameServer
  • -- prepareHeartbeatData
  • -- isNeedUpdateTopicRouteInfo
  • -- shutdown

注:

根据不同的clientId,MQClientManager将给出不同的MQClientInstance;

根据不同的group,MQClientInstance将给出不同的MQProducer和MQConsumer

1.3 向路由信息表中添加路由

topicPublishInfoTable定义:

public class DefaultMQProducerImpl implements MQProducerInner {
private final Logger log = ClientLogger.getLog();
private final Random random = new Random();
private final DefaultMQProducer defaultMQProducer;
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>();

它是一个以topic为key的Map型数据结构,DefaultMQProducerImpl.start()时会默认创建一个key=MixAll.DEFAULT_TOPIC的TopicPublishInfo存放到topicPublishInfoTable中。

1.4 发送心跳包

MQClientInstance向broker发送心跳包时,调用sendHeartbeatToAllBroker( ),以及从MQClientInstance实例对象的brokerAddrTable中拿到所有broker地址,向这些broker发送心跳包。

sendHeartbeatToAllBroker会涉及到prepareHeartbeatData()方法,该方法会生成heartbeatData数据,发送心跳包时,heartbeatData作为心跳包的body。与producer相关的部分代码如下:

// Producer
for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
MQProducerInner impl = entry.getValue();
if (impl != null) {
ProducerData producerData = new ProducerData();
producerData.setGroupName(entry.getKey());
heartbeatData.getProducerDataSet().add(producerData);
}

二、. SendResult sendResult = producer.send(msg)

首先会调用DefaultMQProducer.send(msg) ,继而调用sendDefaultImpl:

public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

sendDefaultImpl做了啥?

2.1. 获取topicPublishInfo

根据msg的topic从topicPublishInfoTable获取对应的topicPublishInfo,如果没有则更新路由信息,从nameserver端拉取最新路由信息。从nameserver端拉取最新路由信息大致为:

首先getTopicRouteInfoFromNameServer,然后topicRouteData2TopicPublishInfo。

2.2 选择消息发送的队列

普通消息:默认方式下,selectOneMessageQueue从topicPublishInfo中的messageQueueList中选择一个队列(MessageQueue)进行发送消息,默认采用长轮询的方式选择队列 。

它的机制如下:正常情况下,顺序选择queue进行发送;如果某一个节点发生了超时,则下次选择queue时,跳过相同的broker。不同的队列选择策略形成了生产消息的几种模式,如顺序消息,事务消息。

顺序消息:将一组需要有序消费的消息发往同一个broker的同一个队列上即可实现顺序消息,假设相同订单号的支付,退款需要放到同一个队列,那么就可以在send的时候,自己实现MessageQueueSelector,根据参数arg字段来选择queue。

private SendResult sendSelectImpl(
Message msg,
MessageQueueSelector selector,
Object arg,
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 。。。}

事务消息:只有在消息发送成功,并且本地操作执行成功时,才发送提交事务消息,做事务提交,消息发送失败,直接发送回滚消息,进行回滚,具体如何实现后面会单独成文分析。

2.3 封装消息体通信包,发送数据包

首先,根据获取的MessageQueue中的getBrokerName,调用findBrokerAddressInPublish得到该消息存放对应的broker地址,如果没有找到则跟新路由信息,重新获取地址 :

brokerAddrTable.get(brokerName).get(MixAll.MASTER_ID)

可知获取的broker均为master(id=0)

然后, 将与该消息相关信息打包成RemotingCommand数据包,其RequestCode.SEND_MESSAGE

根据获取的broke地址,将数据包到对应的broker,默认是发送超时时间为3s。

封装消息请求包的包头:

SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);

发送消息包(普通消息默认为同步方式):

SendResult sendResult = null;
switch (communicationMode) {
   case SYNC:
  sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
  brokerAddr,
  mq.getBrokerName(),
   msg,
  requestHeader,
   timeout,
  communicationMode,
  context,
  this);
break;

处理来自broker端的响应数据包:

private SendResult sendMessageSync(
final String addr,
final String brokerName,
final Message msg,
final long timeoutMillis,
final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
return this.processSendResponse(brokerName, msg, response);
}

broker端处理request数据包后会将消息存储到commitLog,具体过程后续分析。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

相关文章

  • Java C++算法题解leetcode801使序列递增的最小交换次数

    Java C++算法题解leetcode801使序列递增的最小交换次数

    这篇文章主要为大家介绍了Java C++题解leetcode801使序列递增的最小交换次数示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-10-10
  • Java Zookeeper分布式分片算法超详细讲解流程

    Java Zookeeper分布式分片算法超详细讲解流程

    ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等
    2023-03-03
  • kafka生产者和消费者的javaAPI的示例代码

    kafka生产者和消费者的javaAPI的示例代码

    这篇文章主要介绍了kafka生产者和消费者的javaAPI的示例代码,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-06-06
  • 详解Java中用于国际化的locale类

    详解Java中用于国际化的locale类

    Java中也有用于转换和划分地区的国际化类java.lang.Locale,国际化在程序中设置语言和时间等时非常有用,下面我们就来详解Java中用于国际化的locale类
    2016-06-06
  • SpringBoot @Schedule的使用注意与原理分析

    SpringBoot @Schedule的使用注意与原理分析

    这篇文章主要介绍了SpringBoot @Schedule的使用注意与原理分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-08-08
  • Java动态规划之编辑距离问题示例代码

    Java动态规划之编辑距离问题示例代码

    这篇文章主要介绍了Java动态规划之编辑距离问题示例代码,具有一定参考价值,需要的朋友可以了解下。
    2017-11-11
  • ResultSet如何动态获取列名和值

    ResultSet如何动态获取列名和值

    这篇文章主要介绍了ResultSet如何动态获取列名和值问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-12-12
  • SpringCloud搭建Eureka服务模块的过程

    SpringCloud搭建Eureka服务模块的过程

    Eureka在分布式系统中起到了连接各个微服务的纽带作用,使得服务之间的交互变得更加灵活、可靠,本文将深入探讨如何使用Spring Cloud,逐步引导读者完成Eureka服务模块的搭建,感兴趣的朋友跟随小编一起看看吧
    2024-02-02
  • mybatis plus MetaObjectHandler 不生效的解决

    mybatis plus MetaObjectHandler 不生效的解决

    今天使用mybatis-plus自动为更新和插入操作插入更新时间和插入时间,配置了MetaObjectHandler不生效,本文就来解决一下,具有一定的 参考价值,感兴趣的可以了解一下
    2023-10-10
  • Spring实战之Qualifier注解用法示例

    Spring实战之Qualifier注解用法示例

    这篇文章主要介绍了Spring实战之Qualifier注解用法,结合实例形式详细分析了spring Qualifier注解相关配置、定义与使用方法,需要的朋友可以参考下
    2019-12-12

最新评论