java CompletableFuture实现异步编排详解
前言
为什么需要异步执行?
场景:电商系统中获取一个完整的商品信息可能分为以下几步:
①获取商品基本信息
②获取商品图片信息
③获取商品促销活动信息
④获取商品各种类的基本信息 等操作,如果使用串行方式去执行这些操作,假设每个操作执行1s,那么用户看到完整的商品详情就需要4s的时间,如果使用并行方式执行这些操作,可能只需要1s就可以完成。所以这就是异步执行的好处。
JDK5的Future
接口
Future
接口用于代表异步计算的结果,通过Future接口提供的方法可以查看异步计算是否执行完成,或者等待执行结果并获取执行结果,同时还可以取消执行。
列举Future
接口的方法:
get()
:获取任务执行结果,如果任务还没完成则会阻塞等待直到任务执行完成。如果任务被取消则会抛出CancellationException
异常,如果任务执行过程发生异常则会抛出ExecutionException
异常,如果阻塞等待过程中被中断则会抛出InterruptedException
异常。get(long timeout,Timeunit unit)
:带超时时间的get()
方法,如果阻塞等待过程中超时则会抛出TimeoutException
异常。cancel()
:用于取消异步任务的执行。如果异步任务已经完成或者已经被取消,或者由于某些原因不能取消,则会返回false。如果任务还没有被执行,则会返回true并且异步任务不会被执行。如果任务已经开始执行了但是还没有执行完成,若mayInterruptIfRunning
为true,则会立即中断执行任务的线程并返回true,若mayInterruptIfRunning
为false,则会返回true且不会中断任务执行线程。isCanceled()
:判断任务是否被取消,如果任务在结束(正常执行结束或者执行异常结束)前被取消则返回true,否则返回false。isDone()
:判断任务是否已经完成,如果完成则返回true,否则返回false。需要注意的是:任务执行过程中发生异常、任务被取消也属于任务已完成,也会返回true。
使用Future
接口和Callable
接口实现异步执行:
public static void main(String[] args) { // 快速创建线程池 ExecutorService executorService = Executors.newFixedThreadPool(4); // 获取商品基本信息(可以使用Lambda表达式简化Callable接口,这里为了便于观察不使用) Future<String> future1 = executorService.submit(new Callable<String>() { @Override public String call() throws Exception { return "获取到商品基本信息"; } }); // 获取商品图片信息 Future<String> future2 = executorService.submit(new Callable<String>() { @Override public String call() throws Exception { return "获取商品图片信息"; } }); // 获取商品促销信息 Future<String> future3 = executorService.submit(new Callable<String>() { @Override public String call() throws Exception { return "获取商品促销信息"; } }); // 获取商品各种类基本信息 Future<String> future4 = executorService.submit(new Callable<String>() { @Override public String call() throws Exception { return "获取商品各种类基本信息"; } }); // 获取结果 try { System.out.println(future1.get()); System.out.println(future2.get()); System.out.println(future3.get()); System.out.println(future4.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }finally { executorService.shutdown(); } }
既然Future可以实现异步执行并获取结果,为什么还会需要CompletableFuture?
简述一下Future接口的弊端:
- 不支持手动完成
- 当提交了一个任务,但是执行太慢了,通过其他路径已经获取到了任务结果,现在没法把这个任务结果通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成。
- 不支持进一步的非阻塞调用
- 通过Future的
get()
方法会一直阻塞到任务完成,但是想在获取任务之后执行额外的任务,因为 Future 不支持回调函数,所以无法实现这个功能。
- 通过Future的
- 不支持链式调用
- 对于Future的执行结果,想继续传到下一个Future处理使用,从而形成一个链式的pipline调用,这在 Future中无法实现。
- 不支持多个 Future 合并
- 比如有10个Future并行执行,想在所有的Future运行完毕之后,执行某些函数,是无法通过Future实现的。
- 不支持异常处理
- Future的API没有任何的异常处理的api,所以在异步运行时,如果出了异常问题不好定位。
使用Future接口可以通过get()
阻塞式获取结果或者通过轮询+isDone()
非阻塞式获取结果,但是前一种方法会阻塞,后一种会耗费CPU资源,所以JDK的Future接口实现异步执行对获取结果不太友好,所以在JDK8时推出了CompletableFuture实现异步编排。
CompletableFuture的使用
CompletableFuture概述
JDK8中新增加了一个包含50个方法左右的类CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
CompletableFuture
类实现了Future
接口和CompletionStage
接口,即除了可以使用Future
接口的所有方法之外,CompletionStage<T>
接口提供了更多方法来更好的实现异步编排,并且大量的使用了JDK8引入的函数式编程概念。后面会细致的介绍常用的API。
① 创建CompletableFuture的方式
使用new
关键字创建
// 无返回结果 CompletableFuture<String> completableFuture = new CompletableFuture<>(); // 已知返回结果 CompletableFuture<String> completableFuture = new CompletableFuture<>("result"); // 已知返回结果(底层其实也是带参数的构造器赋值) CompletableFuture<String> completedFuture = CompletableFuture.completedFuture("result");
创建一个返回结果类型为String的CompletableFuture,可以使用Future
接口的get()
方法获取该值(同样也会阻塞)。
可以使用无参构造器返回一个没有结果的CompletableFuture,也可以通过构造器的传参CompletableFuture设置好返回结果,或者使用CompletableFuture.completedFuture(U value)
构造一个已知结果的CompletableFuture。
使用CompletableFuture类的静态工厂方法(常用)
runAsync()
无返回值
// 使用默认线程池 public static CompletableFuture<Void> runAsync(Runnable runnable) // 使用自定义线程池(推荐) public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
runAsync()
方法的参数是Runnable接口,这是一个函数式接口,不允许返回值。当需要异步操作且不关心返回结果的时候可以使用runAsync()
方法。
// 例子 public static void main(String[] args) { // 快速创建线程池 ExecutorService executor = Executors.newFixedThreadPool(4); try { // 通过Lambda表达式实现Runnable接口 CompletableFuture.runAsync(()-> System.out.println("获取商品基本信息成功"), executor).get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }finally { executor.shutdown(); } }
supplyAsync()
有返回值
// 使用默认线程池 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) // 使用自定义线程池(推荐) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
supplyAsync()
方法的参数是Supplier<U>
供给型接口(无参有返回值),这也是一个函数式接口,U
是返回结果值的类型。当需要异步操作且关心返回结果的时候,可以使用supplyAsync()
方法。
// 例子 public static void main(String[] args) { // 快速创建线程池 ExecutorService executor = Executors.newFixedThreadPool(4); try { // 通过Lambda表达式实现执行内容,并返回结果通过CompletableFuture接收 CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("获取商品信息成功"); return "信息"; }, executor); // 输出结果 System.out.println(completableFuture.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }finally { executor.shutdown(); } }
关于第二个参数Executor executor
说明
在没有指定第二个参数(即没有指定线程池)时,CompletableFuture直接使用默认的ForkJoinPool.commonPool()
作为它的线程池执行异步代码。
在实际生产中会使用自定义的线程池来执行异步代码,具体可以参考另一篇文章深入理解线程池ThreadPoolExecutor ,里面的第二节有生产中怎么创建自定义线程的例子,可以参考一下。
② 获得异步执行结果
get()
阻塞式获取执行结果
该方法调用后如果任务还没完成则会阻塞等待直到任务执行完成。如果任务执行过程发生异常则会抛出ExecutionException
异常,如果阻塞等待过程中被中断则会抛出InterruptedException
异常。
get(long timeout, TimeUnit unit)
带超时的阻塞式获取执行结果
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
该方法调用后如果如果任务还没完成则会阻塞等待直到任务执行完成或者超出timeout时间,如果阻塞等待过程中超时则会抛出TimeoutException
异常。
getNow(T valueIfAbsent)
立刻获取执行结果
public T getNow(T valueIfAbsent)
该方法调用后,会立刻获取结果不会阻塞等待。如果任务完成则直接返回执行完成后的结果,如果任务没有完成,则返回调用方法时传入的参数valueIfAbsent
值。
join()
不抛异常的阻塞时获取执行结果
public T join()
该方法和get()
方法作用一样,只是不会抛出异常。
complete(T value)
主动触发计算,返回异步是否执行完毕
public boolean complete(T value)
该方法调用后,会主动触发计算结果,如果此时异步执行并没有完成(此时boolean值返回true),则通过get()
拿到的数据会是complete()
设置的参数value
值,如果此时异步执行已经完成(此时boolean值返回false),则通过get()
拿到的就是执行完成的结果。
// 例子 public static void main(String[] args) { // 快速创建线程池 ExecutorService executor = Executors.newFixedThreadPool(4); try { // 通过Lambda表达式实现执行内容,并返回结果通过CompletableFuture接收 CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { // 休眠2秒,使得异步执行变慢,会导致主动触发计算先执行,此时返回的get就是555 try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return 666; }, executor); // 主动触发计算,判断异步执行是否完成 System.out.println(completableFuture.complete(555)); // 输出结果 System.out.println(completableFuture.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }finally { executor.shutdown(); } } /** 输出结果: true 555 **/
③ 对执行结果进行处理
whenComplete
等待前面任务执行完再执行当前处理
public CompletableFuture<T> whenComplete( BiConsumer<? super T, ? super Throwable> action)
在创建好的初始任务或者是上一个任务后通过链式调用该方法,会在之前任务执行完成后继续执行whenComplete
里的内容(whenComplete
传入的action只是对之前任务的结果进行处理),即使用该方法可以避免前面说到的Future
接口的问题,不再需要通过阻塞或者轮询的方式去获取结果,而是通过调用该方法等任务执行完毕自动调用。
该方法的参数为BiConsumer<? super T, ? super Throwable> action
消费者接口,可以接收两个参数,一个是任务执行完的结果,一个是执行任务时的异常。
// 例子 public static void main(String[] args) { // 快速创建线程池 ExecutorService executor = Executors.newFixedThreadPool(4); try { CompletableFuture.supplyAsync(() -> 666, executor) .whenComplete((res, ex) -> System.out.println("任务执行完毕,结果为" + res + " 异常为" + ex) ); } catch (Exception e) { e.printStackTrace(); }finally { executor.shutdown(); } } /** 输出结果: 任务执行完毕,结果为666 异常为null **/
除了上述的方法外,还有一些类似的方法如XXXAsync()
或者是XXXAsync(XX,Executor executor)
,对于这些方法,这里统一说明,后续文章中将不会再列举
public CompletableFuture<T> whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action) public CompletableFuture<T> whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action, Executor executor)
XXXAsync()
:表示上一个任务执行完成后,不会再使用之前任务中的线程,而是重新使用从默认线程(ForkJoinPool 线程池)中重新获取新的线程执行当前任务。
XXXAsync(XX,Executor executor)
:表示不会沿用之前任务的线程,而是使用自己第二个参数指定的线程池重新获取线程执行当前任务。
④ 对执行结果进行消费
thenRun
前面任务执行完后执行当前任务,不关心前面任务的结果,也没返回值
public CompletableFuture<Void> thenRun(Runnable action)
CompletableFuture.supplyAsync(actionA).thenRun(actionB)
像这样链式调用该方法表示:执行任务A完成后接着执行任务B,但是任务B不需要A的结果,并且执行完任务B也不会返回结果。
thenRun(Runnable action)
的参数为Runnable接口即没有传入参数。
// 例子 public static void main(String[] args) { // 快速创建线程池 ExecutorService executor = Executors.newFixedThreadPool(4); try { CompletableFuture.supplyAsync(() -> 666, executor) .thenRun(() -> System.out.println("我都没有参数怎么拿到之前的结果,我也没有返回值。") ); } catch (Exception e) { e.printStackTrace(); }finally { executor.shutdown(); } } /** 输出结果: 我都没有参数怎么拿到之前的结果,我也没有返回值。 **/
thenAccept
前面任务执行完后执行当前任务,消费前面的结果,没有返回值
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
CompletableFuture.supplyAsync(actionA).thenRun(actionB)
像这样链式调用该方法表示:执行任务A完成后接着执行任务B,而且任务B需要A的结果,但是执行完任务B不会返回结果。
thenAccept(Consumer<? super T> action)
的参数为消费者接口,即可以传入一个参数,该参数为上一个任务的执行结果。
// 例子 public static void main(String[] args) { // 快速创建线程池 ExecutorService executor = Executors.newFixedThreadPool(4); try { CompletableFuture.supplyAsync(() -> 666, executor) .thenAccept((res) -> System.out.println("我能拿到上一个的结果" + res + ",但是我没法传出去。") ); } catch (Exception e) { e.printStackTrace(); }finally { executor.shutdown(); } } /** 输出结果: 我能拿到上一个的结果666,但是我没法传出去。 **/
thenApply
前面任务执行完后执行当前任务,消费前面的结果,具有返回值
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
CompletableFuture.supplyAsync(actionA).thenRun(actionB)
像这样链式调用该方法表示:执行任务A完成后接着执行任务B,而且任务B需要A的结果,并且执行完任务B需要有返回结果。
thenApply(Function<? super T,? extends U> fn)
的参数为函数式接口,即可以传入一个参数类型为T,该参数是上一个任务的执行结果,并且函数式接口需要有返回值,类型为U。
// 例子 public static void main(String[] args) { // 快速创建线程池 ExecutorService executor = Executors.newFixedThreadPool(4); try { CompletableFuture.supplyAsync(() -> 666, executor) .thenApply((res) -> { System.out.println("我能拿到上一个的结果" + res + "并且我要将结果传出去"); return res; } ).whenComplete((res, ex) -> System.out.println("结果" + res)); } catch (Exception e) { e.printStackTrace(); }finally { executor.shutdown(); } } /** 输出结果: 我能拿到上一个的结果666并且我要将结果传出去 结果666 **/
⑤ 异常处理
exceptionally
异常捕获,只消费前面任务中出现的异常信息,具有返回值
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
可以通过链式调用该方法来获取异常信息,并且具有返回值。如果某一个任务出现异常被exceptionally
捕获到则剩余的任务将不会再执行。类似于Java异常处理的catch。
exceptionally(Function<Throwable, ? extends T> fn)
的参数是函数式接口,具有一个参数以及返回值,该参数为前面任务的异常信息。
// 例子 public static void main(String[] args) { // 快速创建线程池 ExecutorService executor = Executors.newFixedThreadPool(4); try { CompletableFuture.supplyAsync(() -> { if (Math.random() < 0.5) throw new RuntimeException("error"); return 666; }, executor) .thenApply((res) -> { System.out.println("不出现异常,结果为" + res); return res; }).exceptionally((ex) -> { ex.printStackTrace(); return -1; }); } catch (Exception e) { e.printStackTrace(); }finally { executor.shutdown(); } } /** 输出结果: // 这是不出现异常的情况 不出现异常,结果为666 // 这是出现异常的情况 java.util.concurrent.CompletionException: java.lang.RuntimeException: error at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.lang.RuntimeException: error at com.xqsr.review.thread.ThreadTest.lambda$main$0(ThreadTest.java:15) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ... 3 more **/
handle
异常处理,消费前面的结果及异常信息,具有返回值,不会中断后续任务
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
可以通过链式调用该方法可以跟thenApply()
一样可以消费前面任务的结果并完成自己任务内容,并且具有返回值。不同之处在于出现异常也可以接着往下执行,根据异常参数做进一步处理。
handle(BiFunction<? super T, Throwable, ? extends U> fn)
的参数是消费者接口,一个参数是任务执行结果,一个是异常信息,并且具有返回值。
// 例子 public static void main(String[] args) { // 快速创建线程池 ExecutorService executor = Executors.newFixedThreadPool(4); try { CompletableFuture.supplyAsync(() -> 666, executor) .thenApply((res) -> { if (Math.random() < 0.5) throw new RuntimeException("error"); return res; }).handle((res, ex) -> { System.out.println("结果" + res + "(null表示之前出现异常导致结果无法传过来)"); return res == null ? -1 : res; }).thenApply((res) -> { System.out.println("结果为" + res + "(-1表示之前出现异常,经过handler使得结果处理成-1)"); return res; }).exceptionally((ex) -> { ex.printStackTrace(); return -1; }); } catch (Exception e) { e.printStackTrace(); }finally { executor.shutdown(); } } /** 输出结果: // 这是不出现异常的情况 结果666(null表示之前出现异常导致结果无法传过来) 结果为666(-1表示之前出现异常,经过handler使得结果处理成-1) // 这是出现异常的情况 结果null(null表示之前出现异常导致结果无法传过来) 结果为-1(-1表示之前出现异常,经过handler使得结果处理成-1) **/
可以看到通过handle
类似于Java异常处理的finally,出现异常并不会像使用exceptionally
那样中断后续的任务,而是继续执行,可以通过handle为之前出现异常无法获得的结果重新赋值(根据业务需求设置安全值之类的)。
⑥ 两组任务按顺序执行
thenCompose
实现两组任务按前后顺序执行
public <U> CompletableFuture<U> thenCompose( Function<? super T, ? extends CompletionStage<U>> fn)
A.thenCompose(B)
相当于任务A要排在任务B前面,即顺序的执行任务A、任务B。该方法的参数是函数式接口,函数式接口的参数是调用者的执行结果,返回值是另一个任务B。
public static void main(String[] args) { // 快速创建线程池 ExecutorService executor = Executors.newFixedThreadPool(4); try { CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> { System.out.println("任务A先执行结果为666"); return 666; }, executor); actionA.thenCompose((res) -> CompletableFuture.supplyAsync(() -> { System.out.println("任务B后执行结果加上333"); return 333 + res; })).whenComplete((res, ex) -> System.out.println(res)); } catch (Exception e) { e.printStackTrace(); }finally { executor.shutdown(); } } /** 输出结果: 任务A先执行结果为666 任务B后执行结果加上333 999 **/
⑦ 两组任务谁快用谁
applyToEither
比较两组任务执行速度,谁快消费谁的执行结果
public <U> CompletableFuture<U> applyToEither( CompletionStage<? extends T> other, Function<? super T, U> fn)
该方法用于比较两组任务的执行速度,谁先执行完就用谁的执行结果。
传入参数说明:第一个参数传入的是另一个任务的执行内容,第二个参数传入的是最终这两个任务谁快返回谁的结果,并通过当前函数式接口进行接收和处理(使用函数式接口,有参且有返回值)。
// 例子 public static void main(String[] args) { // 快速创建线程池 ExecutorService executor = Executors.newFixedThreadPool(4); try { CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务A等待久一点,执行结果为555"); return 555; }, executor); actionA.applyToEither(CompletableFuture.supplyAsync(() -> { System.out.println("任务B很快,执行结果为666"); return 666; }), (res) -> { System.out.println("最终使用的执行结果为" + res); return res; }); } catch (Exception e) { e.printStackTrace(); }finally { executor.shutdown(); } } /** 输出结果: 任务B很快,执行结果为666 最终使用的执行结果为666 任务A等待久一点,执行结果为555 **/
除了applyToEither
对任务最终结果进行获取并消费,并且具有返回值的方法外,还有两个类似的方法。
// 这个方法效果和上面的一样,比谁快拿谁的结果,不同的是这个方法只消费不具有返回值 public CompletableFuture<Void> acceptEither( CompletionStage<? extends T> other, Consumer<? super T> action)
// 这个方法效果和上面的一样,比谁快拿谁的结果,不同的是这个方法不消费结果也不具有返回值 public CompletableFuture<Void> runAfterEither( CompletionStage<?> other, Runnable action)
⑧ 两组任务完成后合并
thenCombine
等待两组任务执行完毕后,合并两组任务的执行结果
public <U,V> CompletableFuture<V> thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
该方法用于两组任务都完成后,将两组任务的执行结果一起交给当前方法的BiFunction处理。先完成的任务会等待后者任务完成。
传入参数说明:第一个参数传入的是另一个任务的执行内容,第二个参数传入的是带两个参数的函数式接口(第一个参数是任务1的执行结果,第二个参数是任务2的执行结果,具有返回值)。
// 例子 public static void main(String[] args) { // 快速创建线程池 ExecutorService executor = Executors.newFixedThreadPool(4); try { CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务A等待久一点,执行结果为333"); return 333; }, executor); CompletableFuture<Integer> actionB = CompletableFuture.supplyAsync(() -> { System.out.println("任务B很快,执行结果为666"); return 666; }, executor); actionA.thenCombine(actionB, (res1, res2) -> { System.out.println("最终使用的执行结果为" + (res1 + res2)); return res1 + res2; }); } catch (Exception e) { e.printStackTrace(); }finally { executor.shutdown(); } } /** 输出结果: 任务B很快,执行结果为666 任务A等待久一点,执行结果为333 最终使用的执行结果为999 **/
除了thenCombine
对任务最终结果进行获取并消费,并且具有返回值的方法外,还有两个类似的方法。
// 这个方法效果和上面的一样,获取合并结果,不同的是这个方法只消费不具有返回值 public <U> CompletableFuture<Void> thenAcceptBoth( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
// 这个方法效果和上面的一样,获取合并结果,不同的是这个方法不消费结果也不具有返回值 public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
⑨ 多任务组合
allOf
实现并行地执行多个任务,等待所有任务执行完成(无需考虑执行顺序)
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
该方法可以实现并行地执行多个任务,适用于多个任务没有依赖关系,可以互相独立执行的,传入参数为多个任务,没有返回值。
allOf()
方法会等待所有的任务执行完毕再返回,可以通过get()
阻塞确保所有任务执行完毕
// 例子 public static void main(String[] args) { // 快速创建线程池 ExecutorService executor = Executors.newFixedThreadPool(4); try { CompletableFuture<Void> actionA = CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务A等待2秒后执行完毕"); }, executor); CompletableFuture<Void> actionB = CompletableFuture.runAsync(() -> { System.out.println("任务B很快执行完毕"); }, executor); CompletableFuture<Void> actionC = CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务C等待1秒后执行完毕"); }, executor); CompletableFuture<Void> actionD = CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务D等待5秒后执行完毕"); }, executor); CompletableFuture.allOf(actionA, actionB, actionC, actionD).get(); } catch (Exception e) { e.printStackTrace(); }finally { executor.shutdown(); } } /** 输出结果: 任务B很快执行完毕 任务C等待1秒后执行完毕 任务A等待2秒后执行完毕 任务D等待5秒后执行完毕 **/
anyOf
实现并行地执行多个任务,只要有个一个完成的便会返回执行结果
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
该方法可以实现并行地执行多个任务,传入参数为多个任务,具有返回值。该方法不会等待所有任务执行完成后再返回结果,而是当有一个任务完成时,便会返回那个任务的执行结果。
// 例子 public static void main(String[] args) { // 快速创建线程池 ExecutorService executor = Executors.newFixedThreadPool(4); try { CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务A等待2秒后执行完毕"); return 555; }, executor); CompletableFuture<Integer> actionB = CompletableFuture.supplyAsync(() -> { System.out.println("任务B很快执行完毕"); return 666; }, executor); CompletableFuture<Integer> actionC = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务C等待1秒后执行完毕"); return 999; }, executor); CompletableFuture<Integer> actionD = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务D等待5秒后执行完毕"); return 888; }, executor); System.out.println("最先执行完的返回结果为" + CompletableFuture.anyOf(actionA, actionB, actionC, actionD).get()); } catch (Exception e) { e.printStackTrace(); }finally { executor.shutdown(); } } /** 输出结果: 任务B很快执行完毕 最先执行完的返回结果为666 任务C等待1秒后执行完毕 任务A等待2秒后执行完毕 任务D等待5秒后执行完毕 **/
一个使用CompletableFuture异步编排的例子
不需要关心例子中的业务内容,使用时按照自己业务的需求,对不同的需求调用不同API即可。
编写任务时主要关心以下几点:
① 是否需要消费之前任务的结果
② 是否需要返回结果给其他任务消费
③ 是否要求顺序执行(是否允许并行,有没有前置要求)
/** * 该方法用于获取单个商品的所有信息 * 1. 商品的基本信息 * 2. 商品的图片信息 * 3. 商品的销售属性组合 * 4. 商品的各种分类基本信息 * 5. 商品的促销信息 */ @Override public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException { // 创建商品Vo通过各个任务去完善Vo的信息 SkuItemVo skuItemVo = new SkuItemVo(); // 获取商品基本信息 查询到后设置进Vo中,返回基本信息给后续任务消费 (使用自定义的线程池进行异步) CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> { SkuInfoEntity info = this.getById(skuId); skuItemVo.setInfo(info); return info; }, executor); // 获取商品的图片信息 获取后设置进Vo中,此处不需要消费图片信息,也不需要返回结果。所以使用runAsync即可 CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> { List<SkuImagesEntity> imagesEntities = skuImagesService.getImagesBySkuId(skuId); skuItemVo.setImages(imagesEntities); }, executor); // 获取商品销售属性 因为要利用之前查询到的基本信息,但后续任务不需要消费销售属性(不需要返回结果),所以使用thenAcceptAsync消费之前的基本信息,不返回销售信息。 CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync((res) -> { List<SkuItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.getSaleAttrBySpuId(res.getSpuId()); skuItemVo.setSaleAttr(saleAttrVos); }, executor); // 获取商品各分类基本信息,同样要消费之前的基本信息,但无需返回,所以使用thenAcceptAsync即可 CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync((res) -> { SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId()); skuItemVo.setDesc(spuInfoDescEntity); }, executor); // 获取商品的促销信息 这个也不需要消费之前任务的结果,也不需要返回结果。所以直接使用runAsync即可 CompletableFuture<Void> seckillFuture = CompletableFuture.runAsync(() -> { R skuSeckilInfo = seckillFeignService.getSkuSeckilInfo(skuId); if (skuSeckilInfo.getCode() == 0) { SeckillSkuVo seckilInfoData = skuSeckilInfo.getData("data", new TypeReference<SeckillSkuVo>() { }); skuItemVo.setSeckillSkuVo(seckilInfoData); if (seckilInfoData != null) { long currentTime = System.currentTimeMillis(); if (currentTime > seckilInfoData.getEndTime()) { skuItemVo.setSeckillSkuVo(null); } } } }, executor); // 使用allOf()组合所有任务,并且使用get()阻塞,等待所有任务完成。 // 补充:infoFuture不能放入allOf中,因为allOf是并行无序执行(需要多个条件是无依赖性的)的,当上面任务中有需要消费infoFuture的结果,所以需要先执行infoFuture。 CompletableFuture.allOf(imageFuture,saleAttrFuture,descFuture,seckillFuture).get(); // 最后返回商品Vo return skuItemVo; }
以上就是java CompletableFuture实现异步编排详解的详细内容,更多关于java CompletableFuture异步编排的资料请关注脚本之家其它相关文章!
相关文章
elasticsearch通过guice注入Node组装启动过程
这篇文章主要为大家介绍了 elasticsearch通过guice注入Node组装启动过程,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪2022-04-04Java并发编程之ReadWriteLock读写锁的操作方法
这篇文章主要介绍了Java并发编程之ReadWriteLock读写锁的操作方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下2021-02-02spring boot项目application.properties文件存放及使用介绍
这篇文章主要介绍了spring boot项目application.properties文件存放及使用介绍,我们的application.properties文件中会有很多敏感信息,大家在使用过程中要多加小心2021-06-06
最新评论