RocketMQ NameServer保障数据一致性实现方法讲解
路由注册角度
对于ZooKeeper
这样的强一致性组件,使用主从分离的架构,数据只写到主节点,主从之间的数据同步通过内部机制来进行数据复制。
对于RocketMQ
来说,NameServer
节点之间是互相不进行通信的,这样也就无法进行数据复制。RocketMQ
采用的机制是:在Broker
节点启动的时候,轮询所有的NameServer
节点,并与每个NameServer
节点建立长连接,发送注册请求。
相应的,NameServer
节点内部也会维护一个Broker
列表,用来动态存储Broker
的信息,做服务发现。
与此同时,Broker
使用心跳机制来向所有NameServer
节点证明自己是存活的,即定期发送心跳包;收到心跳包之后,NameServer
节点会更新这个Broker
的最新存活时间。
注意: NameServer
节点在处理心跳包时,存在多个请求同时处理同一张表的情况,为了保证并发安全性,RocketMQ
引入了读写锁(ReadWriteLock
),保证了多个Producer
并发读取路由信息不受影响,但同一时刻只能处理一个Broker
发来的心跳包,这也符合读多写少的经典场景。
路由剔除
正常情况下:
如果Broker
下线,则会与NameServer
断开长连接,底层基于Netty
的通道关闭监听器会监听到连接断开事件,然后将这个Broker
信息剔除。
异常情况下:
NameServer
有一个周期为10s的定时任务,定期扫描Broker
表,如果超过120s没有收到某个Broker
的心跳包,则会判定其失效并移除。
对于日常运维的需求,RocketMQ
提供了优雅剔除路由信息的方式,即可以先禁止Broker
的写权限,这样发送到这个Broker
的请求都会收到一个NO_PERMISSION
的响应,客户端自动重试其他的Broker
。
路由发现
生产者视角:
一般是在发送第一条消息时,才会根据Topic
从NameServer
获取路由信息
消费者视角:
订阅的Topic
一般是固定的,所以在启动时就会拉取
针对路由信息可能变化的场景,RocketMQ
提供了定时拉取Topic
最新路由信息的机制,以应对Broker
集群发生变化的场景。
DefaultMQProducer
和DefaultMQConsumer
有一个pollNameServerInterval
的配置项,用于指定从NameServer
获取路由信息的周期,其底层依赖MQClientInstance
类,MQClientInstance
类中的updateTopicRouteInfoFromNameServer
方法,可以根据指定的时间间隔,周期性地从NameServer
里拉取路由信息。在拉取时,会将当前启动的Producer
和Consumer
需要用到的Topic
列表放到一个集合里,逐个进行更新,源码如下:
/** * 更新单个Topic路由信息 */ public boolean updateTopicRouteInfoFromNameServer(final String topic) { return updateTopicRouteInfoFromNameServer(topic, false, null); }
/** * 更新单个Topic路由信息 */ public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) { try { if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { TopicRouteData topicRouteData; if (isDefault && defaultMQProducer != null) { // 使用默认TopicKey获取TopicRouteData topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3); if (topicRouteData != null) { for (QueueData data : topicRouteData.getQueueDatas()) { int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums()); data.setReadQueueNums(queueNums); data.setWriteQueueNums(queueNums); } } } else { topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3); } if (topicRouteData != null) { TopicRouteData old = this.topicRouteTable.get(topic); boolean changed = topicRouteDataIsChange(old, topicRouteData); if (!changed) { changed = this.isNeedUpdateTopicRouteInfo(topic); } else { log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData); } if (changed) { // 克隆出一个实例cloneTopicRouteData : topicRouteData会被设置到下面的publishInfo/subscribeInfo TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData(); // 更新Broker地址相关信息,当某个Broker心跳超时后,会被从brokerAddrTable中移除 for (BrokerData bd : topicRouteData.getBrokerDatas()) { this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs()); } // Update Pub info { TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData); publishInfo.setHaveTopicRouterInfo(true); Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, MQProducerInner> entry = it.next(); MQProducerInner impl = entry.getValue(); if (impl != null) { impl.updateTopicPublishInfo(topic, publishInfo); } } } // Update sub info { Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData); Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, MQConsumerInner> entry = it.next(); MQConsumerInner impl = entry.getValue(); if (impl != null) { impl.updateTopicSubscribeInfo(topic, subscribeInfo); } } } log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData); this.topicRouteTable.put(topic, cloneTopicRouteData); return true; } } else { log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId); } } catch (MQClientException e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("updateTopicRouteInfoFromNameServer Exception", e); } } catch (RemotingException e) { log.error("updateTopicRouteInfoFromNameServer Exception", e); throw new IllegalStateException(e); } finally { this.lockNamesrv.unlock(); } } else { log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms. [{}]", LOCK_TIMEOUT_MILLIS, this.clientId); } } catch (InterruptedException e) { log.warn("updateTopicRouteInfoFromNameServer Exception", e); } return false; }
当Broker
宕机时,还可以通过客户端的重试机制来解决,避免因为定时更新路由信息不及时导致的服务宕机~~
到此这篇关于RocketMQ NameServer保障数据一致性实现方法讲解的文章就介绍到这了,更多相关RocketMQ NameServer内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
SpringBoot(cloud)自动装配bean找不到类型的问题
这篇文章主要介绍了SpringBoot(cloud)自动装配bean找不到类型的问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2022-02-02springboot整合swagger3报Unable to infer base&nbs
这篇文章主要介绍了springboot整合swagger3报Unable to infer base url错误问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教2024-05-05
最新评论