
Spark RDD Programming (II)

Please credit the source when reposting: http://blog.csdn.net/gamer_gyt @高阳团
Author's Weibo: http://weibo.com/234654758
Github: https://github.com/thinkgamer

============================================================

Spark RDD Programming (I)

Pair RDD (key/value) operations in Spark, implemented in Scala


RDD Partitioners


Spark currently ships two partitioner implementations:

  • HashPartitioner (hash partitioning)

  • RangePartitioner (range partitioning)

The partitioner property exists only on RDDs of type <K, V>; for any other RDD its value is None. The partitioner both determines the RDD's own number of partitions and serves as the basis for splitting data into partitions in the RDD's shuffle output.

scala> val rdd = sc.makeRDD(1 to 10,2).map(x=>(x,x))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[1] at map at <console>:27

scala> rdd.partitioner
res0: Option[org.apache.spark.Partitioner] = None

scala> val group_rdd = rdd.groupByKey(new org.apache.spark.HashPartitioner(3))
group_rdd: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupByKey at <console>:29

scala> group_rdd.partitioner
res1: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@3)

scala> group_rdd.collect()
res4: Array[(Int, Iterable[Int])] = Array((6,CompactBuffer(6)), (3,CompactBuffer(3)), (9,CompactBuffer(9)), (4,CompactBuffer(4)), (1,CompactBuffer(1)), (7,CompactBuffer(7)), (10,CompactBuffer(10)), (8,CompactBuffer(8)), (5,CompactBuffer(5)), (2,CompactBuffer(2)))
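The assignment rule behind HashPartitioner can be sketched in plain Scala (no Spark required; `hashPartition` is a hypothetical helper reproducing the documented behavior, not Spark's actual code): the partition is the key's hashCode modulo the partition count, adjusted to be non-negative.

```scala
// Sketch of HashPartitioner's assignment rule: key.hashCode mod numPartitions,
// shifted up when Java's % yields a negative result.
def hashPartition(key: Any, numPartitions: Int): Int = {
  val mod = key.hashCode % numPartitions
  if (mod < 0) mod + numPartitions else mod
}

// Every key 1..10 lands in one of the 3 partitions.
val assignments = (1 to 10).map(k => k -> hashPartition(k, 3))
```

This is why `groupByKey(new HashPartitioner(3))` above spreads the keys 1 to 10 across exactly three partitions.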

Basic RDD Transformations


1. repartition and coalesce

Both repartition the RDD; repartition is simply coalesce with shuffle set to true. Here we discuss how to choose coalesce's shuffle parameter, in three cases (assume the RDD has N partitions to be repartitioned into M):

  • If N < M
    The N partitions typically hold unevenly distributed data; HashPartitioner is used to redistribute it into M partitions, so shuffle must be set to true.
  • If N > M (and they are of similar magnitude)
    Several of the N partitions can simply be merged into one, ending up with M partitions, so shuffle can be set to false (note that with shuffle = false, setting M > N leaves coalesce a no-op). No shuffle takes place, and the parent and child RDDs have a narrow dependency.
  • If N >> M (N far larger than M)
    When N and M differ drastically and shuffle is set to false, the parent and child RDDs are narrowly dependent and sit in the same stage, which may leave the Spark job with too little parallelism and hurt performance. For example, with M = 1 there is only one partition, so only one task runs; to give the operations before coalesce better parallelism, set shuffle to true.
scala> val rdd = sc.makeRDD(1 to 10,100)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at makeRDD at <console>:21

scala> rdd.partitions.size
res14: Int = 100

scala> val repartitionRDD = rdd.repartition(4)
repartitionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[30] at repartition at <console>:23

scala> repartitionRDD.partitions.size
res15: Int = 4          
------------------------------------------------------------------------

scala> val coalesceRDD = rdd.coalesce(3)
coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[31] at coalesce at <console>:23

scala> coalesceRDD.partitions.size
res16: Int = 3

scala> val coalesceRDD = rdd.coalesce(1)
coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[34] at coalesce at <console>:23

scala> coalesceRDD.partitions.size
res17: Int = 1

scala> val coalesceRDD = rdd.coalesce(1,shuffle=true)  // increases parallelism
coalesceRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[38] at coalesce at <console>:23

scala> coalesceRDD.partitions.size
res18: Int = 1

If the new partition count is greater than the current one and the shuffle parameter is not set, the number of partitions does not change: without a shuffle, coalesce cannot increase an RDD's partition count.

scala> val rdd = sc.makeRDD(1 to 1000,1000)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[39] at makeRDD at <console>:21

scala> val coalesceRDD = rdd.coalesce(1)
coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[40] at coalesce at <console>:23

scala> coalesceRDD.partitions.size
res21: Int = 1

scala> val coalesceRDD = rdd.coalesce(100000)
coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[41] at coalesce at <console>:23

scala> coalesceRDD.partitions.size
res22: Int = 1000

scala> val coalesceRDD = rdd.coalesce(100000,shuffle=true)
coalesceRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[45] at coalesce at <console>:23

scala> coalesceRDD.partitions.size
res23: Int = 100000
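The no-shuffle case can be pictured in plain Scala (a conceptual sketch, not Spark's actual coalescing algorithm; `coalesceGroups` is a hypothetical helper): the N parent partitions are grouped into M contiguous batches, so each child partition depends on a few whole parent partitions, which is exactly a narrow dependency.

```scala
// Sketch of what a shuffle-free coalesce does conceptually: group N parent
// partition indices into M contiguous batches, one batch per child partition.
def coalesceGroups(numParents: Int, numChildren: Int): Seq[Seq[Int]] = {
  require(numChildren <= numParents, "without a shuffle, coalesce cannot grow the partition count")
  (0 until numParents)
    .groupBy(i => i * numChildren / numParents)  // contiguous batches
    .toSeq.sortBy(_._1).map(_._2)
}

// e.g. coalesceGroups(10, 3) groups partitions 0-3, 4-6, and 7-9.
val groups = coalesceGroups(10, 3)
```

The `require` mirrors the behavior shown above: `rdd.coalesce(100000)` on a 1000-partition RDD stays at 1000 partitions unless shuffle = true.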

2. randomSplit() and glom()

randomSplit splits one RDD into several RDDs according to the given weights, while glom turns the elements of type T in each partition into a single Array[T], so every partition ends up holding exactly one array element.

scala> val rdd = sc.makeRDD(1 to 10,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[50] at makeRDD at <console>:21

scala> rdd.collect()
res26: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> val glomRDD = rdd.glom()
glomRDD: org.apache.spark.rdd.RDD[Array[Int]] = MapPartitionsRDD[51] at glom at <console>:23

scala> glomRDD.collect()
res27: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))

-------------------------------------------------------------------
scala> val rdd = sc.makeRDD(1 to 10,10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[56] at makeRDD at <console>:21

scala> rdd.collect()
res32: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> val splitRDD = rdd.randomSplit(Array(1.0,3.0,6.0))
splitRDD: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[57] at randomSplit at <console>:23, MapPartitionsRDD[58] at randomSplit at <console>:23, MapPartitionsRDD[59] at randomSplit at <console>:23)

scala> splitRDD(0).collect()
res33: Array[Int] = Array()

scala> splitRDD(1).collect()
res34: Array[Int] = Array(6)


scala> splitRDD(2).collect()
res36: Array[Int] = Array(1, 2, 3, 4, 5, 7, 8, 9, 10)
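The idea behind the weights can be sketched in plain Scala (a simplified model, not Spark's actual implementation; `randomSplitSketch` is a hypothetical helper): the weights are normalized into cumulative boundaries on [0, 1), and each element is placed in the sub-collection whose interval contains a uniform random draw. That draw-based placement is why the split with weight 1.0 above came back empty for only ten elements.

```scala
import scala.util.Random

// Sketch of randomSplit: normalize weights to cumulative boundaries, then
// assign each element by a uniform random draw in [0, 1).
def randomSplitSketch[T](data: Seq[T], weights: Array[Double], seed: Long): Seq[Seq[T]] = {
  val total = weights.sum
  val cum = weights.map(_ / total).scanLeft(0.0)(_ + _) // e.g. 0.0, 0.1, 0.4, 1.0
  val rng = new Random(seed)
  val buckets = IndexedSeq.fill(weights.length)(Seq.newBuilder[T])
  data.foreach { x =>
    val r = rng.nextDouble()                       // uniform draw in [0, 1)
    val idx = cum.init.lastIndexWhere(_ <= r)      // interval containing r
    buckets(idx) += x
  }
  buckets.map(_.result())
}
```

On average the sub-collections hold about 10%, 30%, and 60% of the data, but for small inputs the actual sizes fluctuate, exactly as in the REPL session above.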

3. mapPartitions and mapPartitionsWithIndex

mapPartitions is similar to the map transformation, except that the mapping function's input is the iterator of each partition rather than each individual element. It takes a parameter, preservesPartitioning, indicating whether mapPartitions should keep the parent RDD's partitioner. mapPartitionsWithIndex works like mapPartitions, but the function additionally receives the partition's index.

scala> val rdd = sc.makeRDD(1 to 5,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at makeRDD at <console>:27

scala> val mapRDD = rdd.map(x=>(x,x))
mapRDD: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[5] at map at <console>:29

scala> val groupRDD = mapRDD.groupByKey(3)
groupRDD: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[6] at groupByKey at <console>:31

scala> val mapPartitionsRDD = groupRDD.mapPartitions(iter=>iter.filter(_._1>3))
mapPartitionsRDD: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = MapPartitionsRDD[7] at mapPartitions at <console>:33

scala> mapPartitionsRDD.collect()
res3: Array[(Int, Iterable[Int])] = Array((4,CompactBuffer(4)), (5,CompactBuffer(5)))

scala> mapPartitionsRDD.partitioner
res4: Option[org.apache.spark.Partitioner] = None

scala> val mapPartitionsRDD = groupRDD.mapPartitions(iterator => iterator.filter(_._1>3),true)
mapPartitionsRDD: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = MapPartitionsRDD[8] at mapPartitions at <console>:33

scala> mapPartitionsRDD.partitioner
res5: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@3)
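The map vs. mapPartitions difference can be modeled in plain Scala by treating an RDD as a sequence of partitions (a conceptual sketch, not Spark's execution model):

```scala
// A 2-partition RDD, modeled as a Seq of partitions.
val partitions: Seq[Seq[Int]] = Seq(Seq(1, 2), Seq(3, 4, 5))

// map: the function runs once per element.
val mapped = partitions.map(_.map(_ * 10))

// mapPartitions: the function runs once per partition and receives the whole
// partition as an Iterator, so it can do per-partition work (here, counting).
val perPartitionCounts = partitions.map(part => part.iterator.size)
```

Running a function once per partition is what makes mapPartitions attractive for expensive per-partition setup, such as opening a database connection once per partition instead of once per element.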

4. zip and zipPartitions

zip combines two RDDs into an RDD of key/value pairs; it requires the two RDDs to have the same number of partitions and the same number of elements.
zipPartitions combines several RDDs partition by partition into a new RDD; the RDDs being combined must have the same number of partitions, but there is no restriction on the number of elements within each partition.

scala> val rdd = sc.makeRDD(1 to 5,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:27

scala> val mapRDD=rdd.map(x=>(x+1.0))
mapRDD: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[1] at map at <console>:29

scala> val zipRDD = rdd.zip(mapRDD)
zipRDD: org.apache.spark.rdd.RDD[(Int, Double)] = ZippedPartitionsRDD2[2] at zip at <console>:31

scala> zipRDD.collect
res0: Array[(Int, Double)] = Array((1,2.0), (2,3.0), (3,4.0), (4,5.0), (5,6.0)) 
scala> val rdd1=sc.makeRDD(Array("1","2","3","4","5","6"),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[6] at makeRDD at <console>:21

scala> val zipPartitionsRDD = rdd.zipPartitions(rdd1)((i:Iterator[Int],s:Iterator[String])=>{Iterator(i.toArray.size,s.toArray.size)})
zipPartitionsRDD: org.apache.spark.rdd.RDD[Int] = ZippedPartitionsRDD2[7] at zipPartitions at <console>:25

scala> zipPartitionsRDD.collect()
res3: Array[Int] = Array(2, 3, 3, 3)
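The pairing rule can be sketched in plain Scala (a conceptual model, not Spark's implementation; `zipPartitionsSketch` is a hypothetical helper): partitions are paired by index and the user function combines the two iterators, so per-partition element counts may differ.

```scala
// Sketch of zipPartitions: pair partitions by index, apply the user function
// to the two iterators, and concatenate the per-partition results.
def zipPartitionsSketch[A, B, C](p1: Seq[Seq[A]], p2: Seq[Seq[B]])
                                (f: (Iterator[A], Iterator[B]) => Iterator[C]): Seq[C] = {
  require(p1.size == p2.size, "zipPartitions requires the same number of partitions")
  p1.zip(p2).flatMap { case (a, b) => f(a.iterator, b.iterator) }
}
```

Applied to the example above, partition 0 pairs 2 elements with 3 and partition 1 pairs 3 with 3, yielding the counts 2, 3, 3, 3 seen in the REPL output.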


5. zipWithIndex and zipWithUniqueId

zipWithIndex pairs each element of the RDD with its index as a key/value pair: the first element of the first partition gets index 0, the second gets 1, and so on.
zipWithUniqueId pairs each element of the RDD with a unique ID: for an RDD with N partitions, the first element of partition k (0-based) gets ID k, the second gets k+N, the third k+2N, and so on.

scala> val rdd = sc.makeRDD(1 to 6,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:21

scala> val zipWithIndex = rdd.zipWithIndex()
zipWithIndex: org.apache.spark.rdd.RDD[(Int, Long)] = ZippedWithIndexRDD[1] at zipWithIndex at <console>:23

scala> zipWithIndex.collect()
res0: Array[(Int, Long)] = Array((1,0), (2,1), (3,2), (4,3), (5,4), (6,5))

scala> val zipWithUniqueID = rdd.zipWithUniqueId()
zipWithUniqueID: org.apache.spark.rdd.RDD[(Int, Long)] = MapPartitionsRDD[2] at zipWithUniqueId at <console>:23

scala> zipWithUniqueID.collect()
res1: Array[(Int, Long)] = Array((1,0), (2,2), (3,4), (4,1), (5,3), (6,5))
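The zipWithUniqueId numbering can be written out in plain Scala (a sketch of the documented rule, not Spark's code; `uniqueIds` is a hypothetical helper): with N partitions, the i-th element (0-based) of partition k gets ID k + i * N.

```scala
// Sketch of zipWithUniqueId: element i (0-based) of partition k gets k + i * N,
// where N is the total number of partitions.
def uniqueIds[T](partitions: Seq[Seq[T]]): Seq[(T, Long)] = {
  val n = partitions.size
  partitions.zipWithIndex.flatMap { case (part, k) =>
    part.zipWithIndex.map { case (x, i) => (x, k + i.toLong * n) }
  }
}
```

With the 2-partition RDD above (elements 1-3 in partition 0, 4-6 in partition 1), this rule reproduces the REPL result: partition 0 yields IDs 0, 2, 4 and partition 1 yields 1, 3, 5.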

Control Operations


Persisting RDDs is an important feature in Spark: an RDD can be persisted on different tiers of storage media so that later operations can reuse it.

  • checkpoint
    Persists the RDD to HDFS. One difference from persist is that checkpoint severs the RDD's lineage (its dependencies on earlier RDDs), whereas persist keeps the lineage intact.
    The main uses of checkpoint:
    1. If a Spark application runs for a long time (e.g. Spark Streaming jobs typically run 24/7), an overly long lineage ties up significant system resources; checkpointing the RDD periodically frees them.
    2. A long lineage also makes fault recovery expensive: if a node fails mid-run, recomputing the RDD from its lineage can be very costly.

scala> val rdd = sc.makeRDD(1 to 4,1)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[60] at makeRDD at <console>:21

scala> val flatMapRDD = rdd.flatMap(x=>Seq(x,x))
flatMapRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[61] at flatMap at <console>:23

scala> sc.setCheckpointDir("temp")
16/09/14 10:56:08 WARN spark.SparkContext: Checkpoint directory must be non-local if Spark is running on a cluster: temp

scala> flatMapRDD.checkpoint()

scala> flatMapRDD.dependencies.head.rdd
res40: org.apache.spark.rdd.RDD[_] = ParallelCollectionRDD[60] at makeRDD at <console>:21

scala> flatMapRDD.collect()
res41: Array[Int] = Array(1, 1, 2, 2, 3, 3, 4, 4)

scala> flatMapRDD.dependencies.head.rdd
res42: org.apache.spark.rdd.RDD[_] = ReliableCheckpointRDD[62] at collect at <console>:26
    Viewing the details on HDFS:
    ![checkpoint files on HDFS](http://img.blog.csdn.net/20160914110209481)
    ![checkpoint directory contents](http://img.blog.csdn.net/20160914110324415)
  • persist

    Its usage is similar to checkpoint's and is not demonstrated here; see Spark RDD Programming (I).


Actions


We do not list usage examples here; they are covered in detail in Spark RDD Programming (I). This is only a recap and summary.

  • Scalar and collection actions

| Function | Purpose |
|---|---|
| first | Returns the first element of the RDD |
| count | Returns the number of elements in the RDD |
| reduce | Applies a binary operation to the RDD's elements and returns the result |
| collect() / toArray() | Returns the RDD's elements as a collection |
| take(num: Int) | Treating the RDD as a collection, returns the elements at indices [0, num-1] |
| top(num: Int) | Returns the first num elements under the default or a specified ordering |
| takeOrdered(num: Int) | Returns the first num elements under the ordering opposite to top's |
| aggregate | Somewhat involved; see Spark RDD Programming (I) |
| fold | A convenience interface over aggregate |
| lookup(key: K): Seq[V] | Action on (K, V) RDDs: returns all values associated with the given key |


  • Storage actions

| Function | Purpose |
|---|---|
| saveAsTextFile() | Saves to HDFS as text |
| saveAsObjectFile() | Serializes the RDD's elements into objects and stores them in a file; on HDFS, SequenceFile is used by default |
| saveAsHadoopFile() | Saves in a Hadoop output format, e.g. TextOutputFormat, SequenceFileOutputFormat, or another OutputFormat… |
| saveAsHadoopDataset() | Saves to a database such as HBase, MongoDB, or Cassandra |



END!

