首页 > 代码库 > Spark join 源码跟读记录

Spark join 源码跟读记录

rdd.join的实现:rdd1.join(rdd2) => rdd1.cogroup(rdd2,partitioner) 

/**   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and   * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.   */  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
//rdd.join的实现:rdd1.join(rdd2) => rdd1.cogroup(rdd2,partitioner) => flatMapValues(遍历两个value的迭代器)
   //最后返回的是(key,(v1,v2))这种形式的元组
this.cogroup(other, partitioner).flatMapValues( pair => for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w) ) }
跟到cogroup方法
  /**   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the   * list of values for that key in `this` as well as `other`.   */  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)      : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {    if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {      throw new SparkException("Default partitioner cannot partition array keys.")    }    /**     * 这里构造一个CoGroupedRDD,也就是 cg = new CoGroupedRDD(Seq(rdd1,rdd2),partitioner)     * 其键值对中的value要求是Iterable[V]和Iterable[W]类型     * 下面了解CoGroupedRDD这个类,看是怎么构造的     */    val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)    cg.mapValues { case Array(vs, w1s) =>      (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])    }  }

这是CoGroupedRDD的类声明,其中有两个与java 语法的不同:

1.类型声明中的小于号“<”,这个在scala 中叫做变量类型的上界,也就是原类型应该是右边类型的子类型,具体参见《快学scala》的17.3节

2.@transient:这个是瞬时变量注解,不用进行序列化 ,也可以参见《快学Scala》的15.3节

/** 这里返回的rdd的类型是(K,Array[Iterable[_]]),即key不变,value为所有对应这个key的value的迭代器的数组*/class CoGroupedRDD[K: ClassTag](    @transient var rdds: Seq[RDD[_ <: Product2[K, _]]],    part: Partitioner)  extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) 

看看这个RDD的依赖以及如何分区的

再看这两个函数之前,最好先了解下这两个类是干什么的:

1.CoGroupPartition是Partition的一个子类,其narrowDeps是NarrowCoGroupSplitDep类型的一个数组

/** *  这里说到CoGroupPartition 包含着父RDD依赖的映射关系, * @param index:可以看到CoGroupPartition 将index作为哈希code进行分区 * @param narrowDeps:narrowDeps是窄依赖对应的分区数组 */private[spark] class CoGroupPartition(    override val index: Int, val narrowDeps: Array[Option[NarrowCoGroupSplitDep]])  extends Partition with Serializable {  override def hashCode(): Int = index  override def equals(other: Any): Boolean = super.equals(other)}

2.这个NarrowCoGroupSplitDep的主要功能就是序列化,为了避免重复,对rdd做了瞬态注解

/** 这个NarrowCoGroupSplitDep的主要功能就是序列化,为了避免重复,对rdd做了瞬态注解*/private[spark] case class NarrowCoGroupSplitDep(    @transient rdd: RDD[_], //瞬态的字段不会被序列化,适用于临时变量    @transient splitIndex: Int,    var split: Partition  ) extends Serializable {  @throws(classOf[IOException])  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {    // Update the reference to parent split at the time of task serialization    split = rdd.partitions(splitIndex)    oos.defaultWriteObject()  }}

回到CoGroupedRDD上来,先看这个RDD的依赖是如何划分的:

  /*  * 简单看下CoGroupedRDD重写的RDD的getDependencies:   * 如果两个rdd的分区函数相同就是窄依赖   * 否则就是宽依赖  */  override def getDependencies: Seq[Dependency[_]] = {    rdds.map { rdd: RDD[_] =>      if (rdd.partitioner == Some(part)) {        /*如果分区函数不为None 对应窄依赖*/        logDebug("Adding one-to-one dependency with " + rdd)        new OneToOneDependency(rdd)      } else {        logDebug("Adding shuffle dependency with " + rdd)        new ShuffleDependency[K, Any, CoGroupCombiner](          rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)      }    }  }

CoGroupedRDD.getPartitions 返回一个带有Partitioner.numPartitions个分区类型为CoGroupPartition的数组

  /*  * 这里返回一个带有Partitioner.numPartitions个分区类型为CoGroupPartition的数组  */  override def getPartitions: Array[Partition] = {    val array = new Array[Partition](part.numPartitions)    for (i <- 0 until array.length) {      // Each CoGroupPartition will have a dependency per contributing RDD      //rdds.zipWithIndex 这个是生成一个(rdd,rddIndex)的键值对,可以查看Seq或者Array的API      //继续跟到CoGroupPartition这个Partition,其是和Partition其实区别不到,只是多了一个变量narrowDeps      //回来看NarrowCoGroupSplitDep的构造,就是传入了每一个rdd和分区索引,以及分区,其可以将分区序列化      array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>        // Assume each RDD contributed a single dependency, and get it        dependencies(j) match {          case s: ShuffleDependency[_, _, _] => None          case _ => Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))        }      }.toArray)    }    array  }

好,现在弱弱的总结下CoGroupedRDD,其类型大概是(k,(Array(CompactBuffer[v1]),Array(CompactBuffer[v2]))),这其中用到了内部的封装,以及compute函数的实现

有兴趣的可以继续阅读下源码,这一部分就不介绍了。

下面还是干点正事,把join算子的整体简单理一遍:

1.join 算子内部使用了cogroup算子,这个算子返回的是(key,(v1,v2))这种形式的元组

2.深入cogroup算子,发现其根据rdd1,rdd2创建了一个CoGroupedRDD

3.简要的分析了CoGroupedRDD的依赖关系,看到如果两个rdd的分区函数相同,那么生成的rdd分区数不变,它们之间是一对一依赖,也就是窄依赖,从而可以减少依次shuffle

4. CoGroupedRDD的分区函数就是将两个rdd的相同分区索引的分区合成一个新的分区,并且通过NarrowCoGroupSplitDep这个类实现了序列化

5.具体的合并过程还未记录,之后希望可以补上这部分的内容

 

  

Spark join 源码跟读记录