首页 > 代码库 > RDD的依赖关系

RDD的依赖关系

RDD的依赖关系

Rdd之间的依赖关系通过rdd中的getDependencies来进行表示,

在提交job后,会通过在DAGShuduler.submitStage-->getMissingParentStages

privatedefgetMissingParentStages(stage: Stage): List[Stage] = {

valmissing =newHashSet[Stage]

valvisited =newHashSet[RDD[_]]

defvisit(rdd: RDD[_]) {

if(!visited(rdd)){

visited+= rdd

if(getCacheLocs(rdd).contains(Nil)){

for(dep <-rdd.dependencies) {

depmatch{

caseshufDep:ShuffleDependency[_,_] =>

valmapStage =getShuffleMapStage(shufDep,stage.jobId)

if(!mapStage.isAvailable){

missing+=mapStage

}

casenarrowDep:NarrowDependency[_] =>

visit(narrowDep.rdd)

}

}

}

}

}

visit(stage.rdd)

missing.toList

}

在以上代码中得到rdd的相关dependencies,每个rdd生成时传入rdddependencies信息。

SparkContext.textFile,时生成的HadoopRDD时。此RDD的默觉得dependencysNil.

Nil是一个空的列表。

classHadoopRDD[K, V](

sc: SparkContext,

broadcastedConf:Broadcast[SerializableWritable[Configuration]],

initLocalJobConfFuncOpt:Option[JobConf => Unit],

inputFormatClass: Class[_ <:InputFormat[K, V]],

keyClass: Class[K],

valueClass: Class[V],

minSplits: Int)

extendsRDD[(K, V)](sc, Nil) withLogging {


Dependency分为ShuffleDependencyNarrowDependency

当中NarrowDependency又包括OneToOneDependency/RangeDependency

Dependency唯一的成员就是rdd,即所依赖的rdd,parentrdd

abstractclassDependency[T](valrdd:RDD[T])extendsSerializable


OneToOneDependency关系:

最简单的依赖关系,parentchild里面的partitions是一一相应的,典型的操作就是map,filter

事实上partitionId就是partitionRDD中的序号,所以假设是一一相应,

那么parentchild中的partition的序号应该是一样的,例如以下是OneToOneDependency的定义

/**

*Represents a one-to-one dependency between partitions of the parentand child RDDs.

*/

classOneToOneDependency[T](rdd: RDD[T])extendsNarrowDependency[T](rdd) {

此类的Dependencyparent中的partitionIdchildRDD中的partitionId是一对一的关系。

也就是partition本身范围不会改变,一个parition经过transform还是一个partition,

尽管内容发生了变化,所以能够在local完毕,此类场景通常像mapreduce中仅仅有map的场景,

第一个RDD运行完毕后的MAPparition直接运行第二个RDDMap,也就是local运行。

overridedefgetParents(partitionId: Int) = List(partitionId)

}


RangeDependency关系:

此类应用尽管仍然是一一相应,可是是parentRDD中的某个区间的partitions相应到childRDD中的某个区间的partitions
典型的操作是
union,多个parentRDD合并到一个childRDD,故每一个parentRDD都相应到childRDD中的一个区间
须要注意的是
,这里的union不会把多个partition合并成一个partition,而是的简单的把多个RDD中的partitions放到一个RDD里面,partition不会发生变化,


rdd參数,parentRDD

inStart參数,parentRDDpartitionId计算的起点位置。

outStart參数,childRDD中计算parentRDDpartitionId的起点位置,

length參数,parentRDDpartition的个数。

classRangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length:Int)

extendsNarrowDependency[T](rdd) {


overridedefgetParents(partitionId: Int) = {

检查partitionId的合理性,此partitionIdchildRDDpartitionId中的范围须要合理。

if(partitionId >= outStart && partitionId < outStart +length) {

计算出ParentRDDpartitionId的值。

List(partitionId - outStart +inStart)

}else{

Nil

}

}

}


