ThreadPoolExecutor核心线程数和RocketMQ消费线程调整详解
背景
Rocketmq 消费者在高峰期希望手动减少消费线程数,通过DefaultMQPushConsumer.updateCorePoolSize方法可以调用内部的ThreadPoolExecutor.setCorePoolSize设置多线程核心线程数。
那么是否能够通过调整参数动态调整Rocketmq消费者呢。
结论
- 多线程ThreadPoolExecutor.setCorePoolSize可以修改核心线程数,但是减少核心线程数不一定生效
- 核心线程销毁的前提是至少在keepAliveTime内没有新的任务提交
动态调整消费线程实现方案
- 可以通过调整核心线程数减少RocketMQ 消费线程数
- 先挂起消费者consumer.suspend()
- 调用consumer.updateCorePoolSize更新核心线程数
- 然后休眠至少1分钟以上,等任务全部消费完成,1分钟是基于ConsumeMessageConcurrentlyService中创建线程池默认参数1000*60 TimeUnit.MILLISECONDS得到的, 还需要加上本地队列堆积任务消费完成时间
- 恢复消费者consumer.resume()
consumer.suspend(); consumer.updateCorePoolSize(3); try { TimeUnit.SECONDS.sleep(65000L); } catch (Exception e) { log.error("InterruptException", e); } consumer.resume();
- 增加消费线程数,直接通过consumer.updateCorePoolSize方法就可以实现
测试
ThreadTest.java
import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.common.ThreadFactoryImpl; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @Slf4j public class ThreadTest { public static void main(String[] args) throws InterruptedException { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 10, 50, 1000 * 60, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new ThreadFactoryImpl("test" + "_" + "ConsumeMessageThread_")); for (int i = 0; i < 1000; i++) { threadPoolExecutor.submit(new Runnable() { @SneakyThrows @Override public void run() { Thread.sleep(5); log.info("hello"); } }); } log.info("coreSize: {}" ,threadPoolExecutor.getCorePoolSize()); Thread.sleep(10000L); threadPoolExecutor.setCorePoolSize(3); log.info("coreSize: {}" ,threadPoolExecutor.getCorePoolSize()); // Thread.sleep(1000*60); // 如果休眠时间大于KeepAliveTime将会只有3个线程 Thread.sleep(1000L); // 休眠时间不够时仍然有10个线程 for (int i = 0; i < 1000; i++) { threadPoolExecutor.submit(new Runnable() { @SneakyThrows @Override public void run() { Thread.sleep(10); log.info("hello2"); } }); } } }
实验证明setCorePoolSize在设置为3个线程以后,在第二批任务提交还是有10个线程在工作, 但是如果在第二批任务提交前休眠时间大于keepAliveTime以后则只会有3个工作线程
原理
源码部分主要看是ThreadPoolExecutor中的workers变量,setCorePoolSize()方法,runWorker()方法,getTask()方法
- 一个work在执行runWorker()方法时只有在获取任务getTask()方法返回null以后才会终止循环,然后销毁
- getTask()方法从任务队列中拿取任务等待keepAliveTime超时以后才会有可能返回null
// 工作workers, work只有在获取任务超时以后才会从workers中删除 private final HashSet<Worker> workers = new HashSet<Worker>(); public void setCorePoolSize(int corePoolSize) { if (corePoolSize < 0) throw new IllegalArgumentException(); int delta = corePoolSize - this.corePoolSize; this.corePoolSize = corePoolSize; if (workerCountOf(ctl.get()) > corePoolSize) // 减少核心线程数以后进入interruptIdleWorkers方法 interruptIdleWorkers(); else if (delta > 0) { int k = Math.min(delta, workQueue.size()); while (k-- > 0 && addWorker(null, true)) { if (workQueue.isEmpty()) break; } } } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { // 在interruptIdleWorkers方法中只是将work的线程中断,并没有从workers删除 t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 重点是getTask()方法获取task失败才会中断循环 while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 超时以后进入这里的if返回null然后work才会被销毁 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; // 这里才真正将worker删除 workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
到此这篇关于ThreadPoolExecutor核心线程数和RocketMQ消费线程调整详解的文章就介绍到这了,更多相关RocketMQ消费线程调整内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
java通过HttpServletRequest获取post请求中的body内容的方法
本篇文章主要介绍了java通过HttpServletRequest获取post请求中的body内容的方法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下2018-02-02Springboot整合mybatis开启二级缓存的实现示例
在一级缓存中,是查询两次数据库的,显然这是一种浪费,既然SQL查询相同,就没有必要再次查库了,直接利用缓存数据即可,这种思想就是MyBatis二级缓存的初衷,本文就详细的介绍了Springboot整合mybatis开启二级缓存,感兴趣的可以了解一下2022-05-05
最新评论