Java多线程工具CompletableFuture详解

 更新时间:2024年01月22日 08:31:55   作者:SolidCocoi  
这篇文章主要介绍了Java多线程工具CompletableFuture详解,CompletableFuture 是 java 1.8 追加的新特性,通俗的话来说,是一个函数式的,用于控制多任务同步、异步组合操作的工具,需要的朋友可以参考下

简介

CompletableFuture 是 java 1.8 追加的新特性,通俗的话来说,是一个函数式的,用于控制多任务同步、异步组合操作的工具,实现诸如:

  • 控制若干个线程任务间是同步还是异步
  • 控制若干个线程间的先后执行顺序、依赖关系
  • 若干个线程任务,任意其中一个完成就执行某种逻辑
  • ……

将变得十分简单。如果你对前端有一定了解,你会发现它和 Javascript 中的 Promise 是十分类似的。

使用方法

CompletableFuture 需要依仗线程池实现自身功能,这个线程池是个非必填值,如果未特殊指明,将会使用 ForkJoinPool 的实例,构造方法为 ForkJoinPool.makeCommonPool(),该线程池大小为 Runtime.getRuntime().availableProcessors() - 1 即 当前电脑 cpu 可用核心数 -1。

常见 API

方法名称备注
complete标识自身已完成任务,并传入一个参数作为 CompletableFuture.get() 将获取的值;标识结束是 CAS 方式设置,只有 未结束→结束 的变化才能成功,complete 操作返回 true 时才真正影响 CompletableFuture.get() 将获取的值
completedFuture这是个静态方法,构造一个已完成的 CompletableFuture 对象,并以传入的参数作为 CompletableFuture.get() 将获取的值
get阻塞直至任务完成,并获取该任务的返回值
join阻塞直至任务完成,并获取该任务的返回值(几乎与 get 等同,但 join 不会抛出检查型异常,不强制要求你必须处理)
cancel标识自身已完成,无法阻断自身任务,但会构造 CancellationException传给关联在该对象后续的 CompletableFuture,后续的 CompletableFuture 会因捕获到异常而终止任务。另:该函数入参传入的 boolean 不会产生任何作用( javadoc 里这么描述也是绝了)
completeExceptionally可以理解为 cancel 的可自定义异常版本,其入参就是传递给后续 CompletableFuture 对象的异常
exceptionallyCompletableFuture 链路上发生异常时会触发该方法,给链路的最后一个 CompletableFuture 对象配置,即可对全链路进行异常捕获,其入参为异常处理时需要执行的 Function
isDoneCompletableFuture 任务是否已完成

剩下的大多 API 是 run、accept、apply、then、either、both、async …… 的组合,本质上都是语法糖,用了原生的 @FunctionalInterface,决定传入的函数有无返回值、有无入参、在当前任务结束后开始执行、是否任意一个完成就结束、是否全部完成才结束、是否另起线程执行任务 ……

// 你的下一步业务逻辑
Runnable next = new MyRunnable();
// 因为是 run 所以无返回值, 因为是 then 所以在 completableFuture1 对应的任务结束后,执行一段任务
completableFuture1.thenRun(next);
// 因为是 run 所以无返回值, 因为是 then 所以在 completableFuture1 对应的任务结束后,因为 async ,所以要另起一个线程,执行一段任务
completableFuture1.thenRunAsync(next);

在看得懂函数式编程的情况下,其他你可通过源码函数定义以此类推

如果想使用自定义的线程池执行任务,那么使用带 Executor executor 的重载函数即可,后不再重复说明,例如:

// 自定义线程池
ThreadPoolExecutor executorService = new ThreadPoolExecutor(10, 20,
        2L, TimeUnit.SECONDS,
        new LinkedBlockingDeque<Runnable>(10),
        // 你可以根据你的需要自己实现 Handler,此处为简写使用现成的 Handler,此处捕获的是流量过载异常
        new ThreadPoolExecutor.AbortPolicy());
// 通过自定义线程池使用 CompletableFuture
CompletableFuture.runAsync(() -> {
    System.out.println("我的业务代码");
}, executorService);

使用示例

为简洁代码,睡眠模拟任务运行耗时均使用下列函数,后续不再重复说明

