RocketMQ 源码分析Broker消息刷盘服务

 更新时间:2023年05月09日 14:53:17   作者:林师傅  
这篇文章主要为大家介绍了RocketMQ 源码分析Broker消息刷盘服务示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

前言

上篇文章我们介绍了消息刷盘的四种方式,本篇文章我们来介绍Broker是如何实现这四种刷盘方式。

刷盘服务源码分析

Broker中的四种刷盘分别是由CommitRealTimeService,FlushRealTimeService,GroupCommitService将消息从内存中刷到磁盘上的。在了解刷盘这三个刷盘服务之前,我们先来了解MappedFile中下面几个属性

public class MappedFile extends ReferenceResource {
    // 当前写文件位置,即数据被写入MappedFile的最新指针,可能存在ByteBuffer中,没有提交
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    // 数据被写入文件的最新指针(只是被写入文件映射,不一定被刷盘)
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    // 刷盘位置,该指针之前的数据都持久化存储到磁盘中
    private final AtomicInteger flushedPosition = new AtomicInteger(0);
    // 文件大小,默认是1042*1024*4(4GB)
    protected int fileSize;
    // 起始偏移量,MappedFile创建时从文件名中解析
    private long fileFromOffset;
}

上面几个属性在MappedFile中的位置如下图所示

上面几个位置关系: flushedPosition ≤ commitedPosition ≤ wrotePosition

CommitRealTimeService刷盘源码分析

CommitRealTimeService类的作用就是将上图中红色的消息(也就是committedPosition -> wrotePosition之间的消息)从直接内存ByteBuffer提交到FileChannel,提交完成并不带表刷盘完成,还需要将FileChannel将数据刷到硬盘中,才正式刷盘完成。CommitRealTimeService核心代码逻辑是在run()中,在run()中是包含一个死循环,死循环中每个200ms提交一次消息,每次最少提交4页的消息,每页大小是4kb,也就是说只有wrotePosition - committedPosition ≥ 4*4kb,消息才会被提交。

// org.apache.rocketmq.store.CommitLog.CommitRealTimeService#run
public void run() {
	// 死循环
  while (!this.isStopped()) {
      // 消息提交时间间隔,默认200ms
      int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
      // 最少提交页数,默认是4
      int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
      try {
        	// 提交消息
          boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
        	// 等待200ms
          this.waitForRunning(interval);
      } catch (Throwable e) {
          CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
      }
  }
}

上面mappedFileQueue#commit提交最终会调用MappedFile#commit0,commit0代码逻辑如下,将直接内存ByteBuffer中的数据拷贝到fileChannel中。

// org.apache.rocketmq.store.MappedFile#commit0
protected void commit0() {
    // 写指针
    int writePos = this.wrotePosition.get();
    // 最后提交指针
    int lastCommittedPosition = this.committedPosition.get();
    // byteBuffer的数据提交到FileChannel
    if (writePos - lastCommittedPosition > 0) {
        try {
            ByteBuffer byteBuffer = writeBuffer.slice();
            byteBuffer.position(lastCommittedPosition);
            byteBuffer.limit(writePos);
            this.fileChannel.position(lastCommittedPosition);
            this.fileChannel.write(byteBuffer);
            this.committedPosition.set(writePos);
        } catch (Throwable e) {
            log.error("Error occurred when commit data to FileChannel.", e);
        }
    }
}

FlushRealTimeService刷盘源码分析

FlushRealTimeService的代码与CommitRealTimeService类似,核心代码也带run()中,run()中也是一个死循环,每隔500ms调用mappedFileQueue#flush刷盘。

// org.apache.rocketmq.store.CommitLog.FlushRealTimeService#run
public void run() {
    while (!this.isStopped()) {
        // 定时刷盘时间间隔,默认500ms
        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
        // 一次刷盘页数,默认是4页
        int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
        try {
            if (flushCommitLogTimed) {
                // sleep 500ms
                Thread.sleep(interval);
            } else {
                this.waitForRunning(interval);
            }
						// 消息刷盘
            CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
        } catch (Throwable e) {
            this.printFlushProgress();
        }
    }
}

mappedFileQueue#flush刷盘最终调用了MappedFile#flush,代码如下所示,可以看到如果MappedFile中有直接内存写缓存,则会调用fileChannel.force(false)刷盘,如果没有写缓存,则消息直接提交到MappedFile的内存映射文件mappedByteBuffer中,因此调用mappedByteBuffer.force()刷盘。

