RocketMQ线程池创建实现原理详解

 更新时间:2022年12月15日 08:50:41   作者:小郭的技术笔记  
这篇文章主要为大家介绍了RocketMQ线程池创建实现原理详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

前言

大家好,我是小郭,今天主要来和大家聊一聊RocketMQ中的线程池是如何创建的,如何设置线程池数量,同时也可以从中去学习到一些线程池的实践和需要注意的一些细节。

RocketMQ在哪些地方使用到了线程池?

在RocketMQ中存在了大量的对线程池的使用,从消息的生产到投递Broker中,到最后的消息消费每一个环节中都大量使用到线程池的地方,下面我们拿出几个不同类型的线程池来看一看。

在 NameServer的路由注册和剔除中,多次使用到了定时线程池

定时线程池

private final ScheduledExecutorService scheduledExecutorService =
	Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
		"NSScheduledThread"));
// 定时任务 每10s扫描一次Broker,移除失活Broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
	​
	@Override
	public void run() {
		NamesrvController.this.routeInfoManager.scanNotActiveBroker();
	}
}, 5, 10, TimeUnit.SECONDS);
//定时任务,每隔30s向集群中所有NameServer发送心跳包
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
	​
	@Override
	public void run() {
		try {
			BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
		} catch (Throwable e) {
			log.error("registerBrokerAll Exception", e);
		}
	}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

线程池newFixedThreadPool

FixedThreadPool常用于创建一个固定大小的线程池,

它的特点就是核心线程数量与最大线程数量一致,采用无界的阻塞队列 LinkedBlockingQueue,并且没有设置队列的大小默认是Integer.MAX_VALUE,适用于负载较重的场景

private ExecutorService remotingExecutor;
this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 用来设置接收到消息后的处理方法
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);

消息发送初始化默认异步发送者线程池

核心线程数与最大线程数设置均为 Runtime.getRuntime().availableProcessors() ,可用的计算资源

阻塞队列设置为一个初始化50000长度的阻塞队列

keepAliveTime设置60s,超过则时间空闲的线程将被终止

private final ExecutorService defaultAsyncSenderExecutor;
private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
	Runtime.getRuntime().availableProcessors(),
	Runtime.getRuntime().availableProcessors(),
	1000 * 60,
	TimeUnit.MILLISECONDS,
	this.asyncSenderThreadPoolQueue,
	new ThreadFactory() {
		private AtomicInteger threadIndex = new AtomicInteger(0);
		@Override
		public Thread newThread(Runnable r) {
			return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
		}
	});

消费端拉取消息线程池

我们重点来看一下消费端的线程池是如何创建,它可以说是整个RocketMQ中最关键的一个线程池

为了提高消费速度,我们通常有两种方式来提高消费并行度

  • 同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度
  • 提高单个 Consumer 的消费并行线程,通过修改参数 consumeThreadMin、consumeThreadMax实现。

如何创建?

在消息监听的时候,利用线程池进行不断的拉取消息

提交消费请求,消息提交到内部的线程池

DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);

参数设置

创建内部线程池,核心参数核心线程数和最大线程数,主要是根据配置来进行设置

设置线程池名称以 ConsumeMessageThread_ 开头的,利于排查问题

阻塞队列是一个无界的阻塞队列LinkedBlockingQueue