public static void sleep(Long sleepTime) {
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

请时刻注意要进行异常处理,意味着你的 completableFuture 链路得保证调用过下列代码,进行异常处理(链路最后一个使用,则全链路可捕获异常),后续为简洁代码已省略

completableFuture.exceptionally((Throwable ex) -> {
            ex.printStackTrace();
			// 取决于 completableFuture 有无返回值,类型是啥。此处实例是 completableFuture 为 CompletableFuture<Long>
            return -1L;
        });

将常规线程任务转化为 CompletableFuture 对象

CompletableFuture completableFuture = new CompletableFuture();
    Long startTs = System.currentTimeMillis();
    new Thread(() -> {
		sleep(200L);
		completableFuture.complete("完成");// 设置 completableFuture 结果并将状态设置为已完成
    }).start();
    while (!completableFuture.isDone()) {
        // 非阻塞式获取结果,如果当前未执行完成则返回入参字符串"我还没完成"
        // 执行到此处恰好任务完成,然后在执行 while 循环判断跳出,你在循环内最多输出一次 “完成”
        System.out.println(completableFuture.getNow("未完"));
    }
    System.out.println("最终结果:" + completableFuture.getNow("未完") + " " + (System.currentTimeMillis() - startTs) + " ms");

阻塞到任意一个任务完成

public static void 阻塞到任意一个完成() throws IOException {
        // 模拟一个耗时 20 L 的任务
        CompletableFuture<Long> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            sleep(20L);
            System.out.println("completableFuture1 完成" + " --" + (System.currentTimeMillis() - startTs));
            return 20L;
        });
        // 模拟一个耗时 10 L 的任务
        CompletableFuture<Long> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            sleep(10L);
            System.out.println("completableFuture2 完成" + " --" + (System.currentTimeMillis() - startTs));
            return 10L;
        });
        // applyToEitherAsync 代表另起一个线程去执行第二个入参的代码块,这里其实没啥影响,我就不加 Async 了
        CompletableFuture result = completableFuture1.applyToEither(completableFuture2, fasterOne -> {
            System.out.println(fasterOne);
            return fasterOne;
        });
        // 除了写回调函数方法外的另一种获取最快值的方法
        System.out.println("最快的为:" + result.join());
        // 调用读取行阻塞住,防止异步任务还未完成就退出了
        System.in.read();
    }

遇到特别多任务的情况下,你可以尝试数组

CompletableFuture[] array = new CompletableFuture[2];
        array[0] = completableFuture1;
        array[1] = completableFuture2;
        CompletableFuture fasterOne = CompletableFuture.anyOf(array);
        System.out.println("最快的为:" + fasterOne.join());

阻塞到全部任务完成

public static void 阻塞到全部完成() {
        Long startTs = System.currentTimeMillis();
        // 模拟一个耗时 20 L 的任务
        CompletableFuture<Long> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            sleep(20L);
            System.out.println("completableFuture1 完成" + " --" + (System.currentTimeMillis() - startTs));
            return 20L;
        });
        // 模拟一个耗时 10 L 的任务
        CompletableFuture<Long> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            sleep(10L);
            System.out.println("completableFuture2 完成" + " --" + (System.currentTimeMillis() - startTs));
            return 10L;
        });
        // thenAcceptBothAsync 代表另起一个线程去执行第二个入参的代码块,这里其实没啥影响,我就不加 Async 了
        CompletableFuture result = completableFuture1.thenAcceptBoth(completableFuture2, (r1, r2) -> {
            System.out.println("completableFuture1 :" + r1);
            System.out.println("completableFuture2 :" + r2);
        });
        // thenAcceptBoth 是没有返回值的,所以这里是 null ,但这句代码还是有作用的,相当于阻塞到全部任务都完成
        System.out.println("返回这个 null 之后意味着全部任务已完成:" + result.join());
    }

遇到特别多任务的情况下,你可以尝试数组

CompletableFuture[] array = new CompletableFuture[2];
    array[0] = completableFuture1;
    array[1] = completableFuture2;
    CompletableFuture<Void> all = CompletableFuture.allOf(array);
    System.out.println("返回这个 null 之后意味着全部任务已完成:" + all.join());

合并任务

合并任务会涉及到 Compose、Combine,他们区别在于合并的逻辑不同:

  • Compose: 合并的两个任务间是同步阻塞执行的,后一个任务需要阻塞等待第一个任务执行完成。你需要传入一个函数 —— 已知第一个任务的返回值,返回合并之后的 CompletableFuture 对象
  • Combine: 合并的两个任务间是异步执行的。你需要传入另一个任务、一个函数 —— 已知两个任务的返回值,合并成最终返回值
