首页 > 代码库 > spark sortByKey subtractByKey take takeOrdered等函数使用例子
spark sortByKey subtractByKey take takeOrdered等函数使用例子
package com.latrobe.spark import org.apache.spark.{SparkContext, SparkConf} /** * Created by spark on 15-1-19. * 根据key对K-V类型的RDD进行排序获得新的RDD */ object SortByKey { def main(args: Array[String]) { val conf = new SparkConf().setAppName("spark-demo").setMaster("local") val sc = new SparkContext(conf) import org.apache.spark.SparkContext._ val a = sc.parallelize(List("dog","cat","owl","gnu","ant")) val b = sc.parallelize(1 to a.count().toInt) val c = a.zip(b) //asc c.sortByKey(true).collect().foreach(print) //desc c.sortByKey(false).collect().foreach(print) } } /** * Created by spark on 15-1-19. * RDD1.subtract(RDD2):返回一个新的RDD,内容是:RDD1中存在的,RDD2中不存在的 */ object Subtract { def main(args: Array[String]) { val conf = new SparkConf().setAppName("spark-demo").setMaster("local") val sc = new SparkContext(conf) import org.apache.spark.SparkContext._ val a = sc.parallelize(1 to 10) val b = sc.parallelize(1 to 3) //45678910 //a.subtract(b).collect().foreach(print) val c = sc.parallelize(1 to 10) val d = sc.parallelize(List(1,2,3,11)) //45678910 c.subtract(d).collect().foreach(print) } } /** * Created by spark on 15-1-19. * RDD1.subtractByKey(RDD2):返回一个新的RDD,内容是:RDD1 key中存在的,RDD2 key中不存在的 */ object SubtractByKey { def main(args: Array[String]) { val conf = new SparkConf().setAppName("spark-demo").setMaster("local") val sc = new SparkContext(conf) import org.apache.spark.SparkContext._ val a = sc.parallelize(List("dog","he","word","hello")) val b = a.keyBy(_.length) val c = sc.parallelize(List("cat","first","everyone")) val d = c.keyBy(_.length) //(2,he)(4,word) b.subtractByKey(d).collect().foreach(print) } } /** * Created by spark on 15-1-19. * sumApprox没有出现我希望的结果 */ object SumAndSumApprox { def main(args: Array[String]) { val conf = new SparkConf().setAppName("spark-demo").setMaster("local") val sc = new SparkContext(conf) import org.apache.spark.SparkContext._ val a = sc.parallelize(1 to 1000000) val b = a.sum() val c = a.sumApprox(0L,0.9).getFinalValue() println(b + " *** " + c) } } /** * Created by spark on 15-1-19. * 取出RDD的前n个元素,以数组的形式返回 */ object Take { def main(args: Array[String]) { val conf = new SparkConf().setAppName("spark-demo").setMaster("local") val sc = new SparkContext(conf) //import org.apache.spark.SparkContext._ val a = sc.parallelize(1 to 1000000) //12345678910 a.take(10).foreach(print) } } /** * Created by spark on 15-1-19. * 对RDD元素进行升序排序 * 取出前n个元素并以数组的形式放回 */ object TakeOrdered { def main(args: Array[String]) { val conf = new SparkConf().setAppName("spark-demo").setMaster("local") val sc = new SparkContext(conf) //import org.apache.spark.SparkContext._ val a = sc.parallelize(List("ff","aa","dd","cc")) //aacc a.takeOrdered(2).foreach(print) } } /** * Created by spark on 15-1-19. * 数据取样 */ object TakeSample { def main(args: Array[String]) { val conf = new SparkConf().setAppName("spark-demo").setMaster("local") val sc = new SparkContext(conf) //import org.apache.spark.SparkContext._ val a = sc.parallelize(1 to 10000) /** * 9048 5358 5216 7301 6303 6179 6151 5304 8115 3869 */ a.takeSample(true , 10 , 1).foreach(println) } } /** * Created by spark on 15-1-19. * debug 详情信息显示 */ object ToDebugString { def main(args: Array[String]) { val conf = new SparkConf().setAppName("spark-demo").setMaster("local") val sc = new SparkContext(conf) //import org.apache.spark.SparkContext._ val a = sc.parallelize(1 to 9) val b = sc.parallelize(1 to 3) val c = a.subtract(b) c.toDebugString } } /** * Created by spark on 15-1-19. * 获得前几个最大值 */ object Top { def main(args: Array[String]) { val conf = new SparkConf().setAppName("spark-demo").setMaster("local") val sc = new SparkContext(conf) //import org.apache.spark.SparkContext._ val a = sc.parallelize(1 to 1000) val c = a.top(10) /** *1000 999 998 997 996 995 994 993 992 991 */ c.foreach(println) } } /** * Union == ++ 把两个RDD合并为一个新的RDD */ object Union { def main(args: Array[String]) { val conf = new SparkConf().setAppName("spark-demo").setMaster("local") val sc = new SparkContext(conf) //import org.apache.spark.SparkContext._ val a = sc.parallelize(1 to 3) val b = sc.parallelize(3 to 5) val c = a.union(b) val d = a ++ b /** *123345 */ c.collect().foreach(print) /** *123345 */ d.collect().foreach(print) } }
spark sortByKey subtractByKey take takeOrdered等函数使用例子
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。