典型的应用场景union的场景把两个RDD合并到一个新的RDD中。

defunion(other: RDD[T]): RDD[T] =newUnionRDD(sc,Array(this,other))

使用union,第二个參数是,两个RDDarray,返回值就是把这两个RDDunion后产生的新的RDD


ShuffleDependency关系:

此类依赖首先要求是Product2PairRDDFunctionsk,v的形式,这样才干做shuffle,和hadoop一样。

其次,因为须要shuffle,所以当然须要给出partitioner,默认是HashPartitioner怎样完毕shuffle

然后,shuffle不象map能够在local进行,往往须要网络传输或存储,所以须要serializerClass

默认是JavaSerializer,一个类名,用于序列化网络传输或者以序列化形式缓存起来的各种对象。

默认情况下Java的序列化机制能够序列化不论什么实现了Serializable接口的对象,

可是速度是非常慢的,

因此当你在意执行速度的时候我们建议你使用spark.KryoSerializer而且配置Kryoserialization

能够是不论什么spark.Serializer的子类。


最后,每一个shuffle须要分配一个全局的id,context.newShuffleId()的实现就是把全局id累加


classShuffleDependency[K, V](

@transientrdd: RDD[_ <: Product2[K, V]],

valpartitioner:Partitioner,

valserializerClass:String = null)

extendsDependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {


valshuffleId:Int = rdd.context.newShuffleId()

}



生成RDD过程分析

生成rdd我们还是按wordcount中的样例来说明;

valfile= sc.textFile("/hadoop-test.txt")

valcounts =file.flatMap(line=> line.split(" "))

.map(word => (word,1)).reduceByKey(_+ _)

counts.saveAsTextFile("/newtest.txt")


1.首先SparkContext.textFile通过调用hadoopFile生成HadoopRDD实例,

textFile-->hadoopFile-->HadoopRDD,此时RDDDependencyNil,一个空的列表。

此时的HadoopRDDRDD<K,V>,每运行next方法时返回一个Pair,也就是一个KV(通过compute函数)

2.textFile得到HadoopRDD后,调用map函数,

map中每运行一次得到一个KV(computegetNext,newNextIterator[(K, V)] )

取出value的值并toString,生成MappedRDD<String>。此RDD的上层RDD就是1中生成的RDD

同一时候此RDDDependencyOneToOneDependency

deftextFile(path: String, minSplits: Int = defaultMinSplits):RDD[String] = {

hadoopFile(path,classOf[TextInputFormat], classOf[LongWritable], classOf[Text],

minSplits).map(pair =>pair._2.toString)

}

defmap[U: ClassTag](f: T => U): RDD[U] = newMappedRDD(this,sc.clean(f))

以上代码中传入的this事实上就是1中生成的HadoopRDD.


3.flatMap函数,把2中每一行输出通过一定的条件改动成0到多个新的item.生成FlatMappedRDD实例,

同一时候依据implicit隐式转换生成PairRDDFunctions。以下两处代码中的红色部分。

在生成FlatMappedRDD是,此时的上一层RDD就是2中生成的RDD

同一时候此RDDDependencyOneToOneDependency

classFlatMappedRDD[U: ClassTag, T: ClassTag](

prev: RDD[T],

f:T => TraversableOnce[U])

extendsRDD[U](prev)


implicitdefrddToPairRDDFunctions[K: ClassTag, V:ClassTag](rdd: RDD[(K, V)]) =

newPairRDDFunctions(rdd)


4.map函数,因为3中生成的FlatMappedRDD生成出来的结果,通过implicit的隐式转换生成PairRDDFunctions

此时的map函数须要生成隐式转换传入的RDD<K,V>的一个RDD

因此map函数的运行须要生成一个MappedRDD<K,V>RDD,同一时候此RDDDependencyOneToOneDependency

下面代码的红色部分。---RDD[(K,V)]。。

valcounts=file.flatMap(line=> line.split(""))

.map(word=> (word, 1)).reduceByKey(_+ _)