public static void 合并() {
        Long startTs = System.currentTimeMillis();
        // 模拟一个耗时 100 L 的任务
        CompletableFuture<Long> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            sleep(100L);
            System.out.println("completableFuture1 完成" + " --" + (System.currentTimeMillis() - startTs));
            return 100L;
        });
        // 模拟一个耗时 100 L 的任务
        CompletableFuture<Long> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            sleep(100L);
            System.out.println("completableFuture2 完成" + " --" + (System.currentTimeMillis() - startTs));
            return 100L;
        });
        // 模拟 completableFuture1 合并一个耗时 120 L 的任务,返回值为两个任务总工时
        // thenCombineAsync 代表另起一个线程去执行第二个入参的代码块,这里其实没啥影响,我就不加 Async 了
        CompletableFuture<Long> completableFuture3 = completableFuture1.thenCombine(CompletableFuture.supplyAsync(() -> {
            sleep(120L);
            return 120L;
        }), (x, y) -> x + y);
        // 模拟 completableFuture2 合并一个耗时 50 L 的任务,返回值为两个任务总工时
        // thenComposeAsync 代表另起一个线程去执行第一个入参的代码块,这里其实没啥影响,我就不加 Async 了
        CompletableFuture<Long> completableFuture4 = completableFuture2.thenCompose(r2 -> {
            CompletableFuture<Long> temp = CompletableFuture.completedFuture(r2);
            return temp.thenApply(rTemp -> {
                sleep(50L);
                return temp.join() + 50L;
            });
        });
        boolean printFlag3 = true;
        boolean printFlag4 = true;
        String completableFuture3Info = null;
        String completableFuture4Info = null;
        while (!completableFuture3.isDone() || !completableFuture4.isDone()) {
            if (completableFuture3.isDone()) {
                if (printFlag3) {
                    printFlag3 = false;
                    completableFuture3Info = "completableFuture3 完成:" + completableFuture3.join() + " --" + (System.currentTimeMillis() - startTs);
                }
            }
            if (completableFuture4.isDone()) {
                if (printFlag4) {
                    printFlag4 = false;
                    completableFuture4Info = "completableFuture4 完成:" + completableFuture4.join() + " --" + (System.currentTimeMillis() - startTs);
                }
            }
        }
        System.out.println(completableFuture3Info != null ? completableFuture3Info : "completableFuture3 actual:" + completableFuture3.getNow(-100L) + " --" + (System.currentTimeMillis() - startTs));
        System.out.println(completableFuture4Info != null ? completableFuture4Info : "completableFuture4 actual:" + completableFuture4.getNow(-100L) + " --" + (System.currentTimeMillis() - startTs));
    }

你会观测到总工时更长的反而实际结束时间点更早,completableFuture3 早于 completableFuture4

到此这篇关于Java多线程工具CompletableFuture详解的文章就介绍到这了,更多相关多线程工具CompletableFuture内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 详解RabbitMq如何做到消息的可靠性投递

    详解RabbitMq如何做到消息的可靠性投递

    这篇文章主要为大家介绍了RabbitMq如何做到消息的可靠性投递,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-09-09
  • Java如何把int类型转换成byte

    Java如何把int类型转换成byte

    这篇文章主要介绍了Java如何把int类型转换成byte,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-02-02
  • Java实现AES/CBC/PKCS7Padding加解密的方法

    Java实现AES/CBC/PKCS7Padding加解密的方法

    这篇文章主要介绍了Java实现AES/CBC/PKCS7Padding加解密的方法,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-08-08
  • Java文件复制多种方法实例代码

    Java文件复制多种方法实例代码

    近期用到文件复制,虽然程序很简单,因为时间久了淡忘了,所以写一篇文章记录一下,下面这篇文章主要给大家介绍了关于Java文件复制多种方法的相关资料,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2023-05-05
  • Java反射机制,如何将一个实体类所有字段赋值为null

    Java反射机制,如何将一个实体类所有字段赋值为null

    这篇文章主要介绍了Java反射机制,如何将一个实体类所有字段赋值为null,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-03-03
  • Java关于BeabUtils.copyproperties的用法

    Java关于BeabUtils.copyproperties的用法

    这篇文章主要介绍了Java关于BeabUtils.copyproperties的用法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-08-08
  • Java经典面试题汇总:Java Web

    Java经典面试题汇总:Java Web

    本篇总结的是Java Web相关的面试题,后续会持续更新,希望我的分享可以帮助到正在备战面试的实习生或者已经工作的同行,如果发现错误还望大家多多包涵,不吝赐教,谢谢
    2021-07-07
  • java8 toMap问题(key重复如何解决)

    java8 toMap问题(key重复如何解决)

    这篇文章主要介绍了java8 toMap问题(key重复如何解决),具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-05-05
  • Spring更简单的存储方式与获取方式详解

    Spring更简单的存储方式与获取方式详解

    Spring是一个轻量级的IoC和AOP容器框架,是为Java应用程序提供基础性服务的一套框架,目的是用于简化企业应用程序的开发,它使得开发者只需要关心业务需求,下面这篇文章主要给大家介绍了关于Spring更简单的存储方式与获取方式的相关资料,需要的朋友可以参考下
    2022-06-06
  • Mybatis中关于自定义mapper.xml时,参数传递的方式及写法

    Mybatis中关于自定义mapper.xml时,参数传递的方式及写法

    这篇文章主要介绍了Mybatis中关于自定义mapper.xml时,参数传递的方式及写法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-12-12

最新评论