private final BlockingQueue<Runnable> consumeRequestQueue;
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
this.consumeExecutor = new ThreadPoolExecutor(
    this.defaultMQPushConsumer.getConsumeThreadMin(),
    this.defaultMQPushConsumer.getConsumeThreadMax(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.consumeRequestQueue,
    new ThreadFactoryImpl(consumeThreadPrefix));

通过RocketMQ的源码,我们看到 consumeExecutor 线程池的创建也是非常简单的

如果想要修改线程池参数,需要注意什么?

根据线程池的原理我们知道,只有阻塞队列为满的情况下,不会创建临时线程

所以线程池内部持有的队列为一个无界队列,导致 consumeThreadMax 大于 consumeThreadMin,线程个数最大也只能 consumeThreadMin 个线程数量

什么时候需要修改?

在正常的业务场景中,启动应用之后,我们就不会再修改消费者线程数,但有可能突发业务高峰导致消息堆积,这时候我们就需要调整单个 Consumer 的消费并行线程数。

如何修改线程数?

  • 修改线程池后,重新启动消费者,缺点是参数不易评估,随着业务的并发提升,需要频繁的重启服务来更改线程数,这势必会带来一定的造成影响。
  • 官方也为我们提供了修改线程数的方法,当更新的线程数大于0且小于 Short.MAX_VALUE 且小于最大线程数,则更新核心线程数。

JDK允许线程池使用方通过ThreadPoolExecutor的实例来动态设置线程池的核心策略

@Override
public void updateCorePoolSize(int corePoolSize) {
    if (corePoolSize > 0
        && corePoolSize <= Short.MAX_VALUE
        && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
        this.consumeExecutor.setCorePoolSize(corePoolSize);
    }
}

这两种方式都存在一定的痛点

  • 线程数量随着业务的变动,需要修改代码
  • 在springBoot和SpringCloud Stream下,对线程池参数变更不是很友好
  • 不能通过管理界面,直接动态修改线程池参数

针对上面的痛点问题,我们可以考虑封装线程池动态参数调整,首先肯定原来代码是毫无侵入性的,

同时通过管理页面对不同消费者组的线程池进行管理自由的随着业务波动进行平滑修改,降低线程池参数修改的成本。

以上就是RocketMQ线程池创建实现原理详解的详细内容,更多关于RocketMQ线程池创建的资料请关注脚本之家其它相关文章!

相关文章

  • Java实现最小高度树

    Java实现最小高度树

    本文主要介绍了Java实现最小高度树,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-04-04
  • java方法重写时需要注意的问题

    java方法重写时需要注意的问题

    大家好,本篇文章主要讲的是java方法重写时需要注意的问题,感兴趣的同学赶快来看一看吧,对你有帮助的话记得收藏一下,方便下次浏览
    2021-12-12
  • SpringMVC HttpMessageConverter报文信息转换器

    SpringMVC HttpMessageConverter报文信息转换器

    这篇文章主要为大家介绍了SpringMVC HttpMessageConverter报文信息转换器,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-05-05
  • 如何使用idea里面自带的翻译插件

    如何使用idea里面自带的翻译插件

    这篇文章主要介绍了idea里面自带的翻译插件,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-11-11
  • SpringSecurityOAuth2实现微信授权登录

    SpringSecurityOAuth2实现微信授权登录

    微信的登录功能是用户注册和使用微信的必经之路之一,而微信授权登录更是方便了用户的登录操作,本文主要介绍了SpringSecurityOAuth2实现微信授权登录,感兴趣的可以了解一下
    2023-09-09
  • Java forEach对原数组的操作过程

    Java forEach对原数组的操作过程

    forEach对于基本数据类型,是直接赋值,对于引用数据类型,是引用地址值,forEach遍历时,是创建的临时变量,引用的数据地址,本文给大家介绍Java forEach对原数组的操作过程,感兴趣的朋友一起看看吧
    2024-02-02
  • java实现的顺时针/逆时针打印矩阵操作示例

    java实现的顺时针/逆时针打印矩阵操作示例

    这篇文章主要介绍了java实现的顺时针/逆时针打印矩阵操作,涉及java基于数组的矩阵存储、遍历、打印输出等相关操作技巧,需要的朋友可以参考下
    2019-12-12
  • @CacheEvict中的allEntries与beforeInvocation的区别说明

    @CacheEvict中的allEntries与beforeInvocation的区别说明

    这篇文章主要介绍了@CacheEvict中的allEntries与beforeInvocation的区别说明,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-12-12
  • java基础之初始化ArrayList时直接赋值的4种方式总结

    java基础之初始化ArrayList时直接赋值的4种方式总结

    ArrayList是Java中的一个类,它是Java集合框架中的一部分,用于实现动态数组,下面这篇文章主要给大家介绍了关于java基础之初始化ArrayList时直接赋值的4种方式,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2024-07-07
  • 解决java中的父类私有成员变量的继承问题

    解决java中的父类私有成员变量的继承问题

    这篇文章主要介绍了解决java中的父类私有成员变量的继承问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-01-01

最新评论