RocketMQ源码分析之Broker过期消息清理机制

 更新时间:2023年05月09日 10:54:44   作者:林师傅  
这篇文章主要为大家介绍了RocketMQ源码分析之Broker过期消息清理机制示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

前言

前面文章讲了消息是如何保存的以及consumeQueue与Index文件更新机制。随着消息的增加,Broker不可能一直保存所有消息,Broker是按照什么规则清理消息的呢?被消费过后的消息就会被清理掉吗?下面我们来介绍Broker消息清理机制。

Broker消息清理机制简介

消息是被顺序存储在CommitLog文件中的,且消息长度不定长,因此消息的清理不是以消息为单位进行的,而是以CommitLog为单位进行的。默认情况下,Broker会清理单个CommitLog文件中最后一条消息超过72小时的CommitLog文件,除了用户手动清理为,下面几种情况会被默认清理。

  • CommitLog清理机制

    • CommitLog文件过期(72小时),且达到清理时间点(默认为04:00~05:00),自动清理过期的CommitLog文件

    • CommitLog文件过期(72小时),且CommitLog所在磁盘分区占用率已经达到过期清理警戒线(默认75%),无论是否到达清理时间点都会自动清理过期文件

    • CommitLog所在磁盘分区占用率已经达到清理警戒线(默认85%),无论是否过期,都会从最早的文件开始清理,一次最多清理10个文件

    • CommitLog所在磁盘分区占用率已经达到系统危险警戒线(默认90%),Broker将拒绝消息写入

    • Broker至少会保留最新的CommitLog文件

  • ConsumeQueue清理机制

    • 如果ConsumeQueue文件关联CommitLog都被清理,则清理此ConsumeQueue文件
    • Broker每个Topic-QueueId至少会保留最新的文件
  • IndexFile清理机制

    • 如果IndexFile所有索引单元关联CommitLog都被清理,则清理此IndexFile

Broker与消息清理相关配置

# 文件自动清理时间,单位H,默认72
fileReservedTime=72
# CommitLog物理文件删除间隔,但是ms,默认100
deleteCommitLogFilesInterval = 100
# 文件自动清理时间,默认04,即凌晨4点
deleteWhen = "04"
# 硬盘占用率所在分区过期清理警戒线,超过这个值,无论是否到达清理时间,都会自动清理过期文件
diskMaxUsedSpaceRatio = 75

消息清理机制源码分析

消息定时清理的是由DefaultMessageStore类负责的,它在启动时(start)会调用DefaultMessageStore#addScheduleTask添加和消息存储相关的定时任务,其中就包括消息删除相关的定时任务DefaultMessageStore.this.cleanFilesPeriodically(),这个定时任务在Broker启动后60s开始,每隔10秒执行一次

// org.apache.rocketmq.store.DefaultMessageStore#addScheduleTask
private void addScheduleTask() {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            // commitLog、consumeQueue和IndexFile定时删除
            DefaultMessageStore.this.cleanFilesPeriodically();
        }
    }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval()/*10s*/, TimeUnit.MILLISECONDS);
  	// ...
}

在cleanFilesPeriodically()中有两个方法,cleanCommitLogService.run()负责清理CommitLog,cleanConsumeQueueService.run()负责清理ConsumeQueue和IndexFile。

// org.apache.rocketmq.store.DefaultMessageStore#cleanFilesPeriodically
private void cleanFilesPeriodically() {
    // 清理CommitLog
    this.cleanCommitLogService.run();
    // 清理ConsumeQueue和IndexFile
    this.cleanConsumeQueueService.run();
}

CommitLog清理源码分析

CommitLog清理方法CleanCommitLogService#run调用了CleanCommitLogService#deleteExpiredFiles,deleteExpiredFiles方法的核心代码逻辑如下,以下三种情况会触发CommitLog文件的删除

  • 当前时间是凌晨4点
  • CommitLog所在磁盘分区硬盘占用率超过75%
  • 手动删除CommitLog
// org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#deleteExpiredFiles
private void deleteExpiredFiles() {
    // 是否是凌晨4点,用小时匹配[04:00,05:00)
    boolean timeup = this.isTimeToDelete();
    // >75%就会返回true,如果大于85%,则触发强制删除
    boolean spacefull = this.isSpaceToDelete();
    // 手动删除次数是否>0
    boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
    if (timeup/*凌晨4点*/ || spacefull/*空间满了*/ || manualDelete/*手动删除*/) {
        boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable()/*默认true*/ && this.cleanImmediately/*空间占用超过85%,触发强制删除*/;
				// 删除CommitLog
        deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
            destroyMapedFileIntervalForcibly, cleanAtOnce);
        if (deleteCount > 0) {
        } else if (spacefull) {
            log.warn("disk space will be full soon, but delete file failed.");
        }
    }
}

CommitLog的清理逻辑在MappedFileQueue#deleteExpiredFileByTime,其核心代码如下所示,主要分为下面几个步骤

  • 复制MappedFileQueue中的mappedFiles,循环处理删除逻辑,循环mfsLength-1次,也就是无论如何都会保留最新的MappedFile
  • 如果MappedFile的最后修改时间超过72小时或CommitLog所在磁盘分区硬盘占用率超过85%触发强制删除MappedFile,则会删除MappedFile,每次删除最多删除10个MappedFile,相邻MappedFile删除时间间隔默认100ms
  • 删除MappedFileQueue的mappedFiles数组中已删除的MappedFile,并返回删除MappedFile的数量
// org.apache.rocketmq.store.MappedFileQueue#deleteExpiredFileByTime
public int deleteExpiredFileByTime(final long expiredTime,final int deleteFilesInterval,final long intervalForcibly,final boolean cleanImmediately) {
    // 复制一份当前mappedFile
    Object[] mfs = this.copyMappedFiles(0);
		// 会保留最后一个MappedFile
    int mfsLength = mfs.length - 1;
    int deleteCount = 0;
    List<MappedFile> files = new ArrayList<MappedFile>();
    if (null != mfs) {
        for (int i = 0; i < mfsLength; i++) {
            MappedFile mappedFile = (MappedFile) mfs[i];
            // 最后修改时间+过期时间
            long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
            // 如果commitLog所在磁盘分区总容量超过85%,触发立即删除,或者超过了72小时的mappedFile
            if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
                // 删除mappedFile
                if (mappedFile.destroy(intervalForcibly)) {
                    files.add(mappedFile);
                    deleteCount++;
                    // 一次最多删除10个mappedFile
                    if (files.size() >= DELETE_FILES_BATCH_MAX/*10*/) {
                        break;
                    }
                    if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                        try {
                            // 删除文件时间间隔,默认100ms
                            Thread.sleep(deleteFilesInterval);
                        } catch (InterruptedException e) {
                        }
                    }
                } else {
                    break;
                }
            } else {
                //avoid deleting files in the middle
                break;
            }
        }
    }
    // 从MappedFileQueue的mappedFiles中删除这个mappedFile
    deleteExpiredFile(files);
    return deleteCount;
}

ConsumeQueue和IndexFile清理源码分析

ConsumeQueue和IndexFile清理方法CleanConsumeQueueService#run调用了CleanConsumeQueueService#deleteExpiredFiles方法清理ConsumeQueue和IndexFile。CleanConsumeQueueService#deleteExpiredFiles核心代码如下,包括两个主要逻辑

  • 遍历consumeQueueTable中的ConsumeQueue,调用ConsumeQueue#deleteExpiredFile删除过期ConsumeQueue
  • 调用IndexService#deleteExpiredFile删除过期IndexFile
// org.apache.rocketmq.store.DefaultMessageStore.CleanConsumeQueueService#deleteExpiredFiles
private void deleteExpiredFiles() {
    if (minOffset > this.lastPhysicalMinOffset) {
        ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
				// 遍历ConsumeQueue
        for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
            for (ConsumeQueue logic : maps.values()) {
                // 删除consumeQueue
                int deleteCount = logic.deleteExpiredFile(minOffset);
								// ... 间隔100ms
            }
        }
				// 删除indexFile
        DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
    }
}

ConsumeQueue文件清理

ConsumeQueue文件底层也是MappedFile,清理ConsumeQueue调用MappedFileQueue#deleteExpiredFileByOffset清理ConsumeQueue的过期MappedFile,源码如下,核心逻辑

  • 复制MappedFileQueue中的mappedFiles,循环处理清理逻辑,循环mfsLength-1次,也就是无论如何都会保留最新的MappedFile
  • 如果ConsumeQueue的MappedFile最后一个存储单元对应消息在CommitLog中的偏移量小于CommitLog的最小偏移量,说明当前MappedFile所有存储单元对应所有CommitLog的消息都已经被清理,因此调用MappedFile#destroy清理当前MappedFile
  • 删除MappedFileQueue缓存的mappedFiles列表中已经被清理MappedFile
// org.apache.rocketmq.store.MappedFileQueue#deleteExpiredFileByOffset
public int deleteExpiredFileByOffset(long offset, int unitSize) {
    // 复制一份mappedFiles
    Object[] mfs = this.copyMappedFiles(0);
    List<MappedFile> files = new ArrayList<MappedFile>();
    int deleteCount = 0;
    if (null != mfs) {
        int mfsLength = mfs.length - 1;
        for (int i = 0; i < mfsLength; i++) {
            boolean destroy;
            MappedFile mappedFile = (MappedFile) mfs[i];
            // 取consumeQueue最后一条消息Buffer切片
            SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize);
            if (result != null) {
                // consumeQueue最后一个存储单元消息在commitLog的偏移量
                long maxOffsetInLogicQueue = result.getByteBuffer().getLong();
                result.release();
                // 如果consumeQueue最后一条消息已经小于commitLog的最小offset,则说明要删除了
                destroy = maxOffsetInLogicQueue < offset;
                if (destroy) {
                    log.info("physic min offset " + offset + ", logics in current mappedFile max offset "
                        + maxOffsetInLogicQueue + ", delete it");
                }
            } 
            // 删除ConsumeQueue的MappedFile
            if (destroy && mappedFile.destroy(1000 * 60)) {
                files.add(mappedFile);
                deleteCount++;
            } else {
                break;
            }
        }
    }
    // 删除MappedFileQueue的mappedFiles列表中已经删除的MappedFile
    deleteExpiredFile(files);
    return deleteCount;
}

IndexFile清理

IndexFile清理逻辑与ConsumeQueue类似,都是删除文件中关联的CommitLog消息全部被删除的文件。核心逻辑包括下面两个

  • 获取IndexFileList中所有最大offset小于CommitLog最小offset的IndexFile
  • 删除过期的IndexFile
// org.apache.rocketmq.store.index.IndexService#deleteExpiredFile(long)
public void deleteExpiredFile(long offset) {
    Object[] files = null;
    try {
        // indexFileList的第一个索引文件的最后一个offset
        long endPhyOffset = this.indexFileList.get(0).getEndPhyOffset();
        if (endPhyOffset < offset) {
            files = this.indexFileList.toArray();
        }
    } 
    if (files != null) {
        List<IndexFile> fileList = new ArrayList<IndexFile>();
        for (int i = 0; i < (files.length - 1); i++) {
            IndexFile f = (IndexFile) files[i];
          	// IndexFile中最大的offset小于CommitLog最小offset,说明文件可以被删除
            if (f.getEndPhyOffset() < offset) {
                fileList.add(f);
            } else {
                break;
            }
        }
        // 删除过期的IndexFile,并将其从indexFileList缓存中删除
        this.deleteExpiredFile(fileList);
    }
}

总结

Broker消息清理机制由DefaultMessageStore负责,CommitLog、ConsumeQueue和IndexFile的清理都是按照文件颗粒度进行。

每10s检查一次,通常情况下每天凌晨4点删除超过72小时的CommitLog;如果CommitLog所在磁盘分区的磁盘占用率超过75%,则会触发CommitLog文件清理;如果CommitLog所在磁盘分区的磁盘占用率超过85%,则会强制删除CommitLog文件;

如果ConsumeQueue和IndexFile关联CommitLog都被删除,ConsumeQueue文件和IndexFile也会被清理。

以上就是RocketMQ | 源码分析】Broker过期消息清理机制的详细内容,更多关于RocketMQ | 源码分析】Broker过期消息清理机制的资料请关注脚本之家其它相关文章!

相关文章

  • 详解java各种集合的线程安全

    详解java各种集合的线程安全

    这篇文章主要介绍了详解java各种集合的线程安全,小编觉得挺不错的,这里分享给大家,供需要的朋友参考。
    2017-10-10
  • 详解Kotlin中如何实现类似Java或C#中的静态方法

    详解Kotlin中如何实现类似Java或C#中的静态方法

    Kotlin中如何实现类似Java或C#中的静态方法,本文总结了几种方法,分别是:包级函数、伴生对象、扩展函数和对象声明。这需要大家根据不同的情况进行选择。
    2017-05-05
  • Spring Data JPA实现排序与分页查询超详细流程讲解

    Spring Data JPA实现排序与分页查询超详细流程讲解

    在介绍Spring Data JPA的时候,我们首先认识下Hibernate。Hibernate是数据访问解决技术的绝对霸主,使用O/R映射技术实现数据访问,O/R映射即将领域模型类和数据库的表进行映射,通过程序操作对象而实现表数据操作的能力,让数据访问操作无须关注数据库相关的技术
    2022-10-10
  • 深入解析Java接口(interface)的使用

    深入解析Java接口(interface)的使用

    这篇文章主要介绍了深入解析Java接口(interface)的使用,是Java入门学习中的基础知识,需要的朋友可以参考下
    2015-09-09
  • javaweb Servlet开发总结(一)

    javaweb Servlet开发总结(一)

    Servlet是sun公司提供的一门用于开发动态web资源的技术。这篇文章主要介绍了javaweb Servlet开发的第一篇,感兴趣的小伙伴们可以参考一下
    2016-05-05
  • Java Eureka探究细枝末节

    Java Eureka探究细枝末节

    Eureka是Netflix开发的服务发现框架,本身是一个基于REST的服务,主要用于定位运行在AWS域中的中间层服务,以达到负载均衡和中间层服务故障转移的目的
    2022-09-09
  • 深入讲解Java的对象头与对象组成

    深入讲解Java的对象头与对象组成

    由于Java面向对象的思想,在JVM中需要大量存储对象,存储时为了实现一些额外的功能,需要在对象中添加一些标记字段用于增强对象功能,这些标记字段组成了对象头,下面这篇文章主要给大家介绍了关于Java对象头与对象组成的相关资料,需要的朋友可以参考下
    2022-02-02
  • 从0开始教你开发一个springboot应用

    从0开始教你开发一个springboot应用

    这篇文章主要为大家介绍了从0开始开发一个springboot应用教程,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-05-05
  • 详解SpringMVC的拦截器参数及拦截器链配置

    详解SpringMVC的拦截器参数及拦截器链配置

    拦截器(Interceptor)是一种动态拦截方法调用的机制,在SpringMVC中动态拦截控制器方法的执行。本文将详细讲讲SpringMVC中拦截器参数及拦截器链配置,感兴趣的可以尝试一下
    2022-07-07
  • 基于Spring AOP @AspectJ进阶说明

    基于Spring AOP @AspectJ进阶说明

    这篇文章主要介绍了基于Spring AOP @AspectJ进阶说明,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-01-01

最新评论