RocketMQ特性Broker存储事务消息实现

 更新时间:2022年08月17日 14:07:09   作者:奔跑的毛球  
这篇文章主要为大家介绍了RocketMQ特性Broker存储事务消息实现详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

引言

Broker中,事务消息的初始化是通过BrokerController.initialTransaction()方法执行的。

private void initialTransaction() {
    this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
    if (null == this.transactionalMessageService) {
        this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
        LOG.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
    }
    this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
    if (null == this.transactionalMessageCheckListener) {
        this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
        LOG.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
    }
    this.transactionalMessageCheckListener.setBrokerController(this);
    this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
}

这里有三个核心的初始化变量

TransactionalMessageService

事务消息主要处理服务。默认实现类是TransactionalMessageServiceImpl也可以自己定义事务消息处理实现类,通过ServiceProvider.loadClass()方法进行加载。

TransactionalMessageService类定义如下。内部属性已加注释标明。

public interface TransactionalMessageService {
    //用于保存Half事务消息
    PutMessageResult prepareMessage(MessageExtBrokerInner messageInner);
    CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner);
    //删除事务消息
    boolean deletePrepareMessage(MessageExt messageExt);
    //提交事务消息
    OperationResult commitMessage(EndTransactionRequestHeader requestHeader);
    //回滚事务消息
    OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader);
    void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener);
    //打开事务消息
    boolean open();
    //关闭事务消息
    void close();
}

transactionalMessageCheckListener

事务消息回查监听器

transactionalMessageCheckService

事务消息回查服务,启动一个线程定时检查超时的Half消息是否需要回查。

处理事务消息

当初始化完成之后,Broker就可以处理事务消息了。

Broker存储事务消息的是org.apache.rocketmq.broker.processor.SendMessageProcessor,这和普通消息其实是一样的。

但是有两点针对事务消息的特殊处理

第一处:

org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage中:

//获取扩展字段的值,若是该值为true则为事务消息
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
boolean sendTransactionPrepareMessage = false;
if (Boolean.parseBoolean(traFlag)
    && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { 
    //判断当前Broker配置是否支持事务消息
    if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(
            "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                + "] sending transaction message is forbidden");
        return response;
    }
    sendTransactionPrepareMessage = true;
}
if (sendTransactionPrepareMessage) {
    //保存Half信息
    putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
    putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}

第二处:

存储事务消息前的预处理,对应方法是

org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    //将原消息的topic保存在扩展字段中
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    //将原消息的QueueId保存在扩展字段中
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));
    //将原消息的SysFlag保存在扩展字段中
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    //修改topic的值为RMQ_SYS_TRANS_HALF_TOPIC
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    //修改Queueid为0
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}

完成上述步骤之后,调用DefaultMessageStole.putMessage()方法将其保存到CommitLog中。

CommitLog存储成功之后,通过org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend()方法对其进行处理。

final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
    // Prepared and Rollback message is not consumed, will not enter the consume queue
    case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
    case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
        queueOffset = 0L;
        break;
    case MessageSysFlag.TRANSACTION_NOT_TYPE:
    case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
    default:
        break;
}

这里的逻辑是这样的,当读到的消息类型为事务消息时,设置当前消息的位点值为0,而不是设置真实的位点。这样该位点就不会建立ConsumeQueue索引,也不会被消费

以上就是RocketMQ特性Broker存储事务消息实现的详细内容,更多关于RocketMQ Broker存储事务消息的资料请关注脚本之家其它相关文章!

相关文章

  • 浅谈Java中File文件的创建以及读写

    浅谈Java中File文件的创建以及读写

    文中有非常详细的步骤介绍了Java中file文件的创建以及读写,对刚开始学习java的小伙伴们很有帮助,而且下文有非常详细的代码示例及注释哦,需要的朋友可以参考下
    2021-05-05
  • 利用Java实现天气预报播报功能

    利用Java实现天气预报播报功能

    这篇文章主要为大家介绍了如何利用Java语言实现天气预报播报功能,文中的示例代码讲解详细,对我们学习Java有一定的帮助,需要的可以参考一下
    2022-06-06
  • Java 数据结构哈希算法之哈希桶方式解决哈希冲突

    Java 数据结构哈希算法之哈希桶方式解决哈希冲突

    实际上哈希桶是解决哈希表冲突的一种方法。常见的解决冲突的两种方法:分离链接法、开放定址法。其中使用分离链接法,得到的对应关系即为哈希桶
    2022-02-02
  • spring boot自带的page分页问题

    spring boot自带的page分页问题

    这篇文章主要介绍了spring boot自带的page分页问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-03-03
  • 告别无尽等待:Java中的轮询终止技巧

    告别无尽等待:Java中的轮询终止技巧

    在Java中,轮询是一种常见的处理方式,用于检查某个条件是否满足,直到满足条件或达到一定的时间限制,本文将介绍Java中常用的轮询结束方式,包括使用循环、定时器和线程池等方法,需要的朋友可以参考下
    2023-10-10
  • 全面分析Java方法的使用与递归

    全面分析Java方法的使用与递归

    在java中,方法就是用来完成解决某件事情或实现某个功能的办法;程序调用自身的编程技巧称为递归( recursion)。递归做为一种算法在程序设计语言中广泛应用。但是如果没终止条件会造成死循环,所以递归代码里要有结束自调自的条件,本篇接下来讲解一下方法与递归
    2022-04-04
  • Java KindEditor粘贴图片自动上传到服务器功能实现

    Java KindEditor粘贴图片自动上传到服务器功能实现

    这篇文章主要介绍了Java KindEditor粘贴图片自动上传到服务器功能实现,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-04-04
  • java 2d画图示例分享(用java画图)

    java 2d画图示例分享(用java画图)

    这篇文章主要介绍了java 2D画图示例(用java画图),需要的朋友可以参考下
    2014-04-04
  • Spring Boot的filter(过滤器)简单使用实例详解

    Spring Boot的filter(过滤器)简单使用实例详解

    过滤器(Filter)的注册方法和 Servlet 一样,有两种方式:代码注册或者注解注册,下面通过实例给大家介绍Spring Boot的filter(过滤器)简单使用,一起看看吧
    2017-04-04
  • 解析Java中如何获取Spring中配置的bean

    解析Java中如何获取Spring中配置的bean

    本篇文章是对在Java中如何获取Spring中配置的bean进行了详细的分析介绍,需要的朋友参考下
    2013-07-07

最新评论