首页 > 代码库 > spark
spark
Transformation 和Action本质区别:
Transformations是RDD到RDD;
Actions是RDD到result。
Actions算子触发Spark job。
Spark groupbykey和cogroup使用示例
groupByKey
groupByKey([numTasks])是数据分组操作,在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。
val rdd0 = sc.parallelize(Array((1,1), (1,2) , (1,3) , (2,1) , (2,2) , (2,3)), 3)
val rdd1 = rdd0.groupByKey()
rdd1.collect
res0: Array[(Int, Iterable[Int])] = Array((1,ArrayBuffer(1, 2, 3)), (2,ArrayBuffer(1, 2, 3)))
cogroup
cogroup(otherDataset, [numTasks])是将输入数据集(K, V)和另外一个数据集(K, W)进行cogroup,得到一个格式为(K, Seq[V], Seq[W])的数据集。
val rdd2 = rdd0.cogroup(rdd0)
rdd2.collect
res1: Array[(Int, (Iterable[Int], Iterable[Int]))] = Array((1,(ArrayBuffer(1, 2, 3),ArrayBuffer(1, 2, 3))), (2,(ArrayBuffer(1, 2, 3),ArrayBuffer(1, 2, 3))))
Saprk aggregateByKey操作示例
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey , the number of reduce tasks is configurable through an optional second argument. |
aggreateByKey(zeroValue: U)(seqOp: (U, T)=> U, combOp: (U, U) =>U) 和reduceByKey的不同在于,
reduceByKey输入输出都是(K, V),
aggreateByKey输出是(K,U),可以不同于输入(K, V) ,
aggreateByKey的三个参数:
zeroValue: U,初始值,比如空列表{} ;
seqOp: (U,T)=> U,seq操作符,描述如何将T合并入U,比如如何将item合并到列表 ;
combOp: (U,U) =>U,comb操作符,描述如果合并两个U,比如合并两个列表 ;
所以aggreateByKey可以看成更高抽象的,更灵活的reduce或group 。
val z = sc.parallelize(List(1,2,3,4,5,6), 2)
z.aggreate(0)(math.max(_, _), _ + _)
res0: Int = 9
val z = sc.parallelize(List((1, 3), (1, 2), (1, 4), (2, 3)))
z.aggregateByKey(0)(math.max(_, _), _ + _)
res1: Array[(Int, Int)] = Array((2,3), (1,9))
Spark combinebykey使用示例
combineByKey是对RDD中的数据集按照Key进行聚合操作。聚合操作的逻辑是通过自定义函数提供给combineByKey。
combineByKey[C](createCombiner: (V) ? C, mergeValue: (C, V) ? C, mergeCombiners: (C, C) ? C, numPartitions: Int):RDD[(K, C)]
把(K,V) 类型的RDD转换为(K,C)类型的RDD,C和V可以不一样。
combineByKey三个参数:
val data = http://www.mamicode.com/Array((1, 1.0), (1, 2.0), (1, 3.0), (2, 4.0), (2, 5.0), (2, 6.0))
val rdd = sc.parallelize(data, 2)
val combine1 = rdd.combineByKey(
createCombiner = (v:Double) => (v:Double, 1),
mergeValue = http://www.mamicode.com/(c:(Double, Int), v:Double) => (c._1 + v, c._2 + 1),
mergeCombiners = (c1:(Double, Int), c2:(Double, Int)) => (c1._1 + c2._1, c1._2 + c2._2),
numPartitions = 2 )
combine1.collect
res0: Array[(Int, (Double, Int))] = Array((2,(15.0,3)), (1,(6.0,3)))
textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
Spark RDD Actions操作之reduce()
The arguments to reduce() are Scala function literals (closures)。
reduce将RDD中元素两两传递给输入函数? 同时产生一个新的值,新产生的值与RDD中下一个元素再被传递给输入函数直到最后只有一个值为止。
scala的anonymous(匿名函数):
def num(x: Int, y: Int) => if(x > y) x else y
Scala的currying(柯里化):
def num(x: Int)(y: Int) => if(x > y) x else y
def num(x: Int) = (y: Int) => if(x > y) x else y
Spark之中map与flatMap的区别
map()是将函数用于RDD中的每个元素,将返回值构成新的RDD。
flatmap()是将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD,这样就得到了一个由各列表中的元素组成的RDD,而不是一个列表组成的RDD。
有些拗口,看看例子就明白了。
val rdd = sc.parallelize(List("coffee panda","happy panda","happiest panda party"))
输入
rdd.map(x=>x).collect
结果
res9: Array[String] = Array(coffee panda, happy panda, happiest panda party)
输入
rdd.flatMap(x=>x.split(" ")).collect
结果
res8: Array[String] = Array(coffee, panda, happy, panda, happiest, panda, party)
flatMap说明白就是先map然后再flat,再来看个例子
val rdd1 = sc.parallelize(List(1,2,3,3))
scala> rdd1.map(x=>x+1).collect
res10: Array[Int] = Array(2, 3, 4, 4)
scala> rdd1.flatMap(x=>x.to(3)).collect
res11: Array[Int] = Array(1, 2, 3, 2, 3, 3, 3)
---------------------------------------------------------------------------------------------------------------------------
点到为止版: flatMap = flatten + map;
深坑版: 就是自函子范畴上的一个协变函子的态射函数与自然变换的组合!
aggregate函数将每个分区里面的元素进行聚合,然后用combine函数将每个分区的结果和初始值zeroValue进行combine操作。这个函数最终返回的类型不需要和RDD中的元素类型一致。
示例:
解释:
Spark RDD Actions操作之reduce()
textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
The arguments to reduce() are Scala function literals (closures)。
reduce将RDD中元素两两传递给输入函数? 同时产生一个新的值,新产生的值与RDD中下一个元素再被传递给输入函数直到最后只有一个值为止。
scala的anonymous(匿名函数):
def num(x: Int, y: Int) => if(x > y) x else y
Scala的currying(柯里化):
def num(x: Int)(y: Int) => if(x > y) x else y
def num(x: Int) = (y: Int) => if(x > y) x else y
提高RDD的使用效率。
Spark缓存策略示例:
0.NONE(不需要缓存)
参数:_useDisk, _useMemory, _useOffHeap, _deserialized, _replication(默认值为1)
1.DISK_ONLY
参数:_useDisk, _useMemory, _useOffHeap, _deserialized, _replication(默认值为1)
2.DISK_ONLY_2
副本2份
参数:_useDisk, _useMemory, _useOffHeap, _deserialized, _replication(默认值为1)
3.MEMORY_ONLY(默认的)
参数:_useDisk, _useMemory, _useOffHeap, _deserialized, _replication(默认值为1)
4.MEMORY_ONLY_2
参数:_useDisk, _useMemory, _useOffHeap, _deserialized, _replication(默认值为1)
5.MEMORY_ONLY_SER
SER做序列化。会消耗CPU。
参数:_useDisk, _useMemory, _useOffHeap, _deserialized, _replication(默认值为1)
6.MEMORY_ONLY_SER_2
参数:_useDisk, _useMemory, _useOffHeap, _deserialized, _replication(默认值为1)
7.MEMORY_AND_DISK
内存中若放不下,则多出的部分放在机器的本地磁盘上,区别于MEMORY_ONLY(内存中若放不下,则多出的部分原来在哪就还在哪)
参数:_useDisk, _useMemory, _useOffHeap, _deserialized, _replication(默认值为1)
8.MEMORY_AND_DISK_2
参数:_useDisk, _useMemory, _useOffHeap, _deserialized, _replication(默认值为1)
9.MEMORY_AND_DISK_SER
参数:_useDisk, _useMemory, _useOffHeap, _deserialized, _replication(默认值为1)
10.MEMORY_AND_DISK_SER_2
参数:_useDisk, _useMemory, _useOffHeap, _deserialized, _replication(默认值为1)
11.OFF_HEAP(不使用堆,比如可以使用Tachyon)
参数:_useDisk, _useMemory, _useOffHeap, _deserialized, _replication(默认值为1)
如何选择RDD的持久化策略?
1.Cache() MEMEORY_ONLY
2.MEMORY_ONLY_SER
3._2
4.能使用内存就不使用磁盘
spark