Java7之forkjoin简介_动力节点Java学院整理

 更新时间:2017年06月01日 09:48:30   投稿:mrr  
Java7引入了Fork Join的概念,来更好的支持并行运算。接下来通过本文给大家分享Java7之forkjoin简介,感兴趣的朋友一起看看吧

Java7引入了Fork Join的概念,来更好的支持并行运算。顾名思义,Fork Join类似与流程语言的分支,合并的概念。也就是说Java7 SE原生支持了在一个主线程中开辟多个分支线程,并且根据分支线程的逻辑来等待(或者不等待)汇集,当然你也可以fork的某一个分支线程中再开辟Fork Join,这也就可以实现Fork Join的嵌套。

有两个核心类ForkJoinPool和ForkJoinTask。

ForkJoinPool实现了ExecutorService接口,起到线程池的作用。所以他的用法和Executor框架的使用时一样的,当然Fork Join本身就是Executor框架的扩展。ForkJoinPool有3个关键的方法,来启动线程,execute(…),invoke(…),submit(…)。具体描述如下:

<SPAN style='FONT-SIZE: 9pt;"微软雅黑","sans-serif"; COLOR: #333333;"BORDER-TOP: windowtext 1pt solid; BORDER-RIGHT: windowtext 1pt solid; BORDER-BOTTOM: windowtext 1pt solid; PADDING-BOTTOM: 0cm; PADDING-TOP: 0cm; PADDING-LEFT: 0cm; BORDER-LEFT: windowtext 1pt solid; PADDING-RIGHT: 0cm; BACKGROUND-COLOR: transparent;"> <P style="BACKGROUND: white; TEXT-ALIGN: left; LINE-HEIGHT: normal;" align=left><SPAN style='FONT-SIZE: 9pt;"微软雅黑","sans-serif"; COLOR: #333333;

ForkJoinTask是分支合并的执行任何,分支合并的业务逻辑使用者可以再继承了这个抽先类之后,在抽象方法exec()中实现。其中exec()的返回结果和ForkJoinPool的执行调用方(execute(…),invoke(…),submit(…)),共同决定着线程是否阻塞,具体请看下面的测试用例。

首先,用户需要创建一个自己的ForkJoinTask。代码如下:

public class MyForkJoinTask extends ForkJoinTask {
 
 /**
  *
  */
 private static final long serialVersionUID = 1L;
 private V value;
 private boolean success = false;
 @Override
 public V getRawResult() {
  return value;
 }
 @Override
 protected void setRawResult(V value) {
  this.value = value;
 }
 @Override
 protected boolean exec() {
  System.out.println("exec");
  return this.success;
 }
 public boolean isSuccess() {
  return success;
 }
 public void setSuccess(boolean isSuccess) {
  this.success = isSuccess;
 }
}

测试ForkJoinPool.invoke(…):

 @Test
 public void testForkJoinInvoke() throws InterruptedException, ExecutionException {
  ForkJoinPool forkJoinPool = new ForkJoinPool();
  MyForkJoinTask task = new MyForkJoinTask();
  task.setSuccess(true);
  task.setRawResult("test");
  String invokeResult = forkJoinPool.invoke(task);
  assertEquals(invokeResult, "test");
 }
 @Test
 public void testForkJoinInvoke2() throws InterruptedException, ExecutionException {
  final ForkJoinPool forkJoinPool = new ForkJoinPool();
  final MyForkJoinTask task = new MyForkJoinTask();
  new Thread(new Runnable() {
   public void run() {
    try {
     Thread.sleep(1000);
    } catch (InterruptedException e) {
    }
    task.complete("test");
   }
  }).start();
  // exec()返回值是false,此处阻塞,直到另一个线程调用了task.complete(...)
  String result = forkJoinPool.invoke(task);
  System.out.println(result);
 }
 @Test
 public void testForkJoinSubmit() throws InterruptedException, ExecutionException {
  final ForkJoinPool forkJoinPool = new ForkJoinPool();
  final MyForkJoinTask task = new MyForkJoinTask();
  task.setSuccess(true); // 是否在此任务运行完毕后结束阻塞
  ForkJoinTask result = forkJoinPool.submit(task);
  result.get(); // 如果exec()返回值是false,在此处会阻塞,直到调用complete
 }

测试ForkJoinPool.submit(…):

@Test
 public void testForkJoinSubmit() throws InterruptedException, ExecutionException {
  final ForkJoinPool forkJoinPool = new ForkJoinPool();
  final MyForkJoinTask task = new MyForkJoinTask();
  task.setSuccess(true); // 是否在此任务运行完毕后结束阻塞
  ForkJoinTask result = forkJoinPool.submit(task);
  result.get(); // 如果exec()返回值是false,在此处会阻塞,直到调用complete
 }
 @Test
 public void testForkJoinSubmit2() throws InterruptedException, ExecutionException {
  final ForkJoinPool forkJoinPool = new ForkJoinPool();
  final MyForkJoinTask task = new MyForkJoinTask();
  forkJoinPool.submit(task);
  Thread.sleep(1000);
 }
 @Test
 public void testForkJoinSubmit3() throws InterruptedException, ExecutionException {
  final ForkJoinPool forkJoinPool = new ForkJoinPool();
  final MyForkJoinTask task = new MyForkJoinTask();
  new Thread(new Runnable() {
   public void run() {
    try {
     Thread.sleep(1000);
    } catch (InterruptedException e) {
    }
    task.complete("test");
   }
  }).start();
  ForkJoinTask result = forkJoinPool.submit(task);
  // exec()返回值是false,此处阻塞,直到另一个线程调用了task.complete(...)
  result.get();
  Thread.sleep(1000);
 }

