java利用CompletionService保证任务先完成先获取到执行结果

 更新时间:2023年08月17日 08:22:37   作者:Shawn_Shawn  
这篇文章主要为大家详细介绍了java如何利用CompletionService来保证任务先完成先获取到执行结果,文中的示例代码讲解详细,需要的可以参考下

CompletionService简介

在学习future的时候,我们提到,future.get()方法会阻塞线程,所以如果A,B,C三个线程同时获取执行结果,如果A先执行,但是A的执行时间很长,那么即使B,C执行很短,也无法获取到B,C的执行结果,因为主线程阻塞在A.get()上了。

ExecutorService executorService = Executors.newFixedThreadPool(4);
List<Future> futures = new ArrayList<Future<Integer>>();
futures.add(executorService.submit(A));
futures.add(executorService.submit(B));
futures.add(executorService.submit(C));
// 遍历 Future list,通过 get() 方法获取每个 future 结果
for (Future future:futures) {
	Integer result = future.get();
	// 其他业务逻辑 如果A执行时间很长,阻塞
}

那么如何让B,C也有机会能够获取到执行结果呢?答案就是java.util.concurrent.CompletionService

CompletionService是Java8的新增接口,JDK为其提供了一个实现类ExecutorCompletionService。这个类是为线程池中Task的执行结果服务的,即为ExecutorTask返回Future而服务的。CompletionService的实现目标是任务先完成可优先获取到,即结果按照完成先后顺序排序。

ExecutorService executorService = Executors.newFixedThreadPool(4);
// ExecutorCompletionService 是 CompletionService 唯一实现类
CompletionService completionService = new ExecutorCompletionService<>(executorService );
List<Future> futures = new ArrayList<Future<Integer>>();
futures.add(completionService.submit(A));
futures.add(completionService.submit(B));
futures.add(completionService.submit(C));
// 遍历 Future list,通过 get() 方法获取每个 future 结果
for (int i = 0; i < futures.size(); i++) {
    Integer result = completionService.take().get();
    // 其他业务逻辑
}

CompletionService原理

我们来试想一下,如果是你应该如何解决上述Feture带来的阻塞问题呢?可以通过阻塞队列来实现,伪代码如下:

// 创建阻塞队列
BlockingQueue<Integer> bq =
  new LinkedBlockingQueue<>();
// 任务A 异步进入阻塞队列  
executor.execute(() -> bq.put(A.get()));
// 任务B 异步进入阻塞队列  
executor.execute(() -> bq.put(B.get()));
// 任务C 异步进入阻塞队列  
executor.execute(()-> bq.put(C.get()));
for (int i = 0; i < 3; i++) {
  Integer r = bq.take();
  // 异步执行所有业务逻辑
  executor.execute(()->action(r));
}

实际上CompletionService的实现原理也是内部维护了一个阻塞队列,当任务执行结束就把任务的执行结果加入到阻塞队列中,不同的是CompletionService是把任务执行结果的Future对象加入到阻塞队列中。

CompletionService是一个接口,submit()用于提交任务,take()和poll()用于从阻塞队列中获取并移除一个元素,它们的区别在于如果阻塞队列是空的,那么调用take()方法的线程就会被阻塞,而poll()方法会返回null值。

public interface CompletionService<V> {
    Future<V> submit(Callable<V> task);
    Future<V> submit(Runnable task, V result);
    Future<V> take() throws InterruptedException;
    Future<V> poll();
    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}

其实现类ExecutorCompletionService,实际上可以看做是ExecutorBlockingQueue的结合体,ExecutorCompletionService把具体的计算任务交给 Executor完成,通过BlockingQueuetake()方法获得任务执行的结果。

ExecutorCompletionService有两个构造函数

public ExecutorCompletionService(Executor executor) {
    if (executor == null)
        throw new NullPointerException();
    this.executor = executor;
    // 判断executor是不是ThreadPoolExecutor,ScheduledThreadPoolExecutor,ForkJoinPool
    // 其余框架也有实现了AbstractExecutorService抽象类,目前JDK里只有上述的三种实现
    // 如果不是,则为null
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
public ExecutorCompletionService(Executor executor,
                                 BlockingQueue<Future<V>> completionQueue) {
    if (executor == null || completionQueue == null)
        throw new NullPointerException();
    this.executor = executor;
    // 判断executor是不是ThreadPoolExecutor,ScheduledThreadPoolExecutor,ForkJoinPool
    // 其余框架也有实现了AbstractExecutorService抽象类,目前JDK里只有上述的三种实现
    // 如果不是,则为null
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = completionQueue;
}

两个构造器都需要传入Executor,如果不传BlockingQueue<Futrue>,默认会创建一个LinkedBlockingQueue<Future<V>>的队列,该BlockingQueue的作用是保存Executor执行的结果。

submit()源码如下:

public Future<V> submit(Callable<V> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task);
    executor.execute(new QueueingFuture<V>(f, completionQueue));
    return f;
}
public Future<V> submit(Runnable task, V result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task, result);
    executor.execute(new QueueingFuture<V>(f, completionQueue));
    return f;
}

