首页 > 代码库 > Spark操作:Aggregate和AggregateByKey
Spark操作:Aggregate和AggregateByKey
1. Aggregate
Aggregate即聚合操作。直接上代码:
import org.apache.spark.{SparkConf, SparkContext}object AggregateTest { def main(args:Array[String]) = { // 设置运行环境 val conf = new SparkConf().setAppName("Aggregate Test").setMaster("spark://master:7077").setJars(Seq("E:\\Intellij\\Projects\\SimpleGraphX\\SimpleGraphX.jar")) val sc = new SparkContext(conf) var data = List(2,5,8,1,2,6,9,4,3,5) var res = data.par.aggregate((0,0))( // seqOp (acc, number) => (acc._1+number, acc._2+1), // combOp (par1, par2) => (par1._1+par2._1, par1._2+par2._2) ) println(res) sc.stop }}
acc即(0,0),number即data,seqOp将data的值累加到Tuple的第一个元素,将data的个数累加到Tuple的第二个元素。由于没有分区,所以combOp是不起作用的,这个例子里面即使分区了,combOp起作用了,结果也是一样的。
运行结果:
(45,10)
2. AggregateByKey
AggregateByKey和Aggregate差不多,也是聚合,不过它是根据Key的值来聚合。
import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}/** * Created by Administrator on 2017/6/13. */object AggregateByKeyTest { def main(args:Array[String]) = { // 设置运行环境 val conf = new SparkConf().setAppName("AggregateByKey Test").setMaster("spark://master:7077").setJars(Seq("E:\\Intellij\\Projects\\SimpleGraphX\\SimpleGraphX.jar")) val sc = new SparkContext(conf) val data = List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)) val rdd = sc.parallelize(data) val res : RDD[(Int,Int)] = rdd.aggregateByKey(0)( // seqOp math.max(_,_), // combOp _+_ ) res.collect.foreach(println) sc.stop }}
根据Key值的不同,可以分为3个组:
(1) (1,3),(1,2),(1,4);
(2) (2,3);
(3) (3,6),(3,8)。
这3个组分别进行seqOp,也就是(K,V)里面的V和0进行math.max()运算,运算结果和下一个V继续运算,以第一个组为例,运算过程是这样的:
0, 3 => 3
3, 2 => 3
3, 4 => 4
所以最终结果是(1,4)。combOp是对把各分区的V加起来,由于这里并没有分区,所以实际上是不起作用的。
运行结果:
(2,3)(1,4)(3,8)
如果生成RDD时分成3个区:
val rdd = sc.parallelize(data,3)
运行结果就变成了:
(3,8)(1,7)(2,3)
这是因为一个分区返回(1,3),另一个分区返回(1,4),combOp将这两个V加起来,就得到了(1,7)。
Spark操作:Aggregate和AggregateByKey
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。