// org.apache.rocketmq.store.MappedFile#flush
public int flush(final int flushLeastPages) {
    if (this.isAbleToFlush(flushLeastPages)) {
        if (this.hold()) {
            int value = getReadPosition();
            try {
                // 如果使用了堆外内存,那么通过fileChannel强制刷盘,这是异步堆外内存的逻辑
                if (writeBuffer != null || this.fileChannel.position() != 0) {
                    this.fileChannel.force(false);
                } else {
                    // 如果没有使用堆外内存,那么通过fileChannel强制刷盘,这是同步或者异步刷盘走的逻辑
                    this.mappedByteBuffer.force();
                }
            } catch (Throwable e) {
                log.error("Error occurred when force data to disk.", e);
            }
            // 设置刷盘位置为写入位置
            this.flushedPosition.set(value);
            // 减少对该MappedFile的引用次数
            this.release();
        } else {
            log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
            this.flushedPosition.set(getReadPosition());
        }
    }
    return this.getFlushedPosition();
}

GroupCommitService刷盘源码分析

同步刷盘GroupCommitService代码与上述代码类似,都继承了ServiceThread,它的核心逻辑在GroupCommitService#run,在run()中也是一个死循环,每隔10ms调用一次doCommit(),虽然这个方法的名字叫doCommit,实际底层也与FlushRealTimeService相同,都是调用的mappedFileQueue#flush,将mappedByteBuffer中的数据刷入磁盘。

