一文搞懂Java的ThreadPoolExecutor原理
一. Executor框架简介
Executor框架提供了组件来管理Java中的线程,Executor框架将其分为任务,线程执行任务,任务执行结果三部分。下面以表格形式对这三部分进行说明。
项 | 说明 |
---|---|
任务 | Executor框架提供了Runnable接口和Callable接口,任务需要实现这两个接口才能被线程执行 |
线程执行任务 | Executor框架提供了接口Executor和继承于Executor的ExecutorService接口来定义任务执行机制。Executor框架中的线程池类ThreadPoolExecutor和ScheduledThreadPoolExecutor均实现了ExecutorService接口 |
任务执行结果 | Executor框架提供了Future接口和实现了Future接口的FutureTask类来定义任务执行结果。 |
组件之间的类图关系如下所示。
Executor接口是线程池的顶层接口,通常说到的线程池指的是ThreadPoolExecutor,同时ThreadPoolExecutor还有一个子类叫做ScheduledThreadPoolExecutor,其扩展实现了延时执行任务和定时执行任务的功能。
Executor框架指的是任务,执行任务的线程池和任务执行结果这三部分,切不可将Executor框架与Executor接口相混淆。
本篇文章就将对Executor框架中的ThreadPoolExecutor的源码实现进行学习。
二. 认识ThreadPoolExecutor状态
在学习ThreadPoolExecutor如何执行任务前,先认识一下ThreadPoolExecutor的状态。
ThreadPoolExecutor继承于AbstractExecutorService,并实现了ExecutorService接口,是Executor框架的核心类,用于管理线程。
ThreadPoolExecutor使用了原子整型ctl来表示线程池状态和Worker数量。ctl是一个原子整型,前3位表示线程池状态,后29位表示Worker数量。ThreadPoolExecutor中这部分的源码如下所示。
public class ThreadPoolExecutor extends AbstractExecutorService { private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // 取整型前3位,即获取线程池状态 private static int runStateOf(int c) { return c & ~CAPACITY; } // 取整型后29位,即获取Worker数量 private static int workerCountOf(int c) { return c & CAPACITY; } // 根据线程池状态和Worker数量拼装ctl private static int ctlOf(int rs, int wc) { return rs | wc; } // 线程池状态判断 private static boolean runStateLessThan(int c, int s) { return c < s; } // 线程池状态判断 private static boolean runStateAtLeast(int c, int s) { return c >= s; } // 判断线程池状态是否为RUNNING private static boolean isRunning(int c) { return c < SHUTDOWN; } ...... }
可知ThreadPoolExecutor有如下五种线程池状态。
- RUNNING,线程池接受新任务,会执行任务阻塞队列中的任务,ctl前三位表示为111;
- SHUTDOWN,线程池拒绝新任务,会执行任务阻塞队列中的任务,ctl前三位表示为000;
- STOP,线程池拒绝新任务,不会执行任务阻塞队列中的任务,尝试中断正在执行的任务,ctl前三位表示为001;
- TIDYING,所有任务被关闭,Worker数量为0,ctl前三位表示为010;
- TERMINATED,terminated() 执行完毕,ctl前三位表示为011。
得益于ctl的结构,所以无论Worker数量是多少,ThreadPoolExecutor中线程池状态存在如下关系。
RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
因此runStateLessThan(),runStateAtLeast() 和isRunning() 方法可以方便的对线程池状态进行判断。
三. 执行任务源码分析
作为线程池,ThreadPoolExecutor最重要也最经典的地方,当然就是执行任务了。本节对ThreadPoolExecutor执行任务的流程进行一个学习。
ThreadPoolExecutor中执行任务的入口方法为execute(),其实现如下。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 如果Worker数量小于核心线程数,则创建Worker并执行任务 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 如果Worker数量大于等于核心线程数,则将任务添加到任务阻塞队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 如果线程池状态突然不再是RUNNING,则尝试将任务从任务阻塞队列中删除,删除成功则为该任务执行拒绝策略 if (! isRunning(recheck) && remove(command)) reject(command); // 如果线程池中Worker数量突然为0,则创建一个Worker来执行任务 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 执行到这里表示线程池状态已经不是RUNNING或者任务阻塞队列已满 // 此时尝试新建一个Worker来执行任务 // 如果新建一个Worker来执行任务失败,表明线程池状态不再是RUNNING或者Worker数量已经达到最大线程数,此时执行拒绝策略 else if (!addWorker(command, false)) reject(command); }
在execute() 中会根据Worker数量和线程池状态来决定是新建Worker来执行任务还是将任务添加到任务阻塞队列。新建Worker来执行任务的实现如下所示。
private boolean addWorker(Runnable firstTask, boolean core) { // 标记外层for循环 retry: for (;;) { int c = ctl.get(); // 获取线程池状态 int rs = runStateOf(c); // 线程池状态为RUNNING时,可以创建Worker // 线程池状态为SHUTDOWN,且任务阻塞队列不为空时,可以创建初始任务为null的Worker if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 获取Worker数量 int wc = workerCountOf(c); // 如果Worker数量大于CAPACITY,拒绝创建Worker // core为true表示创建核心线程Worker,如果Worker数量此时已经大于等于核心线程数,则拒绝创建Worker,转而应该将任务添加到任务阻塞队列 // core为false表示创建非核心线程Worker,如果Worker数量此时已经大于等于最大线程数,则拒绝创建Worker,转而应该执行拒绝策略 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 以CAS方式将Worker数量加1 // 加1成功表明无竞争发生,从外层for循环跳出 if (compareAndIncrementWorkerCount(c)) break retry; // 加1失败表明有竞争发生,此时需要重新获取ctl的值 c = ctl.get(); // 重新获取ctl后如果发现线程池状态发生了改变,此时重新执行外层for循环,即需要基于新的线程池状态判断是否允许创建Worker // 重新获取ctl后如果线程池状态未发生改变,则继续执行内层for循环,即尝试再一次以CAS方式将Worker数量加1 if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 创建一个Worker w = new Worker(firstTask); // 获取Worker的线程 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; // 由于线程池中存储Worker的集合为HashSet,因此将Worker添加到Worker集合时需要获取全局锁保证线程安全 mainLock.lock(); try { // 再一次获取线程池状态 int rs = runStateOf(ctl.get()); // 如果线程池状态还是为RUNNING或者线程池状态为SHUTDOWN但创建的Worker的初始任务为null,则允许将创建出来的Worker添加到集合 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 检查一下Worker的线程是否可以启动(处于活动状态的线程无法再启动) if (t.isAlive()) throw new IllegalThreadStateException(); // 将Worker添加到Worker集合 workers.add(w); int s = workers.size(); // largestPoolSize用于记录线程池最多存在过的Worker数 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 启动Worker线程 t.start(); workerStarted = true; } } } finally { if (! workerStarted) // Worker线程没有成功启动起来,此时需要对该Worker的创建执行回滚操作 addWorkerFailed(w); } return workerStarted; }
addWorker() 方法中只允许两种情况可以创建Worker。
- 线程池状态为RUNNING,可以创建Worker;
- 线程池状态为SHUTDOWN,且任务阻塞队列不为空,可以创建初始任务为null的Worker。
一旦Worker创建成功,就会将Worker的线程启动,如果Worker创建失败或者Worker的线程启动失败,则会调用addWorkerFailed() 方法执行回滚操作,其实现如下所示。
private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 如果Worker添加到了Worker集合中,则将Worker从Worker集合中删除 if (w != null) workers.remove(w); // 以CAS方式将Worker数量减1 decrementWorkerCount(); // 尝试终止线程池 tryTerminate(); } finally { mainLock.unlock(); } }
由于Worker自身实现了Runnable,因此Worker自身就是一个任务,实际上Worker的线程执行的任务就是Worker本身,因此addWorker() 中将Worker的线程启动时,会调用Worker的run() 方法,其实现如下。
public void run() { runWorker(this); }
在Worker的run() 方法中调用了ThreadPoolExecutor的runWorker() 方法,其实现如下所示。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); boolean completedAbruptly = true; try { // 如果task为null,则从任务阻塞队列中获取任务 // 通常Worker启动时会先执行初始任务,然后再去任务阻塞队列中获取任务 while (task != null || (task = getTask()) != null) { w.lock(); // 线程池正在停止时,需要确保当前Worker的线程是被中断的 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // Worker执行任务发生异常或者从getTask()中获取任务为空时会执行这里的逻辑 // processWorkerExit()会将Worker从Worker集合中删除,并尝试终止线程池 processWorkerExit(w, completedAbruptly); } }
runWorker() 方法就是先让Worker将初始任务(如果有的话)执行完,然后循环从任务阻塞队列中获取任务来执行,如果Worker执行任务发生异常或者从任务阻塞队列获取任务失败(获取到的任务为null),则调用processWorkerExit() 方法来将自身从Worker集合中删除。下面先看一下getTask() 方法的实现。
private Runnable getTask() { boolean timedOut = false; for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 如果线程池状态为SHUTDOWN,且任务阻塞队列为空,则不再允许从任务阻塞队列中获取任务 // 如果线程池状态为STOP,则不再允许从任务阻塞队列中获取任务 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // 如果allowCoreThreadTimeOut为true,或者当前线程数大于核心线程数,此时timed为true,表明从任务阻塞队列以超时退出的方式获取任务 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 如果当前线程数大于最大线程数,则当前Worker应该被删除 // 如果当前Worker上一次从任务阻塞队列中获取任务时超时退出,且任务阻塞队列现在还是为空,则当前Worker应该被删除 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 从任务阻塞队列中获取任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) // 获取到任务则返回该任务 return r; // timedOut为true表明Worker上一次从任务阻塞队列中获取任务时超时退出 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
getTask() 方法在如下情况不允许Worker从任务阻塞队列中获取任务。
- 线程池状态为SHUTDOWN,且任务阻塞队列为空;
- 线程池状态为STOP。
如果Worker有资格从任务阻塞队列获取任务,那么当allowCoreThreadTimeOut为true,或者当前线程数大于核心线程数时,Worker以超时退出的方式获取任务,否则Worker以一直阻塞的方式获取任务。
当Worker在getTask() 方法中获取任务失败时,getTask() 方法会返回null,从而导致Worker会执行processWorkerExit() 方法来删除自身,其实现如下所示。
private void processWorkerExit(Worker w, boolean completedAbruptly) { // completedAbruptly为true表明是执行任务时发生异常导致Worker需要被删除 if (completedAbruptly) // 修正Worker数量 decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; // 将Worker从Worker集合中删除 workers.remove(w); } finally { mainLock.unlock(); } // 尝试终止线程池 tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; } addWorker(null, false); } }
Worker在processWorkerExit() 方法中删除自身之后,还会调用tryTerminate() 尝试终止线程池,tryTerminate() 方法很精髓,后面会对其进行详细分析,这里暂且不谈。至此,Worker的创建,执行任务,获取任务和删除的整个流程已经大体分析完毕。
对于执行任务,现在简单进行一个小结。
ThreadPoolExecutor执行任务,第一步是根据Worker数量来决定是新建Worker来执行任务还是将任务添加到任务阻塞队列,这里的判断规则如下。
- 如果Worker数量小于核心线程数,则创建Worker来执行任务;
- 如果Worker数量大于等于核心线程数,则将任务添加到任务阻塞队列;
- 如果任务阻塞队列已满,则创建Worker来执行任务;
- 如果Worker数量已经达到最大线程数,此时执行任务拒绝策略。
当要新建Worker来执行任务时,只有两种情况可以新建Worker,如下所示。
- 线程池状态为RUNNING,可以创建Worker;
- 线程池状态为SHUTDOWN,且任务阻塞队列不为空,可以创建初始任务为null的Worker。
Worker自身实现了Runnable,且Worker持有一个线程,当Worker启动时,就是启动Worker持有的线程,而这个线程执行的任务就是Worker自身。
Worker启动后,会首先执行自己的初始任务,然后再去任务阻塞队列中获取任务。
四. 关闭线程池源码分析
不再使用的线程池,可以进行关闭。关闭ThreadPoolExecutor的方法有shutdown() 和shutdownNow(),本节将对ThreadPoolExecutor的关闭进行分析。
1. shutdown()
首先分析shutdown() 方法,其实现如下。
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 循环通过CAS方式将线程池状态置为SHUTDOWN advanceRunState(SHUTDOWN); // 中断空闲Worker interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } // 尝试终止线程池 tryTerminate(); }
在shutdown() 方法中首先会将线程池状态置为SHUTDOWN,然后调用interruptIdleWorkers() 方法中断空闲Worker,最后调用tryTerminate() 方法来尝试终止线程池。那么这里要解释一下什么是空闲Worker,先看一下interruptIdleWorkers() 的实现。
private void interruptIdleWorkers() { interruptIdleWorkers(false); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; // 中断线程前需要先尝试获取Worker的锁 // 只能获取到空闲Worker的锁,所以shutdown()方法只会中断空闲Worker if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
调用interruptIdleWorkers() 方法中断Worker前首先需要尝试获取Worker的锁,已知Worker除了实现Runnable接口外,还继承于AbstractQueuedSynchronizer,因此Worker本身是一把锁,然后在runWorker() 中Worker执行任务前都会先获取Worker的锁,这里看一下Worker的lock() 方法的实现。
public void lock() { acquire(1); } protected boolean tryAcquire(int unused) { // 以CAS方式将state从0设置为1 if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
可以发现,Worker在lock() 中调用了acquire() 方法,该方法由AbstractQueuedSynchronizer抽象类提供,在acquire() 中会调用其子类实现的tryAcquire() 方法,tryAcquire() 方法会以CAS方式将state从0设置为1,因此这样的设计让Worker是一把不可重入锁。
回到interruptIdleWorkers() 方法,前面提到该方法中断Worker前会尝试获取Worker的锁,能够获取到锁才会中断Worker,而因为Worker是不可重入锁,所以正在执行任务的Worker是无法获取到锁的,只有那些没有执行任务的Worker的锁才能够被获取,因此所谓的中断空闲Worker,实际就是中断没有执行任务的Worker,那些执行任务的Worker在shutdown() 方法被调用时不会被中断,这些Worker执行完任务后会继续从任务阻塞队列中获取任务来执行,直到任务阻塞队列为空,此时没有被中断过的Worker也会被删除掉,等到线程池中没有Worker以及任务阻塞队列没有任务后,线程池才会被终止掉。
对于shutdown() 方法,一句话总结就是:将线程池状态置为SHUTDOWN并拒绝接受新任务,等到线程池Worker数量为0,任务阻塞队列为空时,关闭线程池。
2. shutdownNow()
现在再来分析shutdownNow() 方法。
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 循环通过CAS方式将线程池状态置为STOP advanceRunState(STOP); // 中断所有Worker interruptWorkers(); // 将任务阻塞队列中的任务获取出来并返回 tasks = drainQueue(); } finally { mainLock.unlock(); } // 尝试终止线程池 tryTerminate(); return tasks; } private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 中断线程池中所有Worker for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } }
在shutdownNow() 方法中首先会将线程池状态置为STOP,然后调用interruptWorkers() 方法中断线程池中的所有Worker,接着调用tryTerminate() 方法来尝试终止线程池,最后shutdownNow() 方法会将任务阻塞队列中还未被执行的任务返回。
shutdownNow() 方法调用之后,线程池中的所有Worker都会被中断,包括正在执行任务的Worker,等到所有Worker都被删除之后,线程池即被终止,也就是说,shutdownNow() 不会保证当前时刻正在执行的任务会被安全的执行完,并且会放弃执行任务阻塞队列中的所有任务。
3. tryTerminate()
关于线程池的关闭,还有一个重要的方法,那就是前面多次提到的tryTerminate() 方法,该方法能确保线程池可以被正确的关闭,其实现如下所示。
final void tryTerminate() { for (;;) { int c = ctl.get(); // 如果线程池状态为RUNNING,则没有资格终止线程池 // 如果线程池状态大于等于TIDYING,则没有资格终止线程池 // 如果线程池状态为SHUTDOWN但任务阻塞队列不为空,则没有资格终止线程池 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 线程池状态为SHUTDOWN且任务阻塞队列为空会执行到这里 // 线程池状态为STOP会执行到这里 // Worker数量不为0,表明当前还有正在执行任务的Worker或者空闲的Worker,此时中断一个空闲的Worker // 在这里被中断的空闲Worker会在getTask()方法中返回null,从而执行processWorkerExit(),最终该Worker会被删除 // processWorkerExit()方法中又会调用tryTerminate(),因此将shutdown信号在空闲Worker之间进行了传播 if (workerCountOf(c) != 0) { interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 将线程池状态置为TIDYING if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 终止线程池 terminated(); } finally { // 将线程池状态最终置为TERMINATED ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } } }
在tryTerminate() 方法的官方注释中给出了两种线程池会被终止的情况:
- 线程池的状态为SHUTDOWN,Worker数量为0,任务阻塞队列为空;
- 线程池的状态为STOP,Worker数量为0。
官方注释中还说明在所有可能导致线程池终止的操作中都应该调用tryTerminate() 方法来尝试终止线程池,因此线程池中Worker被删除时和任务阻塞队列中任务被删除时会调用tryTerminate(),以达到在线程池符合终止条件时及时终止线程池。
4. 小结
对于关闭线程池,简单小结如下。
关闭ThreadPoolExecutor有两种方式,如下所示。
- shutdown()。调用shutdown() 方法会首先将线程池状态置为SHUTDOWN并拒绝接受新任务,然后中断空闲Worker,等到线程池中Worker数量为0,任务阻塞队列为空时,线程池被真正关闭;
- shutdownNow()。调用shutdownNow() 方法会首先将线程池状态置为STOP,然后中断所有Worker(包括正在执行任务的Worker),并将任务阻塞队列中还未被执行的任务返回,当线程池Worker数量为0时,线程池被真正关闭。
还有一点需要说明,Worker除了实现Runnable接口外,还继承于AbstractQueuedSynchronizer,因此Worker本身是一把锁,Worker执行任务前都会先获取Worker的锁,所以正在执行任务的Worker的锁是无法被获取的,换言之,只有没有执行任务的Worker的锁才能被获取,这些Worker就称为空闲Worker。
以上就是一文搞懂Java的ThreadPoolExecutor原理的详细内容,更多关于Java ThreadPoolExecutor的资料请关注脚本之家其它相关文章!
相关文章
spring 中事务注解@Transactional与trycatch的使用
这篇文章主要介绍了spring 中事务注解@Transactional与trycatch的使用,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2021-06-06spring cloud consul注册的服务报错critical的解决
这篇文章主要介绍了spring cloud consul注册的服务报错critical的解决,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧2019-03-03springboot集成opencv实现人脸识别功能的详细步骤
大家都知道OpenCV是一个基于BSD许可(开源)发行的跨平台计算机视觉和机器学习软件库,可以运行在Linux、Windows、Android和Mac OS操作系统上今天通过本文给大家分享springboot集成opencv实现人脸识别,感兴趣的朋友一起看看吧2021-06-06dm.jdbc.driver.DMException网络通信异常的解决过程
最近一个项目里面出现了一个比较诡异的问题,给大家分享下,这篇文章主要给大家介绍了关于dm.jdbc.driver.DMException网络通信异常的解决过程,需要的朋友可以参考下2023-02-02
最新评论