首页 > 代码库 > 多线程IO操作(fork-join版)

多线程IO操作(fork-join版)

 接着上篇中没写完的(http://my.oschina.net/bluesroot/blog/223453),上篇中讲到很多,为完成对一个目录的扫描的频繁的IO操作,我们从单线程到多线程,从CountDownLatch到BlockingQueue,中间不免各种Callable和Future和ExecutorService等等,虽然纷繁,中间有些不免麻烦,但是最终仍紧紧贴着我们的需求和多线程操作这一主题。

    

        随着jdk的版本升级,并发包也随之扩充了不少,上面博文中的API来源于jdk1.6,1.7的推出和新的api的新增,我们有了更安全更方便更高效的方式去解决上面的问题。


        1.5的时候,我们更多的使用synchronized,并辅助其他的api的去操作多线程,线程的理解不深刻,这样的做法很多时候是很危险的;1.6到了,官方推荐使用线程池,线程池成熟且方便的为大家省去了很多不必要的麻烦,如今到了1.7的时候了,我们发觉它愈发高效。


        最常用的,我们当然能使用ExecutorService来管理并调度线程池中的线程来执行任务。然后问题很分明,设计该线程的时候,我们该如何设置线程的数量呢?多了怕浪费,少了又不高效,好歹有个线程计算公式(线程数=cpu核心数 /(1-阻塞系数),阻塞系数在0-1之间取值),虽然不是绝对正确,却也能有参考。在java7中,针对这一情况,专门加入了针对ExecutorService效率和性能的改进版的fork-join API。


        ForkJoinPool类可以根据可用的处理器数量和任务需求,动态的对线程进行管理。(真心高端...)Fork-join使用了work-stealing策略,即本线程在完成自己的任务之后,在发现其他的线程还有活没做完,则主动帮助其他的线程一起干。该策略的使用,不但提升了API的性能,而且有助于提高线程的利用率。(妈妈再也不用担心,我开多少线程大小的线程池了~~)


        在这个例子中,为了更好的调度任务,我们提供一些ForkJoinTask的实例来配合ForkJoinPool的函数的使用。我们用ForkJoinTask来创建(fork任务),然后再将主线程join到任务的完成点上。这样就行了。

        ForkJoinTask有两个子类,RecursiveAction和RecursiveTask。前者的子类主要用来执行不需要返回值的任务(是不是跟Runnable的说法类似?),而后者的子类,主要用于执行需要返回值的任务。(对了,跟Future的说法类似的)

        Fork-join API主要用于处理那些有些规模合理的任务,即如果每个任务所分担的开销都不大(也没有不确定的循环),则该API就可以达到合理的吞吐量。此外,Fork-join的API希望所有的任务都不要有副作用(即不要改变共享状态),并且没有同步或者锁操作,而本例正是如此,所以,这里使用它,能大大提高效率和代码简洁。它的场景有限,大家切不可以为,它能代替谁,它的出现是为了简化和解决某些情况,而并非取代某些老牌的并发API,ExecutorService表示毫无压力..........


        废话太多了,我们结合上篇文章中的场景需求,看看如何使用jdk7的新api,优雅且高效的解决该问题,代码如下:

01public class FileSize {
02   
03  private final static ForkJoinPool forkJoinPool = new ForkJoinPool();
04   
05  private static class FileSizeFinder extends RecursiveTask<Long> {
06    final File file;
07     
08    public FileSizeFinder(final File theFile) {
09      file = theFile;
10    }
11     
12    @Override public Long compute() {
13      long size = 0;
14      if (file.isFile()) {
15        size = file.length();
16      else {
17        final File[] children = file.listFiles();
18        if (children != null) {
19          List<ForkJoinTask<Long>> tasks = 
20            new ArrayList<ForkJoinTask<Long>>();
21          for(final File child : children) {
22            if (child.isFile()) {
23              size += child.length();
24            else {
25              tasks.add(new FileSizeFinder(child));
26            }
27          }
28           
29          for(final ForkJoinTask<Long> task : invokeAll(tasks)) {
30            size += task.join();
31          }
32        }
33      }
34       
35      return size;
36    }
37  }
38 
39  public static void main(final String[] args) {
40    final long start = System.nanoTime();
41    final long total = forkJoinPool.invoke(
42        new FileSizeFinder(new File("D:/tools")));
43    final long end = System.nanoTime();
44    System.out.println("Total Size: " + total);
45    System.out.println("Time taken: " + (end - start)/1.0e9);    
46  }
47}

        上述tools文件夹,放了tomcat,maven,spring的包和maven的仓库,各种子目录很多,大小是543M,cpu i5,4g内存,运行结果为:

1Total Size: 570332850
2Time taken: 0.697063339


        上述代码中,我们创建了ForkJoinPool实例的引用,用static修饰,即他在整个应用程序中共享。随后定义一个名为FileSizeFinder的静态内部类,它继承RecursiveTask类,并实现了compute(),该方法可以用来作为任务的执行引擎。invokeAll()函数将等待所有的子任务完成之后,才会去执行下一步循环累加操作。在任务被阻塞期间,其执行线程并非什么也不做,而是可以被调度去做其他的任务,最后每个任务都将compute函数结束时返回计算的结果。


        本例中,我们递归的将扫描任务进行分解,直到不能拆分。但一般来说,拆分粒度太细会显著增加线程调度的开销,所以我们不建议将问题拆分的过小。

        java.until.concurrent包中,定义了很多线程安全的集合类,这个集合类既保证了并发编程环境下的数据安全性,同时也可以被当做同步点来使用。线程安全很重要,但是我们不想为此牺牲太多的性能。后面我们继续探讨concurrent中与并发应用性能息息相关的一些数据结构。