Java CompletableFuture实现原理分析详解

 更新时间:2022年09月30日 14:34:24   作者:生命猿于运动  
CompletableFuture是Java8并发新特性,本文我们主要来聊一聊CompletableFuture的回调功能以及异步工作原理是如何实现的,需要的可以了解一下

简介

前面的一篇文章你知道Java8并发新特性CompletableFuture吗?介绍了CompletableFuture的特性以及一些使用方法,今天我们主要来聊一聊CompletableFuture的回调功能以及异步工作原理是如何实现的。

CompletableFuture类结构

1.CompletableFuture类结构主要有两个属性

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    volatile Object result;       // Either the result or boxed AltResult
    volatile Completion stack;    // Top of Treiber stack of dependent actions
    ...
}
  • result:存储CompletableFuture的返回,或者存储异常对象AltResult。
  • stack:是CompletableFuture.Completion对象,表示操作数栈栈顶,在进行CompletableFuture链式调用的过程中,所有链式调用的CompletableFuture任务都会被压入该stack中,在任务调用的过程按后进先出的顺序出栈执行完所有任务。

2.stack属性栈结构

abstract static class Completion extends ForkJoinTask<Void>
    implements Runnable, AsynchronousCompletionTask {
    volatile Completion next;      // Treiber stack link
    ...
}

next:存储下一个任务链式调用栈。

3.UniCompletion的内部结构

abstract static class UniCompletion<T,V> extends Completion {
    Executor executor;                 // executor to use (null if none)
    CompletableFuture<V> dep;          // the dependent to complete
    CompletableFuture<T> src;          // source for action
    ...
}

UniCompletion继承Completion类,包含以下几个参数:

  • executor:异步任务执行器,如果为空则有主线程执行任务不进行异步执行。
  • dep:指向当前任务构建的CompletabueFuture
  • src:指向源CompletableFuture任务

CompletableFuture回调原理

这里为了方便讲解,我们用以下简短的代码来进行分析:

public static void main(String[] args) {
    CompletableFuture<String> baseFuture = CompletableFuture.completedFuture("Base Future");
    log.info(baseFuture.thenApply((r) -> r + " Then Apply").join());
    baseFuture.thenAccept((r) -> log.info(r)).thenAccept((Void) -> log.info("Void"));
}

上面的代码我们通过创建一个简单的CompletableFuture对象,再执行baseFuture.thenApply()调用后会进行一个入栈操作,如下图baseFuture引用的CompletableFuturestack属性将会指向baseFuture.thenApply()结果返回的新CompletableFuture对象,而新CompletableFuture对象的src属性将指向来源CompletableFuturebaseFuture所引用的对象。

2.在上一步的基础上再执行下一行代码,结果的引用关系图如下图:

在执行完baseFuture.thenAccept()的时候,thenAccept返回的任务将被压入栈顶,next指向上一个代码段的返回对象,在thenAccept返回的新CompletableFuture对象中在进行一次thenAccept的调用,就再产生一个新的CompletableFuture对象,dept属性就指向最新的CompletableFuture对象。

thenApply实现源码分析

public <U> CompletableFuture<U> thenApply(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(null, fn);
}

private <V> CompletableFuture<V> uniApplyStage(
    Executor e, Function<? super T,? extends V> f) {
    if (f == null) throw new NullPointerException();
    // 创建一个新的CompletableFuture对象
    CompletableFuture<V> d =  new CompletableFuture<V>();
    // e:如果是异步调用直接执行代码块
    // !d.uniApply:执行任务,如果返回false则任务未执行需入栈
    if (e != null || !d.uniApply(this, f, null)) {
        UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
        // 创建出新的UniApply对象进行入栈
        push(c);
        // 尝试执行任务
        c.tryFire(SYNC);
    }
    return d;
}

final <S> boolean uniApply(CompletableFuture<S> a,
                           Function<? super S,? extends T> f,
                           UniApply<S,T> c) {
    Object r; Throwable x;
    // 任务未完成结果为null直接返回false
    if (a == null || (r = a.result) == null || f == null)
        return false;
    // 验证是否出现异常结果,如有则任务执行结束
    tryComplete: if (result == null) {
        if (r instanceof AltResult) {
            if ((x = ((AltResult)r).ex) != null) {
                completeThrowable(x, r);
                break tryComplete;
            }
            r = null;
        }
        try {
            // 异步执行任务
            if (c != null && !c.claim())
                // 任务未执行返回false
                return false;
            @SuppressWarnings("unchecked") S s = (S) r;
            // 任务执行完成将结果写入result
            completeValue(f.apply(s));
        } catch (Throwable ex) {
            completeThrowable(ex);
        }
    }
    return true;
}

以上代码片段主要描述了CompletableFuture在执行任务时会创建出新的CompletableFuture对象,使用新对象执行任务并获取结果使用CAS写入到result属性,如果任务未执行将压入栈顶,再重新尝试任务执行,在CompletableFuture其他方法的调用也都大同小异,这里不在逐一分析,可自行打开源码阅读便于理解。

CompletableFuture异步原理

需要进行CompletableFuture异步调用则要使用Async结尾的方法执行任务,这里我们就拿thenAcceptAsync()的源码进行分析。

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
    return uniAcceptStage(asyncPool, action);
}

private CompletableFuture<Void> uniAcceptStage(Executor e, Consumer<? super T> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    // 如果是异步任务,这里的参数e不会为空,也就是会将任务执行压入栈顶
    if (e != null || !d.uniAccept(this, f, null)) {
        UniAccept<T> c = new UniAccept<T>(e, d, this, f);
        push(c);
        // 重点还是这个入口
        c.tryFire(SYNC);
    }
    return d;
}

static final class UniAccept<T> extends UniCompletion<T,Void> {
    Consumer<? super T> fn;
    UniAccept(Executor executor, CompletableFuture<Void> dep,
              CompletableFuture<T> src, Consumer<? super T> fn) {
        super(executor, dep, src); this.fn = fn;
    }
    final CompletableFuture<Void> tryFire(int mode) {
        CompletableFuture<Void> d; CompletableFuture<T> a;
        // dep为空即任务已被执行过,直接返回null
        // uniAccept()结果为false,可能是任务执行中未完成,也可能是由线程池中的其他任务执行完成
        if ((d = dep) == null || !d.uniAccept(a = src, fn, mode > 0 ? null : this))
            return null;
        dep = null; src = null; fn = null;
        // 说明当前线程执行了该任务,返回结果继续执行前一个任务
        return d.postFire(a, mode);
    }
}

final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
    if (a != null && a.stack != null) {
        // postComplete调用过来的,或者上一个任务执行完成,清空栈数据,否则调用postComplete完成任务
        if (mode < 0 || a.result == null)
            a.cleanStack();
        else
            // 完成任务执行并进行出栈
            a.postComplete();
    }
    if (result != null && stack != null) {
        if (mode < 0)
            // postComplete调用过来的任务已完成
            return this;
        else
            // 完成任务执行并进行出栈
            postComplete();
    }
    return null;
}

CompletableFuture进行异步主要是通过将任务压入栈顶后tryFire方法进行异步处理,如果任务未被执行则会通过postFire方法有线程池中的线程进行任务执行,任务执行结果再使用CAS将结果返回到result,其他线程即可得知任务是否被执行过,如果当前现场判断当前任务为被执行,则调用postComplete()执行完成任务。

总结

CompletableFuture通过异步回调的方式,解决了开发过程中异步调用获取结果的难点。开发人员只需接触到CompletableFuture对象,以及CompletableFuture任务的执行结果,无需设计具体异步回调的实现,并可通过自定义线程池进一步优化任务的异步调用。

到此这篇关于Java CompletableFuture实现原理分析详解的文章就介绍到这了,更多相关Java CompletableFuture内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • MyBatisPlus 一对多、多对一、多对多的完美解决方案

    MyBatisPlus 一对多、多对一、多对多的完美解决方案

    这篇文章主要介绍了MyBatisPlus 一对多、多对一、多对多的完美解决方案,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-11-11
  • Java常用命令汇总

    Java常用命令汇总

    这篇文章主要介绍了Java常用命令汇总,小编觉得挺不错的,这里给大家分享下,供需要的朋友参考。
    2017-10-10
  • Spring Boot 如何通过ServletRequestHandledEvent事件实现接口请求的性能监控

    Spring Boot 如何通过ServletRequestHandledEvent事件实现接口请求的性能监控

    在Spring框架中,监控接口请求的性能可以通过ServletRequestHandledEvent事件实现,这篇文章给大家介绍Spring Boot 如何通过ServletRequestHandledEvent事件实现接口请求的性能监控,感兴趣的朋友跟随小编一起看看吧
    2024-08-08
  • SpringBoot生成jar/war包的布局应用

    SpringBoot生成jar/war包的布局应用

    在 Spring Boot 中,"布局应用"(Application Layout)指的是打包生成的可执行 jar 或 war 文件中的内容组织结构,本文给大家介绍了SpringBoot生成jar/war包的布局应用,需要的朋友可以参考下
    2024-02-02
  • Java并发编程之代码实现两玩家交换装备

    Java并发编程之代码实现两玩家交换装备

    这篇文章主要介绍了Java并发编程之代码实现两玩家交换装备,文中有非常详细的代码示例,对正在学习java的小伙伴们有一定的帮助,需要的朋友可以参考下
    2021-09-09
  • Java实现微信公众号自定义菜单的创建方法示例

    Java实现微信公众号自定义菜单的创建方法示例

    这篇文章主要介绍了Java实现微信公众号自定义菜单的创建方法,结合实例形式分析了java创建微信公众号自定义菜单的具体步骤、实现方法及相关操作注意事项,需要的朋友可以参考下
    2019-10-10
  • idea运行程序报错java程序包org.junit不存在解决办法

    idea运行程序报错java程序包org.junit不存在解决办法

    这篇文章主要给大家介绍了关于idea运行程序报错java程序包org.junit不存在的解决办法, 当出现程序包org.junit不存在的问题时,可以通过使用适当的JUnit版本、添加依赖或重新下载程序包等方式进行解决,需要的朋友可以参考下
    2024-02-02
  • Java正则环视和反向引用功能与用法详解

    Java正则环视和反向引用功能与用法详解

    这篇文章主要介绍了Java正则环视和反向引用功能与用法,结合实例形式较为详细的分析了java正则环视与反向引用的相关概念与使用方法,需要的朋友可以参考下
    2018-01-01
  • Spring如何更简单的读取和存储对象

    Spring如何更简单的读取和存储对象

    这篇文章主要给大家介绍了关于Spring如何更简单的读取和存储对象的相关资料,在Spring 中想要更简单的存储和读取对象的核⼼是使⽤注解,文中通过图文介绍的非常详细,需要的朋友可以参考下
    2023-06-06
  • Java开发如何把数据库里的未付款订单改成已付款

    Java开发如何把数据库里的未付款订单改成已付款

    这篇文章主要介绍了Java开发如何把数据库里的未付款订单改成已付款,先介绍MD5算法,简单的来说,MD5能把任意大小、长度的数据转换成固定长度的一串字符,实现思路非常简单需要的朋友可以参考下
    2022-11-11

最新评论