5.reduceByKey函数,此函数通过implicit的隐式转换中的函数来进行,主要是传入一个计算两个value的函数。

reduceByKey这类的shuffleRDD时,终于生成一个ShuffleRDD,

RDD生成的DependencyShuffleDependency

详细说明在以下的reduceByKey代码中,

首先在每个map生成MapPartitionsRDD把各partitioner中的数据通过进行合并。合并通过Aggregator实例。

最后通过对合并后的MapPartitionsRDD,RDD相当于mapreduce中的combiner,生成ShuffleRDD.

defreduceByKey(func: (V, V) => V): RDD[(K, V)] = {

reduceByKey(defaultPartitioner(self),func)

}


defcombineByKey[C](createCombiner: V => C,//创建combiner,通过V的值创建C

mergeValue: (C, V) =>C,//combiner已经创建C已经有一个值,把第二个的V叠加到C中,

mergeCombiners: (C, C) =>C,//把两个C进行合并,事实上就是两个value的合并。

partitioner:Partitioner,//Shuffle时须要的Partitioner

mapSideCombine: Boolean =true,//为了减小传输量,非常多combine能够在map端先做,

比方叠加,能够先在一个partition中把全部同样的keyvalue叠加,shuffle

serializerClass: String =null):RDD[(K, C)] = {

if(getKeyClass().isArray) {

if(mapSideCombine) {

thrownewSparkException("Cannot use map-sidecombining with array keys.")

}

if(partitioner.isInstanceOf[HashPartitioner]) {

thrownewSparkException("Default partitionercannot partition array keys.")

}

}

生成一个Aggregator实例。

valaggregator=newAggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)

假设RDD本身的partitioner与传入的partitioner同样,表示不须要进行shuffle

if(self.partitioner== Some(partitioner)) {

生成MapPartitionsRDD,直接在map端当前的partitioner下调用Aggregator.combineValuesByKey

把同样的keyvalue进行合并。

self.mapPartitionsWithContext((context,iter) => {

newInterruptibleIterator(context,aggregator.combineValuesByKey(iter,context))

}, preservesPartitioning =true)

}elseif(mapSideCombine) {

生成MapPartitionsRDD,先在map端当前的partitioner下调用Aggregator.combineValuesByKey

把同样的keyvalue进行合并。

combineValuesByKey中检查假设key相应的C假设不存在,通过createCombiner创建C

否则key已经存在C时,通过mergeValue把新的V与上一次的C进行合并,

mergeValue事实上就是传入的reduceByKey(_+ _) 括号里的函数,与reduce端函数同样。

valcombined =self.mapPartitionsWithContext((context, iter) => {

aggregator.combineValuesByKey(iter,context)

}, preservesPartitioning =true)

生成ShuffledRDD,进行shuffle操作,由于此时会生成ShuffleDependency,又一次生成一个新的stage.

valpartitioned=newShuffledRDD[K, C, (K, C)](combined,partitioner)

.setSerializer(serializerClass)

在上一步完毕,也就是shuffle完毕,又一次在reduce端进行合并操作。通过Aggregator.combineCombinersByKey

spark这些地方的方法定义都是通过动态载入运行的函数的方式,所以能够做到map端运行完毕后reduce再去运行兴许的处理。

由于函数在map时仅仅是进行了定义,reduce端才对函数进行运行。

partitioned.mapPartitionsWithContext((context,iter) => {

newInterruptibleIterator(context,aggregator.combineCombinersByKey(iter,context))

}, preservesPartitioning =true)

}else{

不运行map端的合并操作,直接shuffle,并在reduce中运行合并。

//Don‘t apply map-side combiner.

valvalues =newShuffledRDD[K, V, (K, V)](self,partitioner).setSerializer(serializerClass)

values.mapPartitionsWithContext((context,iter) => {

newInterruptibleIterator(context,aggregator.combineValuesByKey(iter,context))

}, preservesPartitioning =true)

}

}