Java中的CompletionService批量异步执行详解

 更新时间:2023年12月22日 09:05:54   作者:Java面试365  
这篇文章主要介绍了Java中的CompletionService批量异步执行详解,我们知道线程池可以执行异步任务,同时可以通过返回值Future获取返回值,所以异步任务大多数采用ThreadPoolExecutor+Future,需要的朋友可以参考下

前景引入

我们知道线程池可以执行异步任务,同时可以通过返回值Future获取返回值,所以异步任务大多数采用ThreadPoolExecutor+Future,如果存在如下情况,需要从任务一二三中获取返回值后,保存到数据库中,用异步逻辑实现代码应该如下所示。

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newFixedThreadPool(3);
    Future<Integer> f1 = executorService.submit(() -> {
        System.out.println("执行任务一");
        return 1;
    });
    Future<Integer> f2 = executorService.submit(() -> {
        System.out.println("执行任务二");
        return 2;
    });
    Future<Integer> f3 = executorService.submit(() -> {
        System.out.println("执行任务三");
        return 3;
    });
    Integer r1 = f1.get();
    executorService.execute(()->{
        // 省略保存r1操作
        System.out.println(r1);
    });
    Integer r2 = f2.get();
    executorService.execute(()->{
        // 省略保存r2操作
        System.out.println(r2);
    });
    Integer r3 = f3.get();
    executorService.execute(()->{
        // 省略保存r3操作
        System.out.println(r3);
    });
    executorService.shutdown();
}

这样写的代码一点毛病没有,逻辑都是正常的,但如果存在任务一查询了比较耗时的操作,由于f1.get是阻塞执行,那么就算任务二和任务三已经返回结果,任务二的返回值和任务三的返回值都是不能保存到数据库的,因为f1.get将主线程阻塞了。

批量异步实现

那可以如何处理呢?可以采用万能的阻塞队列,任务先执行完毕的先入队,这样可以保证其它线程入库的速度不受影响,提高效率。

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newFixedThreadPool(3);
    ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue(3);
    Future<Integer> f1 = executorService.submit(() -> {
        System.out.println("执行任务一");
        Thread.sleep(5000);
        return 1;
    });
    Future<Integer> f2 = executorService.submit(() -> {
        System.out.println("执行任务二");
        return 2;
    });
    Future<Integer> f3 = executorService.submit(() -> {
        System.out.println("执行任务三");
        Thread.sleep(3000);
        return 3;
    });
    executorService.execute(()->{
        try {
            Integer r1 = f1.get();
            // 阻塞队列入队操作
            queue.put(r1);
            System.out.println(r1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    });
    executorService.execute(()->{
        try {
            Integer r2 = f2.get();
            queue.put(r2);
            System.out.println(r2);
        } catch (Exception e) {
            e.printStackTrace();
        }
    });
    executorService.execute(()->{
        try {
            Integer r3 = f3.get();
            queue.put(r3);
            System.out.println(r3);
        } catch (Exception e) {
            e.printStackTrace();
        }
    });
    // 循环次数不要使用queue.size限制,因为不同时刻queue.size值是有可能不同的
    for (int i = 0; i <3; i++) {
        Integer integer = queue.take();
        // 省略保存integer操作
        executorService.execute(()->{
            System.out.println("保存入库=="+integer);
        });
    }
    executorService.shutdown();
}

产生结果如下

image-20220307193401671

同样的在生产中不建议使用,因为SDK为我们提供了工具类CompletionService,CompletionService内部就维护了一个阻塞队列,唯一与上述代码实现有所区别的是,阻塞队列入库的是Future对象,其余原理类似。

CompletionService

如何创建CompletionService

CompletionService同样是一个接口,其具体实现为ExecutorCompletionService,创建CompletionService对象有两种方式

public ExecutorCompletionService(Executor executor);
public ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue)

CompletionService对象的创建都是需要指定线程池,如果在创建时没有传入阻塞对象,那么会采用默认的LinkedBlockingQueue无界阻塞队列,如果应用到生产可能会产生OOM的情况,这是需要注意的。

CompletionService初体验

CompletionService如何做到批量执行异步任务呢,将上述场景采用CompletionService实现下

