RocketMQ生产者一个应用不能发送多个NameServer消息解决

 更新时间:2022年11月29日 10:52:04   作者:梦想实现家_Z  
这篇文章主要为大家介绍了RocketMQ生产者一个应用不能发送多个NameServer消息原因及解决方法详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

前言

目前有两套RocketMQ集群,集群A包含topic名称为cluster_A_topic,集群B包含topic名称为cluster_B_topic,在应用服务OrderApp上通过RocketMQ Client创建两个DefaultMQProducer实例发送消息给集群A和集群B

架构图如下:

根据上述架构图,我们给出的示例代码如下:

// 创建第一个DefaultMQProducer
DefaultMQProducer producer1 = new DefaultMQProducer("producer_group_1");
    // 设置nameServer地址
    producer1.setNamesrvAddr("192.168.2.230:9876");
    try {
      producer1.start();
      // 发送消息
      SendResult result1 = producer1.send(new Message("cluster_A_topic", "ping".getBytes(StandardCharsets.UTF_8)));
      switch (result1.getSendStatus()) {
        case SEND_OK:
          System.out.println("cluster_A_topic 发送成功!");
          break;
        case FLUSH_DISK_TIMEOUT:
          System.out.println("cluster_A_topic 持久化失败!");
          break;
        case FLUSH_SLAVE_TIMEOUT:
          System.out.println("cluster_A_topic 同步slave失败!");
          break;
        case SLAVE_NOT_AVAILABLE:
          System.out.println("cluster_A_topic 副本不可用!");
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
    // 创建第二个DefaultMQProducer
    DefaultMQProducer producer2 = new DefaultMQProducer("producer_group_2");
    // 设置nameServer地址
    producer2.setNamesrvAddr("192.168.2.231:9876");
    try {
      producer2.start();
      // 发送消息
      SendResult result2 = producer2.send(new Message("cluster_B_topic", "ping".getBytes(StandardCharsets.UTF_8)));
      switch (result2.getSendStatus()) {
        case SEND_OK:
          System.out.println("cluster_B_topic 发送成功!");
          break;
        case FLUSH_DISK_TIMEOUT:
          System.out.println("cluster_B_topic 持久化失败!");
          break;
        case FLUSH_SLAVE_TIMEOUT:
          System.out.println("cluster_B_topic 同步slave失败!");
          break;
        case SLAVE_NOT_AVAILABLE:
          System.out.println("cluster_B_topic 副本不可用!");
      }
      return "ok";
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      producer1.shutdown();
      producer2.shutdown();
    }

结果竟然报错了,报错内容时cluster_B_topic不存在:

经过不断的测试,发现只有放在最前面启动的DefaultMQProducer会生效,后面启动的DefaultMQProducer发送消息就报错说对应的topic不存在,而且报错的broker竟然是前面启动的DefaultMQProducer对应的broker。这就不科学了,难道RocketMQ不允许在一个应用上创建多个生产者?

问题定位

首先说明一下,当前使用的RocketMQ Client版本是4.8.0。为了确定是哪儿出了问题,不得不对源码来一波探索[哭泣脸😢]。

我们都知道生产者是发送消息给Broker的,获取Broker信息是通过连接NameServer获取的。既然报错的Broker和目标Broker竟然不对应,肯定是后面启动的生产者获取的Broker不对。有了最基本的判断,我们先从DefaultMQProducer#start()入手,最终我们定位到这样一段代码DefaultMQProducerImpl#start(final boolean startFactory)

public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                this.checkConfig();
// 如果生产者group名称不是`CLIENT_INNER_PRODUCER`,那么修改InstanceName值
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
            // 创建MQClientInstance实例
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            // 注册生产者实例到MQClientInstance中
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
            // 添加TBW102对应的topic信息,broker设置autoCreateTopicEnable = true才起作用
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
                if (startFactory) {
                    // 启动刚刚创建的MQClientInstance实例
                    mQClientFactory.start();
                }
                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                // 修改服务状态为RUNNING
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

上面的代码主要是创建了MQClientInstance实例,并且通过start()方法启动。

通过针对这两段代码的debug,我们发现创建的两个DefaultMQProducer对象是共用了一个MQClientInstance实例,并且所有针对NameServerBroker的远程操作全部是通过MQClientInstance实例来做的。比如发送消息的时候需要找到对应的Broker下的消息队列:

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            // 从NameServer更新topic路由
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }

最终我们发现两个DefaultMQProducer对象都是去同一个NameServer下获取对应的topic信息,这下问题就定位到了:因为使用了同一个MQClientInstance实例导致不同的DefaultMQProducer去访问了同一个NameServer,同一个集群需要同时接收两个topic的消息,也就出现了前面的报错说topic不存在的情况。

如何解决

我们来看看MQClientInstance实例是如何保证唯一性的:

public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        // 生成clientID
        String clientId = clientConfig.buildMQClientId();
        // 从缓存中获取MQClientInstance
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
            // 没有缓存的话就创建一个MQClientInstance
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            // 新创建出来的再放进缓存
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }
        // 返回MQClientInstance实例
        return instance;
    }

