首页 > 代码库 > FutureTask与Fork/Join

FutureTask与Fork/Join

  在学习多线程的过程中,我们形成了一种思维习惯。那就是对于某个耗时操作不再做同步操作,让他分裂成一个线程之后执行下一步,而线程执行耗时操作。并且我们希望在我们需要它返回的时候再去调用它的结果集。好比我们把米饭和水放进了电饭煲,转头就去炒菜了,等到菜完成之后,转头去查看饭是否完成。多线程造成了并行计算的现象,有时候它们是真的多核计算而有时候只是单核的切换。

  FutureTask表示的是一种,异步操作的典范。我提交了任务,在未来我要拿到结果。

  考虑一种简单的场景,A问B一个问题,B一时回答不了,B要去考虑一段时间,等到有结果了,再告诉A。

  这时,我们需要类A,类B。

package Future;

//调用端
public class CallA implements CallBack{
    private CallB b;
    public CallA(CallB b){
        this.b = b;
    }
    
    public void submitProblem(String problem){
        System.out.println("a 提交问题:"+problem);
        new Thread(){
            public void run(){
                b.execute(CallA.this, problem);
            }
        }.start();
        System.out.println("a 提交问题完毕");
    }
    
    @Override
    public void result(String result) {
        System.out.println(result);
    }
    
}
package Future;

//执行处理
public class CallB {
	public void execute(CallBack callBack,String problem){
		System.out.println("接受问题:"+problem);
		System.out.println("开始处理");
		try{
			Thread.sleep(2000);
		}catch (Exception e) {
			e.printStackTrace();
		}
		callBack.result("问题处理结果:abcdefg...");
	}
}

  类的结构是,A中保留它作用对象B的一个引用,在触发询问问题的时候,A向B提交了一个方法调用,并且同时开启了一个线程,这是它不阻塞的原因。

  为“提问题”做一个面向对象的接口。

//回调接口
public interface CallBack {
	public void result(String result);
}	

  他们执行的主流程,十分简单。

public class Main {
    public static void main(String[] args) {
        CallB b = new CallB();
        CallA a = new CallA(b);
        a.submitProblem("英文字母");
    }
}

  熟悉了这个过程,JDK提供了FutureTask的接口。

package Future;

import java.util.concurrent.Callable;

public class RealData implements Callable<String>{
    private String data;
    public RealData(String data){
        this.data =http://www.mamicode.com/ data;
    }
    @Override
    public String call() throws Exception {
        StringBuffer sb = new StringBuffer();
        for(int i=0;i<10;i++){
            sb.append(data);
            try{
                Thread.sleep(1500);
            }catch (Exception e) {
                e.printStackTrace();
            }
        }
        return sb.toString();
    }

}

  实现这个Callable接口,重写call方法,在未来调用get的时候将返回运算结果。

package Future;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

//jdk future框架
public class FutureMain {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        FutureTask<String> future = new FutureTask<String>(new RealData("a"));
        ExecutorService executor = Executors.newFixedThreadPool(1);
        executor.submit(future);
        System.out.println("请求完毕");
        try{
            Thread.sleep(1000);
        }catch (Exception e) {
        }
        System.out.println("future task 返回:"+future.get());
    }
}

   多线程的优势体现在并行计算中,虽然某大佬说研究并行计算是在浪费时间,但是作为一种由多线程产生的技术来说,先了解一下特点。

   JDK为我们提供了一套Join/Fork框架,考虑下面这个例子。

package ForkAndJoin;

import java.util.concurrent.RecursiveAction;

public class PrintTask extends RecursiveAction{
    private final int Max = 50;
    private int start;
    private int end;
    public PrintTask(int start,int end){
        this.start = start;
        this.end = end;
    }
    @Override
    protected void compute() {
        if((end - start)<Max){
            for(int i=start;i<end;i++){
                System.out.println("当前线程:"+Thread.currentThread().getName()+" i :"+i);
            }
        }else{
            int middle = (start+end)/2;
            PrintTask left = new PrintTask(start, middle);
            PrintTask right = new PrintTask(middle, end);
            left.fork();
            right.fork();
        }
    }

}
package ForkAndJoin;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class ForkJoinPoolTest {
    public static void main(String[] args) throws InterruptedException {
        ForkJoinPool forkJoin = new ForkJoinPool();
        forkJoin.submit(new PrintTask(0,200));
        forkJoin.awaitTermination(2, TimeUnit.SECONDS);
        forkJoin.shutdown(); 
    }
}

   在compute方法中写主要的任务处理,这是一个并行计算的小例子。

   J/F的模式很像map-reduce模式,将任务分小,然后各个处理。

  

FutureTask与Fork/Join