首页 > 代码库 > ForkJoin 任务分解

ForkJoin 任务分解

Fork/Join框架的介绍

第一步分割任务。首先我们需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割出的子任务足够小。

第二步执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。

Fork/Join使用两个类来完成以上两件事情:

  • ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制,通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下两个子类:
    • RecursiveAction:用于没有返回结果的任务。
    • RecursiveTask :用于有返回结果的任务。
  • ForkJoinPool :ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
 1 package com.thread.test.thread;
 2 
 3 import java.io.File;
 4 import java.util.ArrayList;
 5 import java.util.List;
 6 import java.util.concurrent.ForkJoinPool;
 7 import java.util.concurrent.RecursiveTask;
 8 
 9 /**
10  * Created by windwant on 2016/6/3.
11  */
12 public class MyForkJoin {
13 
14     public static void main(String[] args) {
15         MyTask task = new MyTask(new File("D:\\MPS"));
16         Integer sum = new ForkJoinPool().invoke(task);
17         System.out.println(sum);
18     }
19 }
20 
21 class MyTask extends RecursiveTask<Integer>{
22 
23     public Integer num = 0;
24 
25     private File file;
26     MyTask(File file){
27         this.file = file;
28     }
29 
30     @Override
31     protected Integer compute() {
32         List<MyTask> taskList = new ArrayList<MyTask>();
33         if(file.isDirectory()){
34             File[] list = file.listFiles();
35             for(File subf: list){
36                 if(subf.isDirectory()){
37                     MyTask mt = new MyTask(subf);
38                     taskList.add(mt);
39                 }else{
40                     num++;
41                 }
42             }
43         }else{
44             num = 1;
45         }
46 
47         if(!taskList.isEmpty()){
48             //同下
49 //            for(MyTask mtask: taskList){
50 //                mtask.fork();
51 //            }
52 //            for(MyTask mtask: taskList){
53 //                num += mtask.join();
54 //            }
55 
56             for(MyTask mtask: invokeAll(taskList)){
57                 num += mtask.join();
58             }
59         }
60         return num;
61     }
62 }

项目地址:https://github.com/windwant/threadtest

ForkJoin 任务分解