首页 > 代码库 > Hama学习总结

Hama学习总结

Hama学习笔记

1.       Hama定义

Hama是基于HDFS上的BSP模型实现,其运行不需要MapReduce。例证如下: 在单点调试的Hama系统上,只运行NameNode、DataNode、BSPMasterRunner、GroomServerRunner和 ZooKeeperRunner进程,即可运行PageRank程序。

2.      MapReduce与BSP区别

执行机制:MapReduce是一个数据流模型,每个任务只是对输入数据进行处理,产生的输出数据作为另一个任务的输入数据,并行任务之间独立地进行,串行任务之间以磁盘和数据复制作为交换介质和接口。

BSP是一个状态模型,各个子任务在本地的子图数据上进行计算、通信、修改图的状态等操作,并行任务之间通过消息通信交流中间计算结果,不需要像MapReduce那样对全体数据进行复制。

迭代处理:MapReduce模型理论上需要连续启动若干作业才可以完成图的迭代处理,相邻作业之间通过分布式文件系统交换全部数据。BSP模型仅启动一个作业,利用多个超步就可以完成迭代处理,两次迭代之间通过消息传递中间计算结果。由于减少了作业启动、调度开销和磁盘存取开销,BSP模型的迭代执行效率较高。

数据分割:基于BSP的图处理模型,需要对加载后的图数据进行一次再分布的过程,以确定消息通信时的路由地址。例如,各任务并行加载数据过程中,根据一定的映射策略,将读入的数据重新分发到对应的计算任务上(通常是放在内存中),既有磁盘I/O又有网络通信,开销很大。但是一个BSP作业仅需一次数据分割,在之后的迭代计算过程中除了消息通信之外,不再需要进行数据的迁移。而基于MapReduce的图处理模型,一般情况下,不需要专门的数据分割处理。但是Map阶段和Reduce阶段次年在中间结果的Shuffle过程,增加了磁盘I/O和网络通信开销。

总结MapReduce发送数据+消息,而Hama只发送消息。在Hama的超步迭代过程中,当某个BSPPeer收到其他BSPPeer发送过来的某顶点的消息,进行消息处理,而后要把处理结果发送到该节点的邻接节点,因此该节点的数据信息也必须存在该BSPPeer中,故必须在对数据加载到内存时进行一次Hash再分布。

 

下面分析Hama中数据再分布的机制,源码位于GraphJobRunner.loadVertices()方法中。首先获取每个BSPPeer的数据分片大小splitSize,举例如下表1所示:

表 1  BSPPeer数据量信息

Peer序号

BSPPeer1

BSPPeer2

BSPPeer3

数据量

62M

64M

54M

GraphJobRunner.partitionMultiSteps(BSPPeer,splitSize)方法中,每个BSPPeer把自己的splitSize发送给MasterPeer。

进行同步后,在MasterPeer上找到最大所有BSPPeer上最大的splitSize赋值给maxSplitSize,即maxSplitSize等于BSPPeer2上的64M。然后按照如下公式计算计算数据加载后Hash再分布的同步次数steps

maxSplitSize/conf.getLong("hama.graph.multi.step.partitioning.interval",20000000) +1

由此公式可知,用户可配置hama.graph.multi.step.partitioning.interval的大小,但在hama-default.xml未找到此项。

hama.graph.multi.step.partitioning.interval含义:表示Hash再分布时进行同步的最大块单元,默认是20M。

steps = 64M / 20M + 1 = 4        (进行4次同步)

然后MasterPeer把该steps值发送给所有的BSPPeer,并在每个BSPPeer中赋值给GraphJobRunner. partitioningSteps变量(值为4)。在每个BSPPeer计算各自的Hash再分布时的块同步单元:interval = splitSize / partitioningSteps。计算结果如下表 2所示:

表 2  每个BSPPeer进行Hash再分布的块信息

Peer序号

BSPPeer1

BSPPeer2

BSPPeer3

数据量

62M

64M

54M

partitioningSteps值

4

4

4

Interval值

15M

16M

13M

每次同步块大小(M)

15、15、15、17

16、16、16、16

13、13、13、15

每个BSPPeer依次从HDFS上读取数据,并根据Hash进行发送(每读入一个顶点就发送一次),当发送量达到自己的块同步单元后(BSPPeer1:15M,BSPPeer2:16M,BSPPeer3:13M),进行一次同步。各BSPPeer把接受到的数据加载的内存中,即存储于GraphJobRunner.Vertices变量中。按此进行3(partitioningSteps-1)次。

