Java线程池中的Future实现详解

 更新时间:2023年10月26日 09:27:16   作者:buzhbuzh  
这篇文章主要介绍了Java线程池中的Future实现详解, FutureTask是一个任务,FutureTask继承了Runnable、Callable, 通过FutureTask可以获取到任务执行的状态,任务执行完成完成后,将结构通过Future接口返回,调用者可以调用Future#get()方法获取到数据,需要的朋友可以参考下

一 线程池的提交方式

在向线程池提交线程的时候,有两个方法, 一个是executor(Runnable runnable)方法, 另一个是submit(Runnable runnable)方法,

  Future<Integer> result = executor.submit(task); //有返回值
  executor.executor(task)  //没有返回值

二 Runnable、Callable、FutureTask的使用

(1) Runnable是一个任务的概念,这个任务没有返回值

(2) Callable也是一个任务,这个任务有返回值, 不能单独使用,必须配合FutureTask一起使用

(3) FutureTask是一个任务,FutureTask继承了Runnable、Callable, 通过FutureTask可以获取到任务执行的状态

(4) FutureTask任务执行完成完成后,将结构通过Future接口返回,调用者可以调用Future#get()方法获取到数据

public class Test {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        Future<Integer> result = executor.submit(task);
        executor.shutdown();
         
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
         
        System.out.println("主线程在执行任务");
         
        try {
            System.out.println("task运行结果"+result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } 
        System.out.println("所有任务执行完毕");
    }
}
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("子线程在进行计算");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}

三 FutureTask的实现

3.1  字段属性

(1) 任务的状态:

在这里插入图片描述

(2) 具体执行的任务和返回值、执行的线程、等待结果的线程队列

在这里插入图片描述

3.2  构造方法

构造方法的主要作用是将Runnable、Callable转换为FutureTask

在这里插入图片描述

3.3 任务的执行

任务执行

在这里插入图片描述

FutureTask#run() 方法

public void run() {
   // 如果状态不是 NEW,说明任务已经执行过或者已经被取消,直接返回
   // 如果状态是 NEW,则尝试把执行线程保存在 runnerOffset(runner字段),如果赋值失败,则直接返回
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
       // 获取构造函数传入的 Callable 值
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
               // 正常调用 Callable 的 call 方法就可以获取到返回值
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
               // 保存 call 方法抛出的异常
                setException(ex);
            }
            if (ran)
               // 保存 call 方法的执行结果
                set(result);
        }
    } finally {        
        runner = null;       
        int s = state;
       // 如果任务被中断,则执行中断处理
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

FutureTask中保存了Callable,在run()方法中调用Callable.call()方法就可以获取到返回值,然后通过 set(result) 保存正常程序运行结果,或通过 setException(ex) 保存程序异常信息

FutureTask#设置返回值方法

** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes

// 保存异常结果
protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

// 保存正常结果
protected void set(V v) {
  if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
    outcome = v;
    UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
    finishCompletion();
  }
}

setException() 和 set ()方法非常相似,都是将异常或者结果保存在 Object 类型的 outcome 变量中,outcome 是成员变量,就要考虑线程安全,所以他们要通过 CAS方式设置 outcome 变量的值,既然是在 CAS 成功后 更改 outcome 的值,这也就是 outcome 没有被 volatile 修饰的原因所在。

FutureTask#finishCompletion() 方法

这个方法是唤醒关注执行结果的线程,通知他们去获取任务的执行结果

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                   // 唤醒等待队列中的线程
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}  

3.4 任务的获取

get()方法

public V get() throws InterruptedException, ExecutionException {
    int s = state;
   // 如果 state 还没到 set outcome 结果的时候,则调用 awaitDone() 方法阻塞自己
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
   // 返回结果
    return report(s);
}

awaitDone(boolean timed, long nanos)阻塞线程并且加入阻塞队列中

// get 方法支持超时限制,如果没有传入超时时间,则接受的参数是 false 和 0L
// 有等待就会有队列排队或者可响应中断,从方法签名上看有 InterruptedException,说明该方法这是可以被中断的
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
   // 计算等待截止时间
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
       // 如果当前线程被中断,如果是,则在等待对立中删除该节点,并抛出 InterruptedException
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
       // 状态大于 COMPLETING 说明已经达到某个最终状态(正常结束/异常结束/取消)
       // 把 thread 只为空,并返回结果
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
       // 如果是COMPLETING 状态(中间状态),表示任务已结束,但 outcome 赋值还没结束,这时主动让出执行权,让其他线程优先执行(只是发出这个信号,至于是否别的线程执行一定会执行可是不一定的)
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
       // 等待节点为空
        else if (q == null)
           // 将当前线程构造节点
            q = new WaitNode();
       // 如果还没有入队列,则把当前节点加入waiters首节点并替换原来waiters
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
       // 如果设置超时时间
        else if (timed) {
            nanos = deadline - System.nanoTime();
           // 时间到,则不再等待结果
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
           // 阻塞等待特定时间
            LockSupport.parkNanos(this, nanos);
        }
        else
           // 挂起当前线程,知道被其他线程唤醒
            LockSupport.park(this);
    }
}

进入这个方法会经历三个循环:

  • 第一轮for循环,执行的是q == null逻辑,这个时候会创建一个WaitNode节点
  • 第二轮for循环,执行的是!queued,这个时候会把第一轮循环生成的节点的next执行waiters,然后通过CAS的把节点q替换waiters,如果替换成功,第二轮结束
  • 第三轮for循环,执行阻塞,或者阻塞特定的时间设置返回结果(set方法)/异常的方法(setException)两个方法后,调用finishCompletion方法,这个方法会唤醒等待队列中的线程.

3.5 任务的取消

将一个任务修改为终态的只有三种方法: set()方法 setException()方法 cancel 方法

查看 Future cancel(),该方法注释上明确说明三种 cancel 操作一定失败的情形

  • 任务已经执行完成了
  • 任务已经被取消过了
  • 任务因为某种原因不能被取消

其它情况下,cancel操作将返回true。值得注意的是,cancel操作返回 true 并不代表任务真的就是被取消, 这取决于发动cancel状态时,任务所处的状态

  • 如果发起cancel时任务还没有开始运行,则随后任务就不会被执行;
  • 如果发起cancel时任务已经在运行了,则这时就需要看 mayInterruptIfRunning 参数了:

如果mayInterruptIfRunning 为true, 则当前在执行的任务会被中断

如果mayInterruptIfRunning 为false, 则可以允许正在执行的任务继续运行,直到它执行完

public boolean cancel(boolean mayInterruptIfRunning) {
  
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
       // 需要中断任务执行线程
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
               // 中断线程
                if (t != null)
                    t.interrupt();
            } finally { // final state
               // 修改为最终状态 INTERRUPTED
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
       // 唤醒等待中的线程
        finishCompletion();
    }
    return true;
}

到此这篇关于Java线程池中的Future实现详解的文章就介绍到这了,更多相关Java的Future实现内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 关于Assert.assertEquals报错的问题及解决

    关于Assert.assertEquals报错的问题及解决

    这篇文章主要介绍了关于Assert.assertEquals报错的问题及解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-05-05
  • springboot配置文件中属性变量引用方式@@解读

    springboot配置文件中属性变量引用方式@@解读

    这篇文章主要介绍了springboot配置文件中属性变量引用方式@@解读,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-04-04
  • springboot 整合邮件发送功能

    springboot 整合邮件发送功能

    这篇文章主要介绍了springboot 整合邮件发送功能,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-12-12
  • Java SpringBoot操作Redis

    Java SpringBoot操作Redis

    这篇文章主要介绍了SpringBoot如何操作Redis,文章中有详细的代码示例,有一定的参考价值,感兴趣的同学可以参考阅读
    2023-04-04
  • SpringBoot中GlobalExceptionHandler异常处理机制详细说明

    SpringBoot中GlobalExceptionHandler异常处理机制详细说明

    Spring Boot的GlobalExceptionHandler是一个全局异常处理器,用于捕获和处理应用程序中发生的所有异常,这篇文章主要给大家介绍了关于Java中GlobalExceptionHandler异常处理机制的相关资料,需要的朋友可以参考下
    2024-03-03
  • spring中使用@Autowired注解无法注入的情况及解决

    spring中使用@Autowired注解无法注入的情况及解决

    这篇文章主要介绍了spring中使用@Autowired注解无法注入的情况及解决,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09
  • Java实现Dbhelper支持大数据增删改

    Java实现Dbhelper支持大数据增删改

    这篇文章主要介绍了Java实现Dbhelper支持大数据增删改功能的实现过程,感兴趣的小伙伴们可以参考一下
    2016-01-01
  • Java项目工程代码深度刨析总结

    Java项目工程代码深度刨析总结

    一个项目工程里的代码是怎样的呢?对于初学者或者没有参与过项目的零经验同学这都是未知且让人好奇的,本篇文章带你一探究竟,踏入真实项目的大门
    2022-08-08
  • Java8新的异步编程方式CompletableFuture实现

    Java8新的异步编程方式CompletableFuture实现

    这篇文章主要介绍了Java8新的异步编程方式CompletableFuture实现,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2019-04-04
  • Java代码实现酒店管理系统

    Java代码实现酒店管理系统

    这篇文章主要为大家详细介绍了Java代码实现酒店管理系统,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-05-05

最新评论