ThreadPoolExecutor核心线程数和RocketMQ消费线程调整详解

 更新时间:2023年10月11日 10:25:51   作者:项哥  
这篇文章主要介绍了ThreadPoolExecutor核心线程数和RocketMQ消费线程调整详解,Rocketmq 消费者在高峰期希望手动减少消费线程数,通过DefaultMQPushConsumer.updateCorePoolSize方法可以调用内部的setCorePoolSize设置多线程核心线程数,需要的朋友可以参考下

背景

Rocketmq 消费者在高峰期希望手动减少消费线程数,通过DefaultMQPushConsumer.updateCorePoolSize方法可以调用内部的ThreadPoolExecutor.setCorePoolSize设置多线程核心线程数。

那么是否能够通过调整参数动态调整Rocketmq消费者呢。

结论

  • 多线程ThreadPoolExecutor.setCorePoolSize可以修改核心线程数,但是减少核心线程数不一定生效
  • 核心线程销毁的前提是至少在keepAliveTime内没有新的任务提交

动态调整消费线程实现方案

  • 可以通过调整核心线程数减少RocketMQ 消费线程数
    • 先挂起消费者consumer.suspend()
    • 调用consumer.updateCorePoolSize更新核心线程数
    • 然后休眠至少1分钟以上,等任务全部消费完成,1分钟是基于ConsumeMessageConcurrentlyService中创建线程池默认参数1000*60 TimeUnit.MILLISECONDS得到的, 还需要加上本地队列堆积任务消费完成时间
    • 恢复消费者consumer.resume()
consumer.suspend();
consumer.updateCorePoolSize(3);
try {
	TimeUnit.SECONDS.sleep(65000L);
 } catch (Exception e) {
	log.error("InterruptException", e);
}
consumer.resume();
  • 增加消费线程数,直接通过consumer.updateCorePoolSize方法就可以实现

测试

ThreadTest.java

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j
public class ThreadTest {
   public static void main(String[] args) throws InterruptedException {
       ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
               10,
               50,
               1000 * 60,
               TimeUnit.MILLISECONDS,
               new LinkedBlockingQueue<>(),
               new ThreadFactoryImpl("test" + "_" + "ConsumeMessageThread_"));
       for (int i = 0; i < 1000; i++) {
           threadPoolExecutor.submit(new Runnable() {
               @SneakyThrows
               @Override
               public void run() {
                   Thread.sleep(5);
                   log.info("hello");
               }
           });
       }
       log.info("coreSize: {}" ,threadPoolExecutor.getCorePoolSize());
       Thread.sleep(10000L);
       threadPoolExecutor.setCorePoolSize(3);
       log.info("coreSize: {}" ,threadPoolExecutor.getCorePoolSize());
       // Thread.sleep(1000*60); // 如果休眠时间大于KeepAliveTime将会只有3个线程
       Thread.sleep(1000L);  // 休眠时间不够时仍然有10个线程
       for (int i = 0; i < 1000; i++) {
           threadPoolExecutor.submit(new Runnable() {
               @SneakyThrows
               @Override
               public void run() {
                   Thread.sleep(10);
                   log.info("hello2");
               }
           });
       }
   }
}

实验证明setCorePoolSize在设置为3个线程以后,在第二批任务提交还是有10个线程在工作, 但是如果在第二批任务提交前休眠时间大于keepAliveTime以后则只会有3个工作线程

原理

源码部分主要看是ThreadPoolExecutor中的workers变量,setCorePoolSize()方法,runWorker()方法,getTask()方法

  • 一个work在执行runWorker()方法时只有在获取任务getTask()方法返回null以后才会终止循环,然后销毁
  • getTask()方法从任务队列中拿取任务等待keepAliveTime超时以后才会有可能返回null
    // 工作workers, work只有在获取任务超时以后才会从workers中删除
    private final HashSet<Worker> workers = new HashSet<Worker>();
    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        int delta = corePoolSize - this.corePoolSize;
        this.corePoolSize = corePoolSize;
        if (workerCountOf(ctl.get()) > corePoolSize)
        // 减少核心线程数以后进入interruptIdleWorkers方法
            interruptIdleWorkers();
        else if (delta > 0) {
            int k = Math.min(delta, workQueue.size());
            while (k-- > 0 && addWorker(null, true)) {
                if (workQueue.isEmpty())
                    break;
            }
        }
    }
     private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        // 在interruptIdleWorkers方法中只是将work的线程中断,并没有从workers删除
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
   final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
           // 重点是getTask()方法获取task失败才会中断循环
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
   private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            // 超时以后进入这里的if返回null然后work才会被销毁
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
   private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            // 这里才真正将worker删除
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

到此这篇关于ThreadPoolExecutor核心线程数和RocketMQ消费线程调整详解的文章就介绍到这了,更多相关RocketMQ消费线程调整内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java将字符串转化为数组的两种方法

    Java将字符串转化为数组的两种方法

    Java中的String类是一种特殊的字符串,它可以被用于处理字符串,Java中的String类也可以将字符串转换为数组,下面这篇文章主要给大家介绍了关于Java将字符串转化为数组的两种方法,需要的朋友可以参考下
    2023-05-05
  • SpringCache的简介和使用教程

    SpringCache的简介和使用教程

    缓存是实际工作中经常使用的一种提高性能的方法, 我们会在很多场景下来使用缓存,而spring-cache就是一种简单的实现。通过本文学习可以了解SpringCache的简介和使用方法,感兴趣的朋友一起看看吧
    2021-11-11
  • SpringBoot实现动态多线程并发定时任务

    SpringBoot实现动态多线程并发定时任务

    这篇文章主要为大家详细介绍了SpringBoot实现动态多线程并发定时任务,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-05-05
  • Java JVM程序指令码实例解析

    Java JVM程序指令码实例解析

    这篇文章主要介绍了Java JVM程序指令码实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-01-01
  • Java实现发送短信验证码功能

    Java实现发送短信验证码功能

    这篇文章主要为大家详细介绍了Java实现发送短信验证码功能,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-11-11
  • IDEA快速显示Run DashBoard的图文详解

    IDEA快速显示Run DashBoard的图文详解

    这篇文章主要介绍了IDEA快速显示Run DashBoard的图文详解,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-12-12
  • 详解Java如何判断一个对象是否为空

    详解Java如何判断一个对象是否为空

    我们在刚开始学习Java的时候,遇到过最多的异常肯定是臭名昭著的空指针异常(NullPointerException),可以说它陪伴了我们整个初学阶段,那么如何优雅的判断一个对象是否为空并且减少空指针异常呢,
    2024-01-01
  • J2EE中的struts2表单细节处理

    J2EE中的struts2表单细节处理

    这篇文章主要介绍了J2EE中的struts2表单细节处理的相关资料,需要的朋友可以参考下
    2017-06-06
  • java通过HttpServletRequest获取post请求中的body内容的方法

    java通过HttpServletRequest获取post请求中的body内容的方法

    本篇文章主要介绍了java通过HttpServletRequest获取post请求中的body内容的方法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-02-02
  • Springboot整合mybatis开启二级缓存的实现示例

    Springboot整合mybatis开启二级缓存的实现示例

    在一级缓存中,是查询两次数据库的,显然这是一种浪费,既然SQL查询相同,就没有必要再次查库了,直接利用缓存数据即可,这种思想就是MyBatis二级缓存的初衷,本文就详细的介绍了Springboot整合mybatis开启二级缓存,感兴趣的可以了解一下
    2022-05-05

最新评论