首页 > 代码库 > 并行编程框架 ForkJoin
并行编程框架 ForkJoin
本文假设您已经了解一般并行编程知识,了解Java concurrent部分如ExecutorService等相关内容。
虽说是Java的ForkJoin并行框架,但不要太在意Java,其中的思想在其它语言环境也是同样适用的。因为并发编程在本质上是一样的。就好像如何找到优秀的Ruby程序员?其实要找的只是一个优秀的程序员。当然,如果语言层面直接支持相关的语义会更好。
引言
Java 语言从一开始就支持线程和并发性语义。Java5增加的并发工具又解决了一般应用程序的并发需求,Java6、Java7又进一步补充了一些内容。原来的工具主要是粗粒度的并发。比如每个web请求由一个工作线程处理,在线程池分配任务。而Java7中新引入的ForkJoin可以处理更细粒度的并行计算。
早期的时候都是单核cpu环境,如果不是多核环境下,线程/进程并不是真正的并行执行,主要用来表示异步执行效果。单核cpu上,假如每个任务完全是cpu密集的(没有等待),那么这种伪并发并不会使计算变快。只有在真正的多核环境才能起到加速作用,而现在多核已经普及,甚至已经到了手机上!
介绍
ForkJoin是适用于多核环境的轻量级并行框架。目标是在多核系统下,通过并行运算,充分利用多处理器,提高效率与加速运行。
ForkJoin编程范式:将问题递归地分解为较小的子问题,并行处理这些子问题,然后合并结果,如:
if (my portion of the work is small enough)
do the work directly
else
split my work into two pieces
invoke the two pieces and wait for the results
ForkJoin由一组工作线程组成,用来执行任务,核心是work-stealing算法。可以有大量任务,但实际只有少量真正的物理线程,默认是机器的cpu数量,也可指定。很多其它工作的算法都可以在此基础之上进行。
虽然起初直接使用它的人可能不多,但将来会被很多框架在底层使用,因为是如此基础,所以最终ForkJoin可能会无处不在。
一般而言,使用者只需要关心两个方法fork() 和 join()。它们分别表示:子任务的异步执行和阻塞等待结果完成。
ForkJoin框架的核心是ForkJoinPool类,实现了work-stealing算法,用于执行ForkJoinTask类型的任务(也就是按照该算法调度线程与任务,当然还负责解决好相关的一些其它问题)。
work-stealing算法
work-stealing 是一种任务调度方法,由多个工作线程组成,每个工作线程用一个双端队列维护一组任务。Fork的时候是把任务加到队列的头部,而不像一般的线程池那样是加到任务队列末尾。工作线程选择头部最新的任务来执行。当工作线程没有任务可执行时,它会尝试从其它线程的任务队列尾部窃取一个任务执行。如果没有任务执行了并且窃取其它任务失败,那么工作线程停止。
这种方法的优点是减少了争用,因为工作线程从头获取任务,而窃取线程从尾部窃取任务。另一个优点是递归的分治法使得早期产生的是较大的任务单元,而窃取到较大任务会进一步递归分解,因此也减少了尾部窃取的次数。另外,父任务很可能要等待子任务(join),所以从队列头部子任务开始执行也是一种优化。
总之,它会使用有限的线程执行大量任务,同时保持各线程的任务都处于繁忙的执行状态,而尽量不让线程处于等待状态。为了做到这点可能会从其它线程的任务队列中窃取任务来执行,所以叫work-stealing。
就像前面所说物理线程不能太多,过多的话切换管理开销就会较大,还会消耗更多的内存等资源,并且没有带来任何好处。默认是用cpu数量的线程数,一般情况都比较合适(比如Runtime.getRuntime().availableProcessors()返回处理器的数量),但具体的数值还和任务自身的特点有关,可以通过不同参数测试比较一下。而任务可以是大量的,由每个线程的工作队列维护。
ForkJoin是简化了一些开发者的工作,如果不用ForkJoin,最原始的方式是自己手工切分任务并分别创建线程执行。
分治、并行、可伸缩的思考:
这三者关系很亲密。分治思想(divide-and-conquer)是一种简单朴素的思想,很多问题都可以这样解决。ForkJoin就相当于分治法的并行版本。 分治本身只是解决问题的思想,既可以顺序执行也可以并行执行,但是在并行环境中更加有效,因为可以并行处理子问题。而在并行方面,可并行处理问题要么是彼此完全独立的问题,要么是可分解单独处理的问题。可伸缩性又和能否并行处理紧密相关,因为如果不能并行处理就要受到单机处理能力的限制,也就难以伸缩了。
ForkJoin与MapReduce两个并行计算框架的区别 ?
MapReduce是把大数据集切分成小数据集,并行分布计算后再合并。
ForkJoin是将一个问题递归分解成子问题,再将子问题并行运算后合并结果。
二者共同点:都是用于执行并行任务的。基本思想都是把问题分解为一个个子问题分别计算,再合并结果。应该说并行计算都是这种思想,彼此独立的或可分解的。从名字上看Fork和Map都有切分的意思,Join和Reduce都有合并的意思,比较类似。
区别:
1)环境差异,分布式 vs 单机多核:ForkJoin设计初衷针对单机多核(处理器数量很多的情况)。MapReduce一开始就明确是针对很多机器组成的集群环境的。也就是说一个是想充分利用多处理器,而另一个是想充分利用很多机器做分布式计算。这是两种不同的的应用场景,有很多差异,因此在细的编程模式方面有很多不同。
2)编程差异:MapReduce一般是:做较大粒度的切分,一开始就先切分好任务然后再执行,并且彼此间在最后合并之前不需要通信。这样可伸缩性更好,适合解决巨大的问题,但限制也更多。ForkJoin可以是较小粒度的切分,任务自己知道该如何切分自己,递归地切分到一组合适大小的子任务来执行,因为是一个JVM内,所以彼此间通信是很容易的,更像是传统编程方式。
ForkJoin框架基本结构:
ForkJoinPool本身实现了ExecutorService接口,负责调度执行ForkJoinTask。
ForkJoinTask是提交给ForkJoinPool 执行的任务,本身也实现了Future 接口。
ForkJoinTask有两个子类RecursiveAction和RecursiveTask。 RecursiveAction 没有返回值(只需fork);RecursiveTask有返回值(需要合并)。类似于Runnable和 Callable一样。没有返回值一般意味着所有子任务都执行完了即可,中间的子任务不需要join了。其实要不要返回值都可以实现,有返回值可以直接合并,没有返回值可以把结果保存在共享的数据上。
而我们要做的是实现自己要完成的任务,只需要继承其一,并覆盖抽象方法compute()。在这个方法中实现自己的任务,递归分解任务。
ForkJoinPool与一般的ExecutorService实现的差别?
ForkJoin实现了ExecutorService接口,这个接口就是用来把任务交给线程池中的工作线程去执行。ForkJoin也是一个ExecutorService,但区别在于ForkJoin使用了work-stealing算法,见前面的介绍。普通的线程池是按FIFO的方式执行,而ForkJoin优先执行(由其它任务)后创建子任务。对于大部分会产生子任务的任务模式,ForkJoin的处理实现会很高效。如果设置了异步模式, ForkJoin也可能适合执行事件类型(不需要join)的任务。
影响ForkJoin加速效果的因素
理想效果是核越多加速效果越好。但是并行不一定更快,参数不对还可能更慢:
1) 并发数,即线程数。一般是可用的cpu数,默认就是这个,一般表现很好。
2) 任务切分的粒度。如果切分粒度等于总任务量,一个任务执行,就相当于单线程顺序执行。每个任务执行的计算量,太大的话加速效果有限,不能发挥到最好。相反,太小的话,消耗在任务管理的成本占了主要部分,导致还不如顺序执行的快。
需要适当平衡二者。因为还和任务本身的特定有关,所以可以做个基准测试比较一下。
而总的执行时间还与任务的规模有关。
任务粒度应该适中,多大合适?好像在什么地方上看到说:经验上单个任务为100-10000个基本指令,当然还和任务本身的特定有关。
个人感觉多核cpu只适用于解决计算密集型应用,因为实际问题可能IO等其他方面的瓶颈,多核也还是无法充分利用的。
使用ForkJoin的步骤:
ForkJoin框架替我们完成了一些工作,那么我们使用时还要完成哪些工作:
1) 如何执行单个任务。如果只切分出一个任务执行,就相当于单线程顺序执行。
2) 如何递归地切分任务(以及任务切分后是否需要合并结果)
3) 切分粒度多少合适(最小任务单元)
这些具体表现在:继承ForkJoinTask的一个子类,并实现抽象方法compute()。在这个方法中实现自己的任务,递归分解任务。
这些准备好之后就可以启动了:创建一个表示全部任务的ForkJoinTask对象,创建一个ForkJoinPool的实例,把task作为参数执行ForkJoinPool的invoke方法。
在ForkJoin任务外部执行总任务:execute异步执行任务,没有返回结果void;invoke执行任务并等待返回结果,结果是特定类型;submit执行一个任务,返回ForkJoinTask(实际上是作为Future对象返回)。一般应该在外部使用invoke调用执行总任务。而execute和submit只是为了实现ExecutorService规定的相关语义,invoke是ForkJoin中特有的。
在ForkJoinTask内部递归执行的过程中:fork是异步执行,invoke是等待任务执行完成。
具体实例:
多看看具体示例比较好。
1) 合并排序示例:
合并排序是常见的排序算法之一。示例实现了对一个整数数组的合并排序。同时还演示了不同并发数(线程数)与不同数组大小的组合测试。代码在<jdk_ home>/sample/ForkJoin/ 中。
2) 把图片模糊处理示例:
一个图片可以被表示为一个m*n大小的整数数组,其中每个整数表示一个像素(的颜色)。模糊处理之后的图像还是一个同样大小的整数数组。处理过程是把原来的每个像素与周围像素的颜色求平均值即可。如果顺序执行就是从头到尾对每个像素执行一次计算得到目标像素,因为每个像素的计算是独立的,所以可以把这个整数数组切分成一块一块的子数组(即子任务)分别执行。任务不适合切分的过小,所以设定了一个常数阈值10000,大小小于10000的子数组就直接执行,否则对半切分为两个子数组的任务分别执行。文章 源代码
在我自己的机器上i3处理器 (i3,4cpu),并行确实快了不少。
其它的例如:求最大值max、平均值avg、求和sum等聚合函数都是可以分解计算的。
示例中都是对数组的处理,比较常见的是对数组、集合进行并行地处理操作,但也不限于此。
网上有一些Fibonacci 的示例,但这些示例并不适合展示ForkJoin。
Doug Lea与JSR-166
说到Java并发编程,就不能不说到Doug Lea与JSR-166。Doug Lea是并发编程方面的专家,纽约州立大学奥斯威戈分校的计算机教授。曾是JCP执行委员,是JSR-166的leader。JSR-166就是负责向Java语言中添加并发编程工具的,即我们见到的java.util.concurrent包(及子包)。还是《 Concurrent Programming in Java Design Principles and Patterns》一书的作者,是这方面最早的书。他还是知名的内存分配方法dlmalloc的作者,这是C语言中的动态内存分配函数malloc的一种普遍使用的实现。
参考资料:
- Java api 及前面提到的两个自带示例。
- A Java Fork/Join Framework (pdf)
- Java 理论与实践: 应用 fork-join 框架
- Java 理论与实践: 应用 fork-join 框架,第 2 部分
- JDK 7 中的 Fork/Join 模式
- Java Fork/Join for Parallel Programming
- Fork-Join Development in Java? SE
- Java Fork/Join + Groovy
- InfoQ上ForkJoin相关文章
其它并行框架:
- Akka
- GPars