Java中的CompletionService批量异步执行详解
前景引入
我们知道线程池可以执行异步任务,同时可以通过返回值Future获取返回值,所以异步任务大多数采用ThreadPoolExecutor+Future,如果存在如下情况,需要从任务一二三中获取返回值后,保存到数据库中,用异步逻辑实现代码应该如下所示。
public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(3); Future<Integer> f1 = executorService.submit(() -> { System.out.println("执行任务一"); return 1; }); Future<Integer> f2 = executorService.submit(() -> { System.out.println("执行任务二"); return 2; }); Future<Integer> f3 = executorService.submit(() -> { System.out.println("执行任务三"); return 3; }); Integer r1 = f1.get(); executorService.execute(()->{ // 省略保存r1操作 System.out.println(r1); }); Integer r2 = f2.get(); executorService.execute(()->{ // 省略保存r2操作 System.out.println(r2); }); Integer r3 = f3.get(); executorService.execute(()->{ // 省略保存r3操作 System.out.println(r3); }); executorService.shutdown(); }
这样写的代码一点毛病没有,逻辑都是正常的,但如果存在任务一查询了比较耗时的操作,由于f1.get是阻塞执行,那么就算任务二和任务三已经返回结果,任务二的返回值和任务三的返回值都是不能保存到数据库的,因为f1.get将主线程阻塞了。
批量异步实现
那可以如何处理呢?可以采用万能的阻塞队列,任务先执行完毕的先入队,这样可以保证其它线程入库的速度不受影响,提高效率。
public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(3); ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue(3); Future<Integer> f1 = executorService.submit(() -> { System.out.println("执行任务一"); Thread.sleep(5000); return 1; }); Future<Integer> f2 = executorService.submit(() -> { System.out.println("执行任务二"); return 2; }); Future<Integer> f3 = executorService.submit(() -> { System.out.println("执行任务三"); Thread.sleep(3000); return 3; }); executorService.execute(()->{ try { Integer r1 = f1.get(); // 阻塞队列入队操作 queue.put(r1); System.out.println(r1); } catch (Exception e) { e.printStackTrace(); } }); executorService.execute(()->{ try { Integer r2 = f2.get(); queue.put(r2); System.out.println(r2); } catch (Exception e) { e.printStackTrace(); } }); executorService.execute(()->{ try { Integer r3 = f3.get(); queue.put(r3); System.out.println(r3); } catch (Exception e) { e.printStackTrace(); } }); // 循环次数不要使用queue.size限制,因为不同时刻queue.size值是有可能不同的 for (int i = 0; i <3; i++) { Integer integer = queue.take(); // 省略保存integer操作 executorService.execute(()->{ System.out.println("保存入库=="+integer); }); } executorService.shutdown(); }
产生结果如下
同样的在生产中不建议使用,因为SDK为我们提供了工具类CompletionService,CompletionService内部就维护了一个阻塞队列,唯一与上述代码实现有所区别的是,阻塞队列入库的是Future对象,其余原理类似。
CompletionService
如何创建CompletionService
CompletionService同样是一个接口,其具体实现为ExecutorCompletionService,创建CompletionService对象有两种方式
public ExecutorCompletionService(Executor executor); public ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue)
CompletionService对象的创建都是需要指定线程池,如果在创建时没有传入阻塞对象,那么会采用默认的LinkedBlockingQueue无界阻塞队列,如果应用到生产可能会产生OOM的情况,这是需要注意的。
CompletionService初体验
CompletionService如何做到批量执行异步任务呢,将上述场景采用CompletionService实现下
public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executorService = Executors.newFixedThreadPool(3); CompletionService completionService = new ExecutorCompletionService(executorService); Future<Integer> f1 = completionService.submit(() -> { System.out.println("执行任务一"); Thread.sleep(5000); return 1; }); Future<Integer> f2 = completionService.submit(() -> { System.out.println("执行任务二"); return 2; }); Future<Integer> f3 = completionService.submit(() -> { System.out.println("执行任务三"); Thread.sleep(3000); return 3; }); for (int i = 0; i <3 ; i++) { Future take = completionService.take(); Integer integer = (Integer) take.get(); executorService.execute(()->{ System.out.println("执行入库=="+integer); }); } executorService.shutdown(); }
CompletionService接口说明
CompletionService的方法不多,使用起来比较简单,方法签名如下
public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executorService = Executors.newFixedThreadPool(3); CompletionService completionService = new ExecutorCompletionService(executorService); Future<Integer> f1 = completionService.submit(() -> { System.out.println("执行任务一"); Thread.sleep(5000); return 1; }); Future<Integer> f2 = completionService.submit(() -> { System.out.println("执行任务二"); return 2; }); Future<Integer> f3 = completionService.submit(() -> { System.out.println("执行任务三"); Thread.sleep(3000); return 3; }); for (int i = 0; i <3 ; i++) { Future take = completionService.take(); Integer integer = (Integer) take.get(); executorService.execute(()->{ System.out.println("执行入库=="+integer); }); } executorService.shutdown(); }
总结
CompletionService主要是去解决无效等待的问题,如果一个耗时较长的任务在执行,那么可以采用这种方式避免无效的等待
CompletionService还能让异步任务的执行结果有序化,先执行完就先进入阻塞队列。
到此这篇关于Java中的CompletionService批量异步执行详解的文章就介绍到这了,更多相关CompletionService批量异步执行内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
最新IntelliJ IDEA 2021版配置 Tomcat 8.5 的详细步骤
idea开发工具一直是java环境最好用,很受广大开发者喜爱,今天通过本文给大家分享最新IntelliJ IDEA 2021版配置 Tomcat 8.5 的详细步骤,本文通过图文并茂的形式给大家介绍的非常详细,需要的朋友可以参考下2021-06-06SpringBoot+ThreadLocal+AbstractRoutingDataSource实现动态切换数据源
最近在做业务需求时,需要从不同的数据库中获取数据然后写入到当前数据库中,因此涉及到切换数据源问题,所以本文采用ThreadLocal+AbstractRoutingDataSource来模拟实现dynamic-datasource-spring-boot-starter中线程数据源切换,需要的朋友可以参考下2023-08-08
最新评论