java开发RocketMQ生产者高可用示例详解

 更新时间:2022年08月05日 16:10:52   作者:奔跑的毛球  
这篇文章主要为大家介绍了java开发RocketMQ生产者高可用示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

引言

前边两章说了点基础的,从这章开始,我们挖挖源码。看看RocketMQ是怎么工作的。

首先呢,这个生产者就是送孩子去码头的家长,孩子们呢,就是消息了。

我们看看消息孩子们都长啥样。

1 消息

public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;
    //主题名字
    private String topic;
    //消息扩展信息,Tag,keys,延迟级别都存在这里
    private Map<String, String> properties;
    //消息体,字节数组
    private byte[] body;
    //设置消息的key,
    public void setKeys(String keys) {}
    //设置topic
    public void setTopic(String topic) {}
    //延迟级别
    public int setDelayTimeLevel(int level) {}
    //消息过滤的标记
    public void setTags(String tags) {}
    //扩展信息存放在此
    public void putUserProperty(final String name, final String value) {}
}

消息就是孩子们,这些孩子们呢,有各自的特点,也有共性。同一个家长送来的两个孩子可以是去同一个地方的,也可以是去不同的地方的。

1.1 topic

首先呢,每个孩子消息都有一个属性topic,这个我们上文说到了,是一个候船大厅。孩子们进来之后,走到自己指定的候船大厅的指定区域(平时出门坐火车高铁不也是指定的站台乘车么),坐到message queue座位上等,等着出行。

Broker有一个或者多个topic,消息会存放到topic内的message queue内,等待被消费。

1.2 Body

孩子消息,也有一个Body属性,这就是他的能力,他会画画,他会唱歌,他会干啥干啥,就记录在这个Body属性里。等走出去了,体现价值的地方也是这个Body属性。

Body就是消息体,消费者会根据消息体执行对应的操作。

1.3 tag

这个tag我们上节说了,就是一个标记,有的孩子背着画板,相机,有的游船就特意找到这些孩子拉走,完成他们的任务。

可以给消息设置tag属性,消费者可以选择含有特定tag属性的消息进行消费。

1.4 key

key就是每个孩子消息的名字了。要找哪个孩子,喊他名就行。

对发送的消息设置好 Key,以后可以根据这个Key 来查找消息。比如消息异常,消息丢失,进行查找会很方便。

1.5 延迟级别

当然,还有的孩子来就不急着走,来之前就想好了,要恰个饭,得30分钟,所以自己来了会等30分钟后被接走。

设置延迟级别可以规定多久后消息可以被消费。

2 生产者高可用

每个送孩子来的家长都希望能送到候船大厅里,更不希望孩子被搞丢了,这个时候这个候船大厅就需要一些保证机制了。

2.1 客户端保证生产者高可用

2.1.1 重试机制

就是说家长送来了,孩子进到候船大厅之后,没能成功坐到message queue座位上,这个时候工作人员会安排重试,再去看是否有座位坐。重试次数默认是2次,也就是说,消息孩子共有3次找座位坐的机会。

看源码,我特意加了注解,大致可以看懂一些了。

//这里取到了重试的次数
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
    String lastBrokerName = null == mq ? null : mq.getBrokerName();
    //获取消息队列
    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
    if (mqSelected != null) {
        mq = mqSelected;
        brokersSent[times] = mq.getBrokerName();
        try {
            beginTimestampPrev = System.currentTimeMillis();
            if (times > 0) {
                //Reset topic with namespace during resend.
                msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
            }
            long costTime = beginTimestampPrev - beginTimestampFirst;
            if (timeout < costTime) {
                callTimeout = true;
                break;
            }
            //发送消息
            sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
            ...
        } catch (RemotingException e) {
            ...
            continue;
        } catch (MQClientException e) {
            ...
            continue;
        } catch (MQBrokerException e) {
            ...
            continue;
        } catch (InterruptedException e) {
            //可以看到只有InterruptedException抛出了异常,其他的exception都会继续重试
            throw e;
        }
    } else {
        break;
    }
}

重试代码如上,这个sendDefaultImpl方法中,会尝试发送三次消息,若是都失败,才会抛出对应的错误。

2.1.2 客户端容错

若是有多个Broker候车大厅的时候,服务人员会安排消息孩子选择一个相对不拥挤比较容易进入的来进入。当然那些已经关闭的停电的没有服务能力的,我们是不会进的。

MQ Client会维护一个Broker的发送延迟信息,根据这个信息会选择一个相对延迟较低的Broker来发送消息。会主动剔除哪些已经宕机,不可用或发送延迟级别较高的Broker.

选择Broker就是在选择message queue,对应的代码如下:

这里会先判断延迟容错开关是否开启,这个开关默认是关闭的,若是开启的话,会优先选择延迟较低的Broker。

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    //判断发送延迟容错开关是否开启
    if (this.sendLatencyFaultEnable) {
        try {
            //选择一个延迟上可以接受,并且和上次发送相同的Broker
            int index = tpInfo.getSendWhichQueue().incrementAndGet();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                //若是Broker的延迟时间可以接受,则返回这个Broker
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                    return mq;
            }
            //若是第一步没能选中一个Broker,就选择一个延迟较低的Broker
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        //若是前边都没选中一个Broker,就随机选一个Broker
        return tpInfo.selectOneMessageQueue();
    }
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

但是当延迟容错开关关闭状态的时候,执行的代码如下:

为了均匀分散Broker的压力,会选择与之前不同的Broker

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    //若是没有上次的Brokername做参考,就随机选一个
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        //如果有,那么就选一个其他的Broker
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int index = this.sendWhichQueue.incrementAndGet();
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            //这里判断遇上一个使用的Broker不是同一个
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        //若是上边的都没选中,那么就随机选一个
        return selectOneMessageQueue();
    }
}

2.2 Broker端保证生产者高可用

Broker候船大厅为了能确切的接收到消息孩子,至少会有两个厅,一个主厅一个副厅,一般来说孩子都会进入到主厅,然后一顿操作,卡该忙信那机资(影分身之术),然后让分身进入到副厅,这样当主厅停电了,不工作了,副厅的分身只要去完成了任务就ok的。一般来说都是主厅的消息孩子去坐船完成任务。

之后我们会聊到Broker的主从复制,分为同步复制和异步复制,同步复制时指当master 收到消息之后,同步到slaver才算消息发送成功。异步复制是只要master收到消息就算成功。生产中建议至少部署两台master和两台slaver。

下一篇,我们聊聊,消息的发送流程,就是说,一个消息孩子,从进码头的门到坐到message queue座位上,都经历了啥。

以上就是java开发RocketMQ生产者高可用示例详解的详细内容,更多关于java RocketMQ生产者高可用的资料请关注脚本之家其它相关文章!

相关文章

  • Java中的synchronized有几种加锁方式(实例详解)

    Java中的synchronized有几种加锁方式(实例详解)

    在Java中,synchronized关键字提供了内置的支持来实现同步访问共享资源,以避免并发问题,这篇文章主要介绍了java的synchronized有几种加锁方式,需要的朋友可以参考下
    2024-05-05
  • Java详细分析Lambda表达式与Stream流的使用方法

    Java详细分析Lambda表达式与Stream流的使用方法

    Lambda表达式,基于Lambda所带来的函数式编程,又引入了一个全新的Stream概念,用于解决集合类库既有的弊端,Lambda 允许把函数作为一个方法的参数(函数作为参数传递进方法中)。使用 Lambda 表达式可以使代码变的更加简洁紧凑
    2022-04-04
  • SpringBoot结合mockito测试实战

    SpringBoot结合mockito测试实战

    与集成测试将系统作为一个整体测试不同,单元测试更应该专注于某个类。所以当被测试类与外部类有依赖的时候,尤其是与数据库相关的这种费时且有状态的类,很难做单元测试。但好在可以通过“Mockito”这种仿真框架来模拟这些比较费时的类,从而专注于测试某个类内部的逻辑
    2022-11-11
  • Spring-Boot 集成Solr客户端的详细步骤

    Spring-Boot 集成Solr客户端的详细步骤

    本篇文章主要介绍了Spring-Boot 集成Solr客户端的详细步骤,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-11-11
  • java运行jar包提示 “XXX中没有主清单属性” "找不到主类”两种解决办法

    java运行jar包提示 “XXX中没有主清单属性” "找不到主类”两种解决办法

    本文主要介绍了java运行jar包提示 “XXX中没有主清单属性” "找不到主类”两种解决办法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-06-06
  • MyBatis Plus 入门使用详细教程

    MyBatis Plus 入门使用详细教程

    这篇文章主要介绍了MyBatis Plus 入门使用详细教程,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-08-08
  • Java调用明华RF读写器DLL文件过程解析

    Java调用明华RF读写器DLL文件过程解析

    这篇文章主要介绍了Java调用明华RF读写器DLL文件过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-12-12
  • Spring Boot启动过程完全解析(二)

    Spring Boot启动过程完全解析(二)

    这篇文章主要介绍了Spring Boot启动过程完全解析(二),需要的朋友可以参考下
    2017-04-04
  • MyBatis不同Mapper文件引用resultMap实例代码

    MyBatis不同Mapper文件引用resultMap实例代码

    这篇文章主要介绍了mybatis 不同Mapper文件引用resultMap的实例代码,非常不错具有参考借鉴价值,需要的朋友可以参考下
    2017-07-07
  • SpringSecurity框架下实现CSRF跨站攻击防御的方法

    SpringSecurity框架下实现CSRF跨站攻击防御的方法

    CSRF是一种网络攻击方式,也可以说是一种安全漏洞,这种安全漏洞在web开发中广泛存在。这篇文章主要介绍了SpringSecurity框架下实现CSRF跨站攻击防御,需要的朋友可以参考下
    2019-12-12

最新评论