从log4j2到Disruptor详解

 更新时间:2021年12月22日 11:09:36   作者:会灰翔的灰机  
这篇文章主要介绍了从log4j2到Disruptor详解,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

相信看过log4j2的源码后大家应该明白为什么第二代日志性能会提升那么多,这其中最大的功臣莫过于Disruptor并发编程框架。

下面我们就跟着log4j2来走进Disruptor这个神奇的框(wang)架(zhan)

log4j2异步日志简要回顾

从日志工厂(Log4jLoggerFactory)中获取日志Logger实例

从日志上下文工厂(Log4jContextFactory)获取日志上下文

启用日志上下文(AsyncLoggerContext)

启动Disruptor(AsyncLoggerDisruptor)

序列号屏障(ProcessingSequenceBarrier)等待序列号发布

等待策略(WaitStrategy)等待序列号

返回Logger等待序列号(即等待日志写入)

异步日志(AsyncLogger)写入

日志内容与转化者(RingBufferLogEventTranslator)绑定

Disruptor尝试发布转化者tryPublish

RingBuffer尝试发布事件tryPublishEvent

获取下一个可用序号

转化并发布序号,日志与序号对应的事件绑定,并发布序号

RingBuffer发布序号,MultiProducerSequencer发布

等待策略(waitStrategy)唤醒阻塞

Disruptor在log4j2中的应用

AsyncLoggerDisruptor

异步日志Disruptor启动

创建事件工厂EventFactory

计算ringBufferSize:AsyncLogger.RingBufferSize属性

创建等待策略:AsyncLogger.WaitStrategy属性

创建守护线程执行器executor

创建异步队列满时处理策略AsyncQueueFullPolicy(非Disruptor步骤)

创建Disruptor

  • 创建RingBuffer与Disruptor绑定
  • RingBuffer根据生产者类型创建对应的实例,例如多生产者:MultiProducerSequencer
  • 创建多生产者序号(bufferSize,waitStrategy)

绑定异常句柄(Disruptor.handleExceptionsWith)

绑定事件处理句柄(Disruptor.handleEventsWith)

  • 根据handle列表创建事件处理器createEventProcessors
  • RingBuffer为Sequence(MultiProducerSequencer)序列创建序列屏障ProcessingSequenceBarrier
  • 创建事件批处理器BatchEventProcessor
  • 为事件批处理器绑定异常处理句柄
  • 消费者仓库(consumerRepository)添加消费者,创建事件处理信息EventProcessorInfo添加至消费者信息列表consumerInfos
  • RingBuffer添加处理序列号列表processorSequences为序列号闸
  • 如果存在序列号屏障,从闸门中移除屏障序列号并标识endOfChain为false

启动Disruptor

  • 遍历消费者仓库放入执行器中执行消费者EventProcessorInfo
  • 启动事件批处理器BatchEventProcessor
  • 事件批处理器序列号自增1
  • 死循环
  • 序列号屏障ProcessingSequenceBarrier等待下个有效序列号,默认为超时等待策略,超时会继续下轮循环
  • 事件批处理器序列号如果小于等于有效序列号
  • 从RingBuffer中按照序列号获取event事件
  • 通知回调事件句柄eventHandler.onEvent如果当前消费下标等于有效序列号availableSequence说明是当前批次的最后一个消息,endOfBatch为true:eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
  • 事件批处理器序列号设置为有效序列号

异步日志Disruptor写入

尝试发布tryPublish事件转化器EventTranslator:RingBufferLogEventTranslator

Disruptor获取RingBuffer尝试发布事件tryPublishEvent

序列号获取下个有效序号,步进为1,例如:MultiProducerSequencer.tryNext

游标按照步进移动

判断是否有足够的空间,没有则抛出InsufficientCapacityException异常

返回有效序列号

转化器转化消息为对应有效序列号的事件放入entries

发布序列号

  • 设置有效序列号至缓存availableBuffer
  • 等待策略唤醒阻塞waitStrategy.signalAllWhenBlocking

架构及流程

红色数字标识流程为获取logger时Disruptor创建消费者流程

黑色数字标识流程为logger写入日志时Disruptor创建事件并通知消费者流程

