A Detailed Guide to Customizing the parallelStream Thread Pool
ForkJoinPool
This article looks at how to make parallelStream run on a custom thread pool.
java/util/concurrent/ForkJoinPool.java
public class ForkJoinPool extends AbstractExecutorService {

    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    }

    private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }

    private static ForkJoinPool makeCommonPool() {
        int parallelism = -1;
        ForkJoinWorkerThreadFactory factory = null;
        UncaughtExceptionHandler handler = null;
        try {  // ignore exceptions in accessing/parsing properties
            String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
            String fp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
            String hp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
            if (pp != null)
                parallelism = Integer.parseInt(pp);
            if (fp != null)
                factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
                           getSystemClassLoader().loadClass(fp).newInstance());
            if (hp != null)
                handler = ((UncaughtExceptionHandler)ClassLoader.
                           getSystemClassLoader().loadClass(hp).newInstance());
        } catch (Exception ignore) {
        }
        if (factory == null) {
            if (System.getSecurityManager() == null)
                factory = new DefaultCommonPoolForkJoinWorkerThreadFactory();
            else // use security-managed default
                factory = new InnocuousForkJoinWorkerThreadFactory();
        }
        if (parallelism < 0 && // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;
        return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                                "ForkJoinPool.commonPool-worker-");
    }
}
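By default, parallelStream runs on the common ForkJoinPool. As makeCommonPool() shows, the common pool's parallelism, thread factory, and exception handler can be set through system properties; when no parallelism is configured, it defaults to the number of available processors minus one, with a minimum of 1.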
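To make the default rule concrete, here is a minimal sketch that compares the parallelism the common pool reports with the value the makeCommonPool() logic above would produce. The class name CommonPoolParallelism and its helper methods are hypothetical names chosen for this illustration; only ForkJoinPool.commonPool(), getParallelism(), and the property name come from the JDK.

```java
import java.util.concurrent.ForkJoinPool;

final class CommonPoolParallelism {

    // Property name read by makeCommonPool() in the JDK source quoted above.
    static final String PARALLELISM_PROPERTY =
            "java.util.concurrent.ForkJoinPool.common.parallelism";

    /** Value the common pool should report when the property is not set. */
    static int expectedDefault() {
        return Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
    }

    /** Parallelism the common pool actually reports in this JVM. */
    static int actual() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        System.out.println("property         = " + System.getProperty(PARALLELISM_PROPERTY));
        System.out.println("expected default = " + expectedDefault());
        System.out.println("actual           = " + actual());
    }
}
```

Because the common pool is created when the ForkJoinPool class is initialized, the property has to be in place before that happens. Passing it on the command line, for example java -Djava.util.concurrent.ForkJoinPool.common.parallelism=4 CommonPoolParallelism, is the reliable way to do so.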
ForkJoinPoolFactoryBean
org/springframework/scheduling/concurrent/ForkJoinPoolFactoryBean.java
public class ForkJoinPoolFactoryBean implements FactoryBean<ForkJoinPool>, InitializingBean, DisposableBean {

    private boolean commonPool = false;

    private int parallelism = Runtime.getRuntime().availableProcessors();

    private ForkJoinPool.ForkJoinWorkerThreadFactory threadFactory = ForkJoinPool.defaultForkJoinWorkerThreadFactory;

    @Nullable
    private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;

    private boolean asyncMode = false;

    private int awaitTerminationSeconds = 0;

    @Nullable
    private ForkJoinPool forkJoinPool;

    //......

    @Override
    public void destroy() {
        if (this.forkJoinPool != null) {
            // Ignored for the common pool.
            this.forkJoinPool.shutdown();

            // Wait for all tasks to terminate - works for the common pool as well.
            if (this.awaitTerminationSeconds > 0) {
                try {
                    this.forkJoinPool.awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS);
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
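Spring has provided ForkJoinPoolFactoryBean since version 3.1. It creates a ForkJoinPool and manages its lifecycle: when the bean is destroyed, the pool is shut down and, if awaitTerminationSeconds is greater than 0, Spring waits up to that long for running tasks to finish.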
Example
Configuration
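import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ForkJoinPoolFactoryBean;

@Configuration
public class ForkJoinConfig {

    @Bean
    public ForkJoinPoolFactoryBean forkJoinPoolFactoryBean() {
        ForkJoinPoolFactoryBean factoryBean = new ForkJoinPoolFactoryBean();
        // Create a dedicated pool instead of exposing the common pool
        factoryBean.setCommonPool(false);
        // asyncMode sets the local scheduling mode for forked tasks that are never joined:
        // true = FIFO_QUEUE, false = LIFO_QUEUE (the default)
        factoryBean.setAsyncMode(true);
        factoryBean.setParallelism(10);
        // factoryBean.setUncaughtExceptionHandler();
        // On shutdown, wait up to 60 seconds for running tasks to finish
        factoryBean.setAwaitTerminationSeconds(60);
        return factoryBean;
    }
}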
Usage
@Autowired
ForkJoinPoolFactoryBean forkJoinPoolFactoryBean;

public void streamParallel() throws ExecutionException, InterruptedException {
    List<TodoTask> result = forkJoinPoolFactoryBean.getObject().submit(new Callable<List<TodoTask>>() {
        @Override
        public List<TodoTask> call() throws Exception {
            return IntStream.rangeClosed(1, 20).parallel().mapToObj(i -> {
                log.info("thread:{}", Thread.currentThread().getName());
                return new TodoTask(i, "name" + i);
            }).collect(Collectors.toList());
        }
    }).get();
    result.stream().forEach(System.out::println);
}
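Worker threads of the common pool are named with the prefix ForkJoinPool.commonPool-worker-
Worker threads of a custom pool are named, by default, with the prefix "ForkJoinPool-" + nextPoolId() + "-worker-", so the thread names in the log output show which pool actually ran the stream.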
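The two prefixes can be observed without Spring. The following is a minimal sketch, assuming the default worker thread factory; WorkerNameDemo and its methods are hypothetical names used only for this illustration. It records the names of the threads that process a parallel stream, first from the calling thread and then from inside a task submitted to a custom ForkJoinPool.

```java
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

final class WorkerNameDemo {

    /** Runs a parallel stream and records the name of every thread that handled an element. */
    static Set<String> namesSeenFromCaller() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.rangeClosed(1, 1_000).parallel()
                 .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    /** Runs the same stream from inside a task submitted to a custom pool. */
    static Set<String> namesSeenOnCustomPool(int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            Callable<Set<String>> task = WorkerNameDemo::namesSeenFromCaller;
            return pool.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown(); // a custom pool is not shut down automatically
        }
    }

    public static void main(String[] args) {
        // Typically the calling thread plus ForkJoinPool.commonPool-worker-N threads
        System.out.println("from caller: " + namesSeenFromCaller());
        // Typically only ForkJoinPool-N-worker-M threads
        System.out.println("custom pool: " + namesSeenOnCustomPool(4));
    }
}
```

When the stream is started from an ordinary thread, that thread takes part in the work alongside the common pool workers, so its own name usually appears in the first set as well.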
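Summary
parallelStream uses the commonPool by default, and that pool is initialized in a static initializer block of ForkJoinPool. For specific scenarios you can create a custom ForkJoinPool and submit the parallelStream pipeline to it as a task, so that the work does not occupy the default commonPool.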
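For code that does not use Spring, the same approach can be sketched with a plain ForkJoinPool. CustomPoolStream and buildNames are hypothetical names for this illustration, and the sketch relies on the behavior described in this article (a parallel stream started inside a ForkJoinPool task runs on that pool's workers), which is how current JDK implementations behave but is not specified by the Stream API documentation.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class CustomPoolStream {

    /** Builds "name1".."name<count>" with a parallel stream on a dedicated pool. */
    static List<String> buildNames(int count, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // The whole stream pipeline is submitted as a single task
            Callable<List<String>> task = () ->
                    IntStream.rangeClosed(1, count).parallel()
                             .mapToObj(i -> "name" + i)
                             .collect(Collectors.toList());
            return pool.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            // Without Spring managing the pool, shutting it down is the caller's job
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        buildNames(20, 4).forEach(System.out::println);
    }
}
```

Creating a pool per call keeps the sketch short. In an application, a single long-lived pool that is shut down once at exit is usually the better fit, which is the role ForkJoinPoolFactoryBean plays in the Spring example.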