我们之所以拿到的MQClientInstance实例是同一个,是因为在同一个服务下创建的clientId相同:

    public String buildMQClientId() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.getClientIP());
        sb.append("@");
        sb.append(this.getInstanceName());
        if (!UtilAll.isBlank(this.unitName)) {
            sb.append("@");
            sb.append(this.unitName);
        }
        return sb.toString();
    }

两个clientId都是192.168.18.173@14933,为了防止clientId相同,我们可以在创建DefaultMQProducer实例是加上unitName值,保证两个unitName值不同来避免共享同一个MQClientInstance

DefaultMQProducer producer1 = new DefaultMQProducer("producer_group_1");
producer1.setNamesrvAddr("192.168.2.230:9876");
producer1.setUnitName("producer1");
producer1.start();
DefaultMQProducer producer2 = new DefaultMQProducer("producer_group_1");
producer2.setNamesrvAddr("192.168.2.231:9876");
producer2.setUnitName("producer2");
producer2.start();

通过上述代码修改后,两个消息都发送成功了。

另一个办法就是升级RocketMQ Client4.9.0,我们来看一下RocketMQ Client 4.9.0是怎么解决这个问题的:

    public void changeInstanceNameToPID() {
        if (this.instanceName.equals("DEFAULT")) {
            this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
        }
    }

RocketMQ Client 4.9.0在后面补充了一个纳秒值,之前的代码是这样的:

    public void changeInstanceNameToPID() {
        if (this.instanceName.equals("DEFAULT")) {
            this.instanceName = String.valueOf(UtilAll.getPid());
        }
    }

也就是说,在新的版本中,一个应用服务内创建多个DefaultMQProducer就会有多个MQClientInstance实例对应,不会再出现我们前面的报错。

以上就是RocketMQ生产者一个应用不能发送多个NameServer消息解决的详细内容,更多关于RocketMQ发送NameServer的资料请关注脚本之家其它相关文章!

相关文章

  • 浅谈JAVA 线程状态中可能存在的一些误区

    浅谈JAVA 线程状态中可能存在的一些误区

    这篇文章主要介绍了浅谈JAVA 线程状态中可能存在的一些误区,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-04-04
  • IDEA maven依赖错误中包下面红色波浪线

    IDEA maven依赖错误中包下面红色波浪线

    这篇文章主要介绍了IDEA maven依赖错误中包下面红色波浪线,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-08-08
  • ToStringBuilder类的一些心得

    ToStringBuilder类的一些心得

    ToStringBuilder类的一些心得,需要的朋友可以参考一下
    2013-02-02
  • java 如何从字符串里面提取时间

    java 如何从字符串里面提取时间

    这篇文章主要介绍了java实现从字符串里面提取时间的方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09
  • java实现获取安卓设备里已安装的软件包

    java实现获取安卓设备里已安装的软件包

    本文给大家介绍的是如何获取设备中已经安装的应用软件包的代码,其核心方法原理很简单,我们通过Android中提供的PackageManager类,来获取手机中安装的应用程序信息
    2015-10-10
  • spring mvc DispatcherServlet之前端控制器架构详解

    spring mvc DispatcherServlet之前端控制器架构详解

    这篇文章主要为大家详细介绍了spring mvc DispatcherServlet之前端控制器架构,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-04-04
  • Java程序单实例运行的简单实现

    Java程序单实例运行的简单实现

    这篇文章主要介绍了Java程序单实例运行的简单实现方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-08-08
  • 深入理解JDK8中Stream使用

    深入理解JDK8中Stream使用

    Stream 是 Java8 中处理集合的关键抽象概念,它可以指定你希望对集合进行的操作,可以执行非常复杂的查找、过滤和映射数据等操作。这篇文章主要介绍了JDK8中Stream使用解析,需要的朋友可以参考下
    2021-06-06
  • Java Math类的三个方法ceil,floor,round用法

    Java Math类的三个方法ceil,floor,round用法

    这篇文章主要介绍了Java Math类的三个方法ceil,floor,round用法,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-07-07
  • Mybatis order by 动态传参出现的问题及解决方法

    Mybatis order by 动态传参出现的问题及解决方法

    今天,我正在愉快地CRUD,突然发现出现一个Bug,我们来看看是怎么回事吧!接下来通过本文给大家介绍Mybatis order by 动态传参出现的一个小bug,需要的朋友可以参考下
    2021-07-07

最新评论