测试ForkJoinPool.execute(…):

 @Test
 public void testForkJoinExecute() throws InterruptedException, ExecutionException {
  ForkJoinPool forkJoinPool = new ForkJoinPool();
  MyForkJoinTask task = new MyForkJoinTask();
  forkJoinPool.execute(task); // 异步执行,无视task.exec()返回值。
 }

在实际情况中,很多时候我们都需要面对经典的“分治”问题。要解决这类问题,主要任务通常被分解为多个任务块(分解阶段),其后每一小块任务被独立并行计算。一旦计算任务完成,每一快的结果会被合并或者解决(解决阶段)。ForkJoinTask天然就是为了支持“分治”问题的。

分支/合并的完整过程如下: 

下面列举一个分治算法的实例。

import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class MaximumFinder extends RecursiveTask<Integer> {
 private static final int SEQUENTIAL_THRESHOLD = 5;
 private final int[] data;
 private final int start;
 private final int end;
 public MaximumFinder(int[] data, int start, int end) {
 this.data = data;
 this.start = start;
 this.end = end;
 }
 public MaximumFinder(int[] data) {
 this(data, 0, data.length);
 }
 @Override
 protected Integer compute() {
 final int length = end - start;
 if (length < SEQUENTIAL_THRESHOLD) {
  return computeDirectly();
 }
 final int split = length / 2;
 final MaximumFinder left = new MaximumFinder(data, start, start + split);
 left.fork();
 final MaximumFinder right = new MaximumFinder(data, start + split, end);
 return Math.max(right.compute(), left.join());
 }
 private Integer computeDirectly() {
 System.out.println(Thread.currentThread() + ' computing: ' + start
      + ' to ' + end);
 int max = Integer.MIN_VALUE;
 for (int i = start; i < end; i++) {
  if (data[i] > max) {
  max = data[i];
  }
 }
 return max;
 }
 public static void main(String[] args) {
 // create a random data set
 final int[] data = new int[1000];
 final Random random = new Random();
 for (int i = 0; i < data.length; i++) {
  data[i] = random.nextInt(100);
 }
 // submit the task to the pool
 final ForkJoinPool pool = new ForkJoinPool(4);
 final MaximumFinder finder = new MaximumFinder(data);
 System.out.println(pool.invoke(finder));
 }
}

以上所示是小编给大家介绍的Java7之forkjoin简介_动力节点Java学院整理,希望对大家有所帮助,如果大家有任何疑问欢迎给我留言,小编会及时回复大家的,在此也非常感谢大家对脚本之家网站的支持!

相关文章

  • SpringBoot集成Redisson实现分布式锁的方法示例

    SpringBoot集成Redisson实现分布式锁的方法示例

    这篇文章主要介绍了SpringBoot集成Redisson实现分布式锁的方法示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-10-10
  • Spring Boot 中的 SockJS原理及使用方法

    Spring Boot 中的 SockJS原理及使用方法

    SockJS 的主要作用是提供一种 WebSocket 的兼容性解决方案,使得不支持 WebSocket 的浏览器也可以使用 WebSocket,本文介绍了Spring Boot中的SockJS,包括SockJS的原理,使用方法和示例代码,感兴趣的朋友跟随小编一起看看吧
    2023-07-07
  • Java基础学习之方法的重载知识总结

    Java基础学习之方法的重载知识总结

    今天带大家来回顾Java基础知识,文中对Java方法的重载相关知识作了非常详细的介绍,对正在学习java的小伙伴们有很好的帮助,需要的朋友可以参考下
    2021-05-05
  • Java代码性能测试实战之ContiPerf安装使用

    Java代码性能测试实战之ContiPerf安装使用

    这篇文章主要为大家介绍了Java代码性能测试实战之ContiPerf安装使用,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-06-06
  • SpringMVC和Spring的配置文件扫描包详解

    SpringMVC和Spring的配置文件扫描包详解

    这篇文章主要介绍了SpringMVC和Spring的配置文件扫描包,本文给大家介绍的非常详细,具有一定的参考借鉴价值 ,需要的朋友可以参考下
    2019-05-05
  • Java优化for循环嵌套的高效率方法

    Java优化for循环嵌套的高效率方法

    这篇文章主要介绍了Java优化for循环嵌套的高效率方法,帮助大家更好的提升java程序性能,感兴趣的朋友可以了解下
    2020-09-09
  • 使用jib插件为Java应用构建镜像的方法

    使用jib插件为Java应用构建镜像的方法

    这篇文章主要介绍了使用jib插件为Java应用构建镜像,要是用户本地没安装docker,可以使用jib制作出带有镜像的tar文件,本文通过实例代码给大家介绍的非常详细,需要的朋友可以参考下
    2022-08-08
  • Java快速排序与归并排序及基数排序图解示例

    Java快速排序与归并排序及基数排序图解示例

    快速排序是基于二分的思想,对冒泡排序的一种改进。主要思想是确立一个基数,将小于基数的数放到基数左边,大于基数的数字放到基数的右边,然后在对这两部分进一步排序,从而实现对数组的排序
    2022-09-09
  • SpringBoot实现列表数据导出为Excel文件

    SpringBoot实现列表数据导出为Excel文件

    这篇文章主要为大家详细介绍了在Spring Boot框架中如何将列表数据导出为Excel文件,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下
    2024-02-02
  • java 实现发短信功能---腾讯云短信

    java 实现发短信功能---腾讯云短信

    如今发短信功能已经成为互联网公司的标配,接下来通过本文给大家介绍java 实现发短信功能---腾讯云短信 ,需要的朋友可以参考下
    2019-08-08

最新评论