// org.apache.rocketmq.store.CommitLog.GroupCommitService#run
public void run() {
    // 死循环
    while (!this.isStopped()) {
        try {
            // 间隔10ms
            this.waitForRunning(10);
            this.doCommit();
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
}

看到这里大家可能会有疑问,为什么同步刷盘也是定时刷盘,这与异步刷盘有什么区别呢?实际上这里有着相当精妙的设计,在上篇文章中我们了解到同步刷盘包括等待消息保存与不等待消息保存。

如果不等待消息保存,则调用了ServiceThread#wakeup方法。

public void wakeup() {
    if (hasNotified.compareAndSet(false, true)) {
        waitPoint.countDown(); 
    }
}

ServiceThread状态如下所示,如果刷盘线程在10ms等待中,hasNotified属性值为false,hastNotified更新成功,刷盘线程被唤醒,立即停止等待。如果刷盘线程正在执行中,hasNotified更新失败,刷盘线程唤醒失败。只能等待下一次被唤醒或者下一次时间间隔后再次刷盘。

如果是要等待刷盘成功后才返回结果,就要利用到GroupCommitService属性中两个刷盘请求容器

  • requestWrite

同步刷盘请求暂存容器

  • requestsRead

处理中的刷盘请求容器

class GroupCommitService extends FlushCommitLogService {
    // 同步刷盘请求暂存容器
    private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();
    // 每次处理刷盘的request容器
    private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>();
}

提交刷盘请求首先会被放入到requestsWrite容器中,然后再唤醒刷盘线程。

// org.apache.rocketmq.store.CommitLog.GroupCommitService#putRequest
public synchronized void putRequest(final GroupCommitRequest request) {
    lock.lock();
    try {
        // 写请求
        this.requestsWrite.add(request);
    } finally {
        lock.unlock();
    }
    // 唤醒当前线程
    this.wakeup();
}

刷盘线程被唤醒或者线程结束等待时都会调用onWaitEnd()方法,交换请求暂存容器和刷盘request容器

// org.apache.rocketmq.store.CommitLog.GroupCommitService#onWaitEnd
@Override
protected void onWaitEnd() {
    this.swapRequests();
}
// org.apache.rocketmq.store.CommitLog.GroupCommitService#swapRequests
// 交换请求暂存容器和刷盘request容器
private void swapRequests() {
    lock.lock();
    try {
        LinkedList<GroupCommitRequest> tmp = this.requestsWrite;
        this.requestsWrite = this.requestsRead;
        this.requestsRead = tmp;
    } finally {
        lock.unlock();
    }
}

线程被唤醒后会调用doCommit(),从下面代码可以发现,不管requestsRead是否包含要处理的刷盘请求,实际都是通过调用mappedFileQueue#flush执行刷盘。

  • 如果requestsRead中包含刷盘请求

则有可能需要调用mappedFileQueue#flush,确保当前请求的消息能够被刷盘,并返回刷盘结果给客户端,如果包含请求,最多会调用两次刷盘方法,确保消息能够正确刷盘。

由于文件是固定大小,有可能刷盘位置在上一个MappedFile中,当前消息请求在最新的MappedFile中,刷盘两次,确保当前消息能够被刷入硬盘中

  • 如果requestsRead中不包含刷盘请求

处理请求容器中包含request,直接调用MappedFileQueue#flush,如果当前消息不在flushPosition所在的mappedFile中,则本次刷盘有可能并不会将当前消息持久化到磁盘中,需要等待下次刷盘。

// org.apache.rocketmq.store.CommitLog.GroupCommitService#doCommit
private void doCommit() {
    // 如果处理Request不空
    if (!this.requestsRead.isEmpty()) {
        // 遍历处理Request
        for (GroupCommitRequest req : this.requestsRead) {
            // 如果刷盘指针大于刷盘请求中需要刷盘的offSet
            boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
            // 消息刷盘
            for (int i = 0; i < 2 && !flushOK; i++) {
                CommitLog.this.mappedFileQueue.flush(0);
                flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
            }
            // 唤醒客户端
            req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
        }
    } else {
        // 如果消息不等待刷盘成功就返回,则不会提交刷盘请求,调用这个方法
        CommitLog.this.mappedFileQueue.flush(0);
    }
}

总结

本次我们了解了RocketMQ中四种刷盘策略对应的刷盘服务

  • 同步刷盘-等待消息保存到磁盘
  • 同步刷盘-不等待消息保存到磁盘上

上面两个同步刷盘都是由GroupCommitService实现的,由GroupCommitService将MappedByteBuffer消息刷盘到磁盘上

  • 异步刷盘-开启堆外缓存

如果开启了堆外缓存,刷盘时会先由CommitRealTimeService将消息从Bytebuffer拷贝到FileChannel,FlushRealTimeService再将消息从FileChannel刷到磁盘上

  • 异步刷盘-不开启堆外缓存

这种方式也是默认的刷盘方式,由FlushRealTimeService将MappedByteBuffer消息刷盘到磁盘上

以上就是RocketMQ 源码分析Broker消息刷盘服务的详细内容,更多关于RocketMQ Broker刷盘服务的资料请关注脚本之家其它相关文章!

相关文章

  • java集合Collection实现类解析ArrayList LinkedList及Vector

    java集合Collection实现类解析ArrayList LinkedList及Vector

    这篇文章主要为大家介绍了java集合Collection实现类解析ArrayList LinkedList及Vector,有需要的朋友可以借鉴参考下,希望能够有所帮助
    2022-03-03
  • IDEA中request.getParameter爆红问题及解决

    IDEA中request.getParameter爆红问题及解决

    这篇文章主要介绍了IDEA中request.getParameter爆红问题及解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-11-11
  • Java并发编程之ReentrantLock解析

    Java并发编程之ReentrantLock解析

    这篇文章主要介绍了Java并发编程之ReentrantLock解析,ReentrantLock内容定义了一个抽象类Sync,继承自AQS,而不是自己去继承AQS,所有对ReentrantLock的操作都会转化为对Sync的操作,需要的朋友可以参考下
    2023-12-12
  • 详解Java中String类型与默认字符编码

    详解Java中String类型与默认字符编码

    这篇文章主要介绍了Java中String类型与默认字符编码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-05-05
  • Java异常处理的五个关键字

    Java异常处理的五个关键字

    本篇文章给大家详细讲述了关于Java异常处理的相关知识点,并列举了5个重要关键字,一起啊参考学下。
    2018-03-03
  • SpringBoot使用Redis缓存的实现方法

    SpringBoot使用Redis缓存的实现方法

    这篇文章主要介绍了SpringBoot使用Redis缓存的实现方法,需要的朋友可以参考下
    2018-02-02
  • Java本地缓存实现代码示例

    Java本地缓存实现代码示例

    这篇文章主要给大家介绍了关于Java本地缓存实现的相关资料,对于缓存的作用不言而喻,可以提高查询效率,比去DB查询的速度要快,文中给出了详细的代码示例,需要的朋友可以参考下
    2023-08-08
  • java中的前++和后++的区别示例代码详解

    java中的前++和后++的区别示例代码详解

    这篇文章主要介绍了java中的前++和后++的区别示例代码详解,其实大家只要记住一句话就可以了,前++是先自加再使用而后++是先使用再自加,本文通过代码给大家详细解说,感兴趣的朋友跟随小编一起看看吧
    2020-06-06
  • 关于Java中方法引用的示例

    关于Java中方法引用的示例

    方法引用可以认为是Lambda表达式的一种特殊形式,Lambda表达式可以让开发者自定义抽象方法的实现代码,方法引用则可以让开发者直接引用已存在的实现方法,作为Lambda表达式的Lambda体(参数列表得一致),需要的朋友可以参考下
    2023-05-05
  • java中的interface接口实例详解

    java中的interface接口实例详解

    这篇文章主要介绍了 java中的interface接口实例详解的相关资料,需要的朋友可以参考下
    2017-03-03

最新评论