Spark on YARN -- WordCount and TopK
1. First, in the Eclipse (Scala) development environment set up per http://dongxicheng.org/framework-on-yarn/spark-eclipse-ide/, write a Scala source file with the following content:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object HdfsWordCount {
  def main(args: Array[String]) {
    // args(0): Spark master, e.g. "yarn-standalone"
    val sc = new SparkContext(args(0), "myWordCount",
      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
    // alternatively, pass the jars explicitly:
    // List("lib/spark-assembly_2.10-0.9.0-incubating-hadoop1.0.4.jar")

    // args(1): input file, e.g. "hdfs://master:9101/user/root/spam.data"
    val logFile = sc.textFile(args(1))
    // val logFile = sc.textFile("D:\\test.txt")   // local file for testing

    val counts = logFile.flatMap(line => line.split(" "))
                        .map(word => (word, 1))
                        .reduceByKey(_ + _)

    // args(2): output directory, e.g. "hdfs://master:9101/user/root/out"
    counts.saveAsTextFile(args(2))
  }
}
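Before submitting to YARN, the same pipeline can be sanity-checked in local mode against a small file (such as the commented-out D:\\test.txt above). This is only a minimal sketch for local testing; the "local" master string and the file path are placeholders for whatever exists on the development machine:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object LocalWordCountCheck {
  def main(args: Array[String]) {
    // "local" runs everything inside one JVM; no YARN cluster is needed.
    val sc = new SparkContext("local", "localWordCount")
    val lines = sc.textFile("D:\\test.txt")          // any small local text file
    val counts = lines.flatMap(_.split(" "))
                      .map(word => (word, 1))
                      .reduceByKey(_ + _)
    counts.collect().foreach(println)                // print (word, count) pairs
    sc.stop()
  }
}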
2. Use Eclipse's "Export Jar File" function to compile the Scala source into class files and package them as sc.jar.
3. Run the run_wc.sh script:
#!/bin/bash
SPARK_HADOOP_VERSION=2.2.0
SPARK_YARN=true
export SPARK_JAR=$SPARK_HOME/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop2.2.0.jar
export EXEC_JAR=$SPARK_HOME/sc.jar
# or: examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar

./bin/spark-class org.apache.spark.deploy.yarn.Client \
  --jar $EXEC_JAR \
  --class HdfsWordCount \
  --args yarn-standalone \
  --args hdfs://master:9101/user/root/spam.data \
  --args hdfs://master:9101/user/root/out2 \
  --num-workers 1 \
  --master-memory 512m \
  --worker-memory 512m \
  --worker-cores 1
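After the job completes, the output directory passed as the third --args can be read back to check the counts. A minimal check from the Spark shell (bin/spark-shell), where sc is already defined, might look like this; any HDFS client would work just as well:

// Each output line is one (word, count) pair written by saveAsTextFile.
val out = sc.textFile("hdfs://master:9101/user/root/out2")
out.take(20).foreach(println)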
Appendix 1: TopK code (select the k most frequent words):
package sc

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object TopK {
  def main(args: Array[String]) {
    // Example arguments: yarn-standalone hdfs://master:9101/user/root/spam.data 5
    val sc = new SparkContext(args(0), "myWordCount",
      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))

    // args(1): input file on HDFS
    val logFile = sc.textFile(args(1))
    val counts = logFile.flatMap(line => line.split(" "))
                        .map(word => (word, 1))
                        .reduceByKey(_ + _)

    // Swap to (count, word) so sorting and top() operate on the count.
    val sorted = counts.map { case (key, val0) => (val0, key) }.sortByKey(true, 1)

    // args(2): k, the number of most frequent words to print
    val topK = sorted.top(args(2).toInt)
    topK.foreach(println)
  }
}
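A small note on the TopK code: RDD.top already returns the largest elements according to the natural ordering of (count, word) pairs, so the explicit sortByKey pass is not strictly required. The last three lines of main could arguably be shortened to the sketch below (same Spark 0.9 API, just without the intermediate sort):

// top() orders (Int, String) pairs by count first, so this yields the k most
// frequent words directly, without a separate sortByKey stage.
val topK = counts.map { case (word, cnt) => (cnt, word) }.top(args(2).toInt)
topK.foreach(println)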
Appendix 2: join operation (see http://dongxicheng.org/framework-on-yarn/spark-scala-writing-application/ for the full problem statement):
package sc

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SparkJoinTest {
  def main(args: Array[String]) {
    val sc = new SparkContext(args(0), "SparkJoinTest",
      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))

    // args(1): ratings file, with "::"-delimited fields
    val txtFile = sc.textFile(args(1))

    // Extract (movieId, rating) pairs; in Scala, the value of a block
    // is its last expression.
    val rating = txtFile.map(line => {
      val fileds = line.split("::")
      (fileds(1).toInt, fileds(2).toDouble)
    })

    // Group the ratings by movie (the source snippet breaks off here).
    val movieScores = rating.groupByKey().map(data => {
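    // -------------------------------------------------------------------
    // NOTE: the original snippet is cut off at this point in the source.
    // What follows is only an assumed continuation, based on the problem in
    // the linked article (average each movie's ratings, then join with a
    // movies file to recover the titles). The args(2)/args(3) positions,
    // the "::" field layout of the movies file, and the output step are
    // assumptions, not part of the original post.
    // -------------------------------------------------------------------
      val avg = data._2.sum / data._2.size   // data is (movieId, Seq[rating])
      (data._1, avg)
    })

    // (assumed) args(2): movies file with "::"-delimited movieId and title
    val movies = sc.textFile(args(2)).map(line => {
      val fields = line.split("::")
      (fields(0).toInt, fields(1))
    })

    // Join average scores with titles and keep (title, avgScore) pairs.
    val result = movieScores.join(movies)
                            .map { case (id, (score, title)) => (title, score) }

    // (assumed) args(3): output directory
    result.saveAsTextFile(args(3))
  }
}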