最后一次中,BSPPeer1发送17M数据,BSPPeer2发送16M数据,BSPPeer3发送15M数据,再进行同步,而后加载到GraphJobRunner.Vertices中。

数据Hash重分布之后,每个BSPPeer上的顶点vertices大小分布可能如下表3所示,其中假设每个顶点的大小40byte(实际每个顶点大小会不同,如PageRank。此处只是为了举例说明算法)。再补充GraphJobRunner中vertices的定义:

List<Vertex<V, E, M>> vertices =new ArrayList<Vertex<V, E, M>>()

表 3  BSPPeer进行Hash重分布后Vertices.size信息

Peer序号

BSPPeer1

BSPPeer2

BSPPeer3

数据量

80.2 M

40.16 M

59.64 M

Vertices.size

2005 K

1004 K

1491 K

 

下面阐述Hama的数据修复(Repair)机制,源码位于GraphJobRunner. repair()方法中,此方法在loadVertices()方法的最后调用。

先用单个BSPPeer上的例子介绍数据修复的概念。如对于PageRank,目前实际有四个顶点,如下图1所示。而用户输入的数据如下:

1        2       3

2        3

3        1       4


图 1  PageRank图

但用户没有写4顶点的信息,应该写为: 4      邻接顶点,当其邻接边为空的时候,也应该写为:4      空(实际不写“空”,为了文档描述方便)。数据修复的目的就是增加:“4       边空”这条信息。其实是把4顶点作为悬挂顶点来处理。

在超步(S-1)中3顶点会把其PR值的1/2发送给顶点4应该所在的BSPPeer(实际没有4顶点的信息)。在超步S中,若数据加载时没有进行过数据修复,则BSPPeer没有4顶点的信息,不如直接把其临边作为空处理就行,这和数据修复效果一样。这样做不是更加简单吗?为什么要花那么大的代价进行数据修复呢?

解释:上述在计算过程中直接把其邻接边作为空的方案是不正确的。因为在计算顶点总数(等于每个BSPPeer上的Vertices.size之和)时就会出错,导致给每个顶点的初始值就会出错,然后再导致aggregator出错。

 

每个BSPPeer获取其上Vertices的大小,都发送给MasterPeer。在MasterPeer上找到最小的minVerticesSize,再计算数据修复时的同步次数multiSteps,公式如下:

multiSteps = min { minVerticesSize , ( partitioningSteps * 2 ) }

分析:一般minVerticesSize都大于( partitioningSteps* 2 )。如对上例minVerticesSize的大小为1000k,而( partitioningSteps *2 ) = 4*2 = 8,故multiSteps的值为8。然后MasterPeer把此值发送给所有的BSPPeer,每个BSPPeer存储于自己的变量multiSteps中。在每个BSPPeer计算各自数据修复时的块同步单元:vertices.size/ multiSteps。注意:此时进行同步的单元不是数据量大小,而是顶点的数目。计算结果如下所示。

表 4  每个BSPPeer进行数据修复时的同步信息

Peer序号

BSPPeer1

BSPPeer2

BSPPeer3

数据量

80.2 M

40.16 M

59.64 M

Vertices.size

2005 K

1004 K

1491 K

顶点同步单元

250 K

125 K

186 K

multiSteps次同步后剩余

5 k

4 k

3 k

每个BSPPeer依次从内存(vertices变量)上读取每个顶点,获取其邻接顶点后,再根据其Hash值把邻接顶点的id发送到相应的BSPPeer上。当发送顶点的数目达到各自的同步单元后(BSPPeer1:250 K,BSPPeer2:125 K,BSPPeer3:186 K),进行一次同步。各BSPPeer把接收到的数据存储于临时变量tmp(其定义为:new HashMap<V, Vertex<V, E, M>>(),V用来存储邻接顶点的id,Vertex是以邻接顶点id为VertexID且Edges为空的顶点)中。按此进行multiSteps次(8)。注意:与数据加载后Hash再分布时的(partitioningSteps-1)次不同。

进行multiSteps后,三个BSPPeer节点依然剩余5 K、4 K 、3 K,再进行最后一次同步,各BSPPeer依然后收到的数据加载到tmp变量中。

然后每个BSPPeer扫描自己的vertices,把VertexID属于tmp的从tmp中删除。最后把tmp中剩余的顶点对应的Vertex(以邻接顶点id为VertexID且Edges为空)加入到GraphJobRunner.Vertices中,至此数据修复完成。