首页 > 代码库 > jdk7 并行计算框架Fork/Join

jdk7 并行计算框架Fork/Join

故名思义,拆分fork+合并join。jdk1.7整合Fork/Join,性能上有大大提升。

思想:充分利用多核CPU把计算拆分成多个子任务,并行计算,提高CPU利用率大大减少运算时间。有点像,MapReduce思路感觉大致一样。

jdk7中已经提供了最简洁的接口,让你不需要太多时间关心并行时线程的通信,死锁问题,线程同步,下面是它提供的接口:

简单示例:

package tank.forjoin.demo;import java.util.concurrent.RecursiveTask;/**        * @author tank        * @date:2014-8-27 下午01:30:06        * @description:        * @version :任务        * futrue:        * forkjoinTask:recursiveTask recursiveAction        */public class Demo1 extends RecursiveTask<Integer> {    private int start;    private int end;    public Demo1(int start, int end) {        this.start = start;        this.end = end;    }    //计算    @Override    protected Integer compute() {        int sum = 0;        if (start - end < 100) {            for (int i = start; i < end; i++) {                sum += i;            }        } else {//间隔有100则拆分多个任务计算            int middle = (start + end) / 2;            Demo1 left = new Demo1(start, middle);            Demo1 right = new Demo1(middle + 1, end);            left.fork();            right.fork();            sum = left.join() + right.join();        }        return sum;    }}
package tank.forjoin.demo;import java.util.concurrent.ExecutionException;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.Future;import org.junit.Test;/**        * @author tank        * @date:2014-8-27 下午01:50:20        * @description:        * @version :        */public class TestDemo1 {    @Test    public void testDemo1() throws InterruptedException, ExecutionException {        ForkJoinPool forkJoinPool = new ForkJoinPool();//对线程池的扩展        Future<Integer> result = forkJoinPool.submit(new Demo1(1, 10000));        System.out.println(result.get());
     
     forkJoinPool.shutdown(); }}

RecursiveAction 无返回值任务。

RecursiveTask有返回值类型。

ForkJoinPool提供了一系列的submit方法,计算任务。ForkJoinPool默认的线程数通过Runtime.availableProcessors()获得,因为在计算密集型的任务中,获得多于处理性核心数的线程并不能获得更多性能提升。

 

jdk7 并行计算框架Fork/Join