public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService executorService = Executors.newFixedThreadPool(3);
    CompletionService completionService = new ExecutorCompletionService(executorService);
    Future<Integer> f1 = completionService.submit(() -> {
        System.out.println("执行任务一");
        Thread.sleep(5000);
        return 1;
    });
    Future<Integer> f2 = completionService.submit(() -> {
        System.out.println("执行任务二");
        return 2;
    });
    Future<Integer> f3 = completionService.submit(() -> {
        System.out.println("执行任务三");
        Thread.sleep(3000);
        return 3;
    });
    for (int i = 0; i <3 ; i++) {
        Future take = completionService.take();
        Integer integer = (Integer) take.get();
        executorService.execute(()->{
            System.out.println("执行入库=="+integer);
        });
    }
    executorService.shutdown();
}

CompletionService接口说明

CompletionService的方法不多,使用起来比较简单,方法签名如下

public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService executorService = Executors.newFixedThreadPool(3);
    CompletionService completionService = new ExecutorCompletionService(executorService);
    Future<Integer> f1 = completionService.submit(() -> {
        System.out.println("执行任务一");
        Thread.sleep(5000);
        return 1;
    });
    Future<Integer> f2 = completionService.submit(() -> {
        System.out.println("执行任务二");
        return 2;
    });
    Future<Integer> f3 = completionService.submit(() -> {
        System.out.println("执行任务三");
        Thread.sleep(3000);
        return 3;
    });
    for (int i = 0; i <3 ; i++) {
        Future take = completionService.take();
        Integer integer = (Integer) take.get();
        executorService.execute(()->{
            System.out.println("执行入库=="+integer);
        });
    }
    executorService.shutdown();
}

总结

CompletionService主要是去解决无效等待的问题,如果一个耗时较长的任务在执行,那么可以采用这种方式避免无效的等待

CompletionService还能让异步任务的执行结果有序化,先执行完就先进入阻塞队列。

到此这篇关于Java中的CompletionService批量异步执行详解的文章就介绍到这了,更多相关CompletionService批量异步执行内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • java题解LeetCode20.有效的括号

    java题解LeetCode20.有效的括号

    这篇文章主要为大家介绍了java题解LeetCode20.有效的括号示例解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-10-10
  • 简单了解JAVA内存泄漏和溢出区别及联系

    简单了解JAVA内存泄漏和溢出区别及联系

    这篇文章主要介绍了简单了解JAVA内存泄漏和溢出区别及联系,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-03-03
  • Spring内置定时任务调度@Scheduled使用详解

    Spring内置定时任务调度@Scheduled使用详解

    这篇文章主要介绍了Spring内置定时任务调度@Scheduled使用详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-12-12
  • 多线程计数,怎么保持计数准确的方法

    多线程计数,怎么保持计数准确的方法

    这篇文章主要介绍了多线程计数的方法,有需要的朋友可以参考一下
    2014-01-01
  • 最新IntelliJ IDEA 2021版配置 Tomcat 8.5 的详细步骤

    最新IntelliJ IDEA 2021版配置 Tomcat 8.5 的详细步骤

    idea开发工具一直是java环境最好用,很受广大开发者喜爱,今天通过本文给大家分享最新IntelliJ IDEA 2021版配置 Tomcat 8.5 的详细步骤,本文通过图文并茂的形式给大家介绍的非常详细,需要的朋友可以参考下
    2021-06-06
  • Spring循环引用失败问题源码解析

    Spring循环引用失败问题源码解析

    这篇文章主要为大家介绍了Spring循环引用失败问题源码解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-09-09
  • SpringBoot+ThreadLocal+AbstractRoutingDataSource实现动态切换数据源

    SpringBoot+ThreadLocal+AbstractRoutingDataSource实现动态切换数据源

    最近在做业务需求时,需要从不同的数据库中获取数据然后写入到当前数据库中,因此涉及到切换数据源问题,所以本文采用ThreadLocal+AbstractRoutingDataSource来模拟实现dynamic-datasource-spring-boot-starter中线程数据源切换,需要的朋友可以参考下
    2023-08-08
  • Java堆内存又溢出了!教你一招必杀技(推荐)

    Java堆内存又溢出了!教你一招必杀技(推荐)

    这篇文章主要介绍了Java内存溢出问题,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-04-04
  • 基于Mock测试Spring MVC接口过程解析

    基于Mock测试Spring MVC接口过程解析

    这篇文章主要介绍了基于Mock测试Spring MVC接口过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-11-11
  • SpringBoot中实现登录拦截器的代码实例

    SpringBoot中实现登录拦截器的代码实例

    这篇文章主要介绍了SpringBoot中实现登录拦截器的代码实例,对于管理系统或其他需要用户登录的系统,登录验证都是必不可少的环节,在SpringBoot开发的项目中,通过实现拦截器来实现用户登录拦截并验证,需要的朋友可以参考下
    2023-10-10

最新评论