RingBuffer对于所有消费者、生产者是同一个实例

  • 环形队列,dataProvide,数据的存储与提供者

Sequencer:生产者

  • 对于所有消费者、生产者(可能是多生产者序列类型对于Multi类型)是同一个实例,包含一个游标序列号Sequence

SequenceBarrier:序列号屏障

  • 对于所有消费者、生产者也是同一个实例,序列号屏障包含一个等待策略、一个RingBuffer引用、一个游标序列号、一个依赖序列号(可能是组序列号类型)

BatchEventProcessor:消费者

  • 消费者包含一个RingBuffer引用
  • 一个序列号屏障,可以包含多个屏障序列号,默认为0个则使用RingBuffer的MultiProducerSequencer的游标序列号Sequence
  • 一个EventHandler:RingBufferLogEventHandler
  • 遍历EventHandler列表将其封装为BatchEventProcessor,将其与原始eventHandler、barrier屏障注册至消费者资源库consumerRepository。
  • 获取batchEventProcessor序列号默认为-1,将其缓存至processorSequences标识正在处理,并将processorSequences、disruptor、consumerRepository绑定至EventHandlerGroup。Disruptor启动遍历消费者资源库启动消费者:BatchEventProcessor

消费者入口

  • 消费者消费前先自增本地序列号(即-1+1=0序号),向序列号屏障申请该序列号的消费,默认为Timeout策略申请。
  • 屏障收到申请waitFor序列号,当前屏障游标序列号小于申请的消费序列号,等待生产者生产至当前序列号,如果超时则抛出异常(本地序列号不更新继续重试);如果没有超时,将屏障的dependentSequence序列号(如果不是非多序列号屏障类型,log4j2使用的是非多序列号屏障,则是屏障的本地游标)赋值为availableSequence返回。
  • 如果availableSequence有效的序列号(即屏障的游标序列号)小于申请要消费的序列号直接返回availableSequence(即消费超出的生产的速度,消费者申请的序列号向后回移至有效序列号)。否则getHighestPublishedSequence判断申请的序列号至availableSequence序列号之间的每个序列号对应的消息事件均是有效的则返回有效序列号(即生产者生产很快,消费者申请消费的序列号很小,向前移动至有效的,可能是本身也可能会跳跃多个下标),根据生产者的availableBuffer判断是否有效,因为生产者先发布序列号再写入数据,此处避免了读取数据异常,如果数据没有写入,有效序列号缓存标识没有写入(即无效),消费者会进行刚刚所说的“重试”,如果之间存在无效序列号则返回申请序列号-1(即回滚一个值,进入逻辑时增加了一个值,也就是回滚至申请前的点,可以理解为与超时相同,即重试)
  • 如果申请的序列号小于等于有效的序列号,则消费序列号对应的消息事件并更新本地BatchEventProcessor的序列号,按照下标去dataProvide(RingBuffer.entries)中提取对应位置的数据消费
  • 如果申请的序列号大于有效的序列号,则将消费者本地序列号设置为有效序列号(即消费超出的生产的速度,消费的序列号向后回移)
  • 如果期间出现任何未catch住的异常则会跳过当前下标,异常出现时的下标及对应的事件会交由exceptionHandler处理,默认为AsyncLoggerDefaultExceptionHandler异步处理,会将异常事件输出至系统的标准错误管道,虽然是异步也是会占用消费者线程池资源

Disruptor:生产者入口

  • 获取RingBuffer尝试发布消息,生产者(例如:MultiProducerSequencer)
  • 生产者游标序列号尝试自增,判断当前是否有足够的空间,当前游标+步进-bufferSize是否大于最小的闸门序列号(gatingSequences,即:所有消费者的本地游标序列号processorSequences列表),最小序列号会缓存至本地gatingSequenceCache用于下次判断减少进行所有闸门序列号的遍历次数,如果是说明已经没有空间(因为生产者生产申请的序列号已经追上了消费者消费序列号的最小值。RingBuffer是一个环形队列结构。上面已经讲到消费者序列号会与生产者序列号同步,同步指消费者申请序列号小于有效序列号时会前进至有效序列号,即使有延迟也保证了有大于等于buffer值的缓冲空间供生产者生产),如果没有空间返回false进入下一轮生产