当提交一个任务到ExecutorCompletionService时,首先需要将task封装成RunableFuture<V>,通过newTaskFor()完成,然后再将RunableFuture封装成QueueingFuture,它是FutureTask的一个子类,然后改写FutureTaskdone方法,之后把Executor执行的计算结果放入BlockingQueue中。

newTaskFor()的源码如下:

private RunnableFuture<V> newTaskFor(Callable<V> task) {
    // aes是AbstractExecutorService,其实现类是ThreadPoolExecutor,ForkJoinPool,SchedulerThreadPoolExecutor
    if (aes == null) 
        return new FutureTask<V>(task);
    else
        return aes.newTaskFor(task);
}
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
    if (aes == null)
        return new FutureTask<V>(task, result);
    else
        return aes.newTaskFor(task, result);
}

QueueingFuture的源码如下:

private static class QueueingFuture<V> extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task,
                   BlockingQueue<Future<V>> completionQueue) {
        super(task, null);
        this.task = task;
        this.completionQueue = completionQueue;
    }
    private final Future<V> task;
    private final BlockingQueue<Future<V>> completionQueue;
    // 会被java.util.concurrent.FutureTask#finishCompletion调用,判读是否计算完成
    // 计算结果放在阻塞队列中
    protected void done() { completionQueue.add(task); }
}

take()poll()方法如下:

// 从结果队列中获取并移除一个已经执行完成的任务的结果,如果没有就会阻塞,直到有任务完成返回结果。
public Future<V> take() throws InterruptedException {
    return completionQueue.take();
}
// 从结果队列中获取并移除一个已经执行完成的任务的结果,如果没有就会返回null,该方法不会阻塞。
public Future<V> poll() {
    return completionQueue.poll();
}
// 从结果队列中获取并移除一个已经执行完成的任务的结果,如果没有就会返回null,该方法不会阻塞。
// 超时
public Future<V> poll(long timeout, TimeUnit unit)
        throws InterruptedException {
    return completionQueue.poll(timeout, unit);
}

以上就是java利用CompletionService保证任务先完成先获取到执行结果的详细内容,更多关于java CompletionService的资料请关注脚本之家其它相关文章!

相关文章

  • SpringBoot使用jsoup爬取HTML的方法

    SpringBoot使用jsoup爬取HTML的方法

    jsoup 是一款 Java 的 HTML 解析器,它提供了一套非常便利的 API,可通过 DOM、CSS 通过类似于 JQuery 的操作方法来取出和操作数据,这篇文章主要介绍了SpringBoot使用jsoup爬取HTML,需要的朋友可以参考下
    2024-02-02
  • Java字节流和字符流及IO流的总结

    Java字节流和字符流及IO流的总结

    本文主要将Java中的IO流进行了梳理,通过将其分成字节流和字符流,以及输入流和输出流分别统计,来建立一个对 Java中IO流全局的概念,通过一些实例来演示了如何通过不同类型的流来组合实现强大灵活的输入和输出,最后介绍了同时支持输入和输出的 RandomAccessFile。
    2021-04-04
  • 基于Java信号量解决死锁过程解析

    基于Java信号量解决死锁过程解析

    这篇文章主要介绍了基于Java信号量解决死锁过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-10-10
  • Java WeakHashMap案例详解

    Java WeakHashMap案例详解

    这篇文章主要介绍了Java WeakHashMap案例详解,本篇文章通过简要的案例,讲解了该项技术的了解与使用,以下就是详细内容,需要的朋友可以参考下
    2021-08-08
  • SpringBoot+MyBatisPlus+MySQL8实现树形结构查询

    SpringBoot+MyBatisPlus+MySQL8实现树形结构查询

    这篇文章主要为大家详细介绍了SpringBoot+MyBatisPlus+MySQL8实现树形结构查询,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-06-06
  • 谈谈Java中自定义注解及使用场景

    谈谈Java中自定义注解及使用场景

    这篇文章主要介绍了谈谈Java中自定义注解及使用场景,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-09-09
  • Springboot 整合 RabbitMQ 消息队列 详情

    Springboot 整合 RabbitMQ 消息队列 详情

    这篇文章主要介绍了Springboot整合RabbitMQ 消息队列详情,文章为荣啊主题展开详细的内容介绍,具有一定的参考价值,需要的小伙伴可以参考一下
    2022-08-08
  • Java实现的简单网页截屏功能示例

    Java实现的简单网页截屏功能示例

    这篇文章主要介绍了Java实现的简单网页截屏功能,涉及java网页打开及屏幕截图功能相关操作技巧,需要的朋友可以参考下
    2017-12-12
  • 学习Java的Date、Calendar日期操作

    学习Java的Date、Calendar日期操作

    Java开发过程中避免不了日期相关操作,这篇文章总结了一些Date、Calendar的常用方法,需要的朋友可以参考下
    2015-07-07
  • 简单了解JAVA内存区域效果知识

    简单了解JAVA内存区域效果知识

    这篇文章主要介绍了简单了解JAVA内存区域效果知识,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-10-10

最新评论