在这里插入图片描述

  • 自增成功后,将消息转化为对应序列号下标位置的事件数据
  • Sequencer发布序列号,将当前序列号设置为有效(availableBuffer),并根据等待策略唤醒等待的消费者,被唤醒的消费者根据发布的序列号获取相应下标处事件数据进行处理

disruptor_new.jpg

Disruptor为什么这么快?

Disruptor采用无锁并发编程,框架中主要使用CAS与volatile关键字保证并发安全

使用环形数据结构(另一个典型的应用是时钟算法也是使用的环形数据结构),为环形结构添加序列号屏障来控制对环形队列读写操作,保证存储数据的并发安全

另一个点便是神奇的缓冲行填充了

Log4j2为什么这么快?

使用Disruptor并发编程框架

使用NIO写入日志数据

当然log4j2中有很多细节,如果我们想要获取线程栈信息,可以同样学习一下这样的写法

// LOG4J2-1029 new Throwable().getStackTrace is faster than Thread.currentThread().getStackTrace().
final StackTraceElement[] stackTrace = new Throwable().getStackTrace();
StackTraceElement last = null;
for (int i = stackTrace.length - 1; i > 0; i--) {
    final String className = stackTrace[i].getClassName();
    if (fqcnOfLogger.equals(className)) {
        return last;
    }
    last = stackTrace[i];
}

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • Java与Node.js利用AES加密解密出相同结果的方法示例

    Java与Node.js利用AES加密解密出相同结果的方法示例

    这篇文章主要介绍了Java与Node.js利用AES加密解密出相同结果的方法,文中给出了详细的示例代码,相信对大家的学习或者工作能带来一定的帮助,需要的朋友们下面来一起看看吧。
    2017-02-02
  • 深入理解Spring MVC概要与环境配置

    深入理解Spring MVC概要与环境配置

    本篇文章主要介绍了深入理解Spring MVC概要与环境配置 ,具有一定的参考价值,感兴趣的小伙伴们可以参考一下。
    2017-03-03
  • Java中StringUtils与CollectionUtils和ObjectUtil概念讲解

    Java中StringUtils与CollectionUtils和ObjectUtil概念讲解

    这篇文章主要介绍了Java中StringUtils与CollectionUtils和ObjectUtil概念,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习吧
    2022-12-12
  • Java中的锁与锁的状态升级详细解读

    Java中的锁与锁的状态升级详细解读

    这篇文章主要介绍了Java中的锁与锁的状态升级详细解读,Java 1.6以后官方针对锁的优化,主要是增加了两种新的锁:偏向锁和轻量级锁,再加上本身重量级锁,那么锁基本上可以大致分为这三种,它们之间的区别主要是体现在等待时间上面,需要的朋友可以参考下
    2024-01-01
  • Java代理模式与动态代理之间的关系以及概念

    Java代理模式与动态代理之间的关系以及概念

    代理模式是开发中常见的一种设计模式,使用代理模式可以很好的对程序进行横向扩展。动态代理:代理类在程序运行时被创建的代理方式。关键在于动态,程序具有了动态特性,可以在运行期间根据不同的目标对象生成动态代理对象
    2023-02-02
  • 浅谈对Java双冒号::的理解

    浅谈对Java双冒号::的理解

    这篇文章主要介绍了浅谈对Java双冒号::的理解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-06-06
  • java使用wait和notify实现线程通信

    java使用wait和notify实现线程通信

    这篇文章主要为大家详细介绍了java如何使用wait和notify实现线程之间通信,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下
    2023-10-10
  • mall整合SpringSecurity及JWT实现认证授权实战

    mall整合SpringSecurity及JWT实现认证授权实战

    这篇文章主要为大家介绍了mall整合SpringSecurity及JWT实现认证授权实战示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-06-06
  • SpringBoot2种单元测试方法解析

    SpringBoot2种单元测试方法解析

    这篇文章主要介绍了SpringBoot2种单元测试方法解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-10-10
  • java 调用本地扬声器的步骤

    java 调用本地扬声器的步骤

    博主的毕设系统在做一个餐厅的点餐管理系统,在进行移动端页面开发的时候突发奇想做一个呼叫功能,因此就有了这篇文章
    2021-05-05

最新评论