
Spark on YARN -- WordCount and TopK

1. First, in the Eclipse (Scala) development environment set up following http://dongxicheng.org/framework-on-yarn/spark-eclipse-ide/, create a Scala source file with the following contents:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object HdfsWordCount {
  def main(args: Array[String]) {
    // args(0): Spark master, e.g. "yarn-standalone"
    // args(1): input path,   e.g. "hdfs://master:9101/user/root/spam.data"
    // args(2): output path,  e.g. "hdfs://master:9101/user/root/out"
    val sc = new SparkContext(args(0), "myWordCount",
      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))

    val logFile = sc.textFile(args(1))                     // should be a file on your HDFS
    val counts = logFile.flatMap(line => line.split(" "))  // split each line into words
                        .map(word => (word, 1))            // emit (word, 1) pairs
                        .reduceByKey(_ + _)                 // sum the counts per word
    counts.saveAsTextFile(args(2))                          // write the result back to HDFS
  }
}
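Before submitting to YARN, the same pipeline can be smoke-tested locally. The sketch below is not part of the original post: the object name LocalWordCount and the input path /tmp/test.txt are placeholders; it simply runs the identical flatMap/map/reduceByKey chain with a local master.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

// Local smoke test of the WordCount pipeline (hypothetical; paths are placeholders).
object LocalWordCount {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "localWordCount")
    val counts = sc.textFile("/tmp/test.txt")               // any small local text file
                   .flatMap(line => line.split(" "))
                   .map(word => (word, 1))
                   .reduceByKey(_ + _)
    counts.collect().foreach(println)                       // print (word, count) pairs
    sc.stop()
  }
}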

2. Use Eclipse's Export -> JAR File feature to compile the Scala source into class files and package them as sc.jar.

3. Execute the run_wc.sh script (the three --args values below become args(0), args(1), and args(2) in HdfsWordCount):

#! /bin/bash
SPARK_HADOOP_VERSION=2.2.0
SPARK_YARN=true
export SPARK_JAR=$SPARK_HOME/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop2.2.0.jar
export EXEC_JAR=$SPARK_HOME/sc.jar
# for reference, the bundled examples jar: examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar

./bin/spark-class org.apache.spark.deploy.yarn.Client \
  --jar $EXEC_JAR \
  --class HdfsWordCount \
  --args yarn-standalone \
  --args hdfs://master:9101/user/root/spam.data \
  --args hdfs://master:9101/user/root/out2 \
  --num-workers 1 \
  --master-memory 512m \
  --worker-memory 512m \
  --worker-cores 1
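Once the job completes, the counts are written to the output directory given as the third --args value. As a quick sanity check (a hypothetical helper, not from the original post; it assumes the HDFS at hdfs://master:9101 is reachable from the machine running it), the result can be read back with a small local-mode driver:

import org.apache.spark.SparkContext

// Hypothetical helper, not from the original post: prints the first few
// (word, count) records written by HdfsWordCount.
object CheckOutput {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "CheckOutput")
    sc.textFile("hdfs://master:9101/user/root/out2").take(10).foreach(println)
    sc.stop()
  }
}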


Appendix:

TopK (select the k most frequent words) code:

package sc
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object TopK {
  def main(args: Array[String]) {
    // example arguments: yarn-standalone hdfs://master:9101/user/root/spam.data 5
    val sc = new SparkContext(args(0), "myWordCount",
      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))

    val logFile = sc.textFile(args(1))                     // input file on HDFS
    val counts = logFile.flatMap(line => line.split(" "))
                        .map(word => (word, 1))
                        .reduceByKey(_ + _)                 // (word, count) pairs
    val sorted = counts.map {
      case (word, count) => (count, word)                   // swap so the count becomes the key
    }.sortByKey(true, 1)
    val topK = sorted.top(args(2).toInt)                    // the k pairs with the largest counts
    topK.foreach(println)
  }
}
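A side note on the sort: RDD.top already picks the largest elements under the implicit (count, word) tuple ordering, so the explicit sortByKey pass is not strictly needed. A minimal variant under the same argument convention (the object name TopKNoSort is hypothetical) could look like:

package sc
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

// Variant of TopK (same assumptions as above): top() orders the swapped
// (count, word) pairs and keeps the k largest, so no explicit sort is needed.
object TopKNoSort {
  def main(args: Array[String]) {
    val sc = new SparkContext(args(0), "myTopK",
      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
    val counts = sc.textFile(args(1))
                   .flatMap(line => line.split(" "))
                   .map(word => (word, 1))
                   .reduceByKey(_ + _)
    counts.map { case (word, count) => (count, word) }
          .top(args(2).toInt)                              // k pairs with the largest counts
          .foreach(println)
  }
}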


Appendix 2: join operation (see http://dongxicheng.org/framework-on-yarn/spark-scala-writing-application/ for the problem statement):

package sc
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SparkJoinTest {
  def main(args: Array[String]) {
    val sc = new SparkContext(args(0), "SparkJoinTest",
      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))

    val txtFile = sc.textFile(args(1))                     // e.g. a "::"-delimited ratings file on HDFS
    val rating = txtFile.map(line => {
      val fields = line.split("::")
      (fields(1).toInt, fields(2).toDouble)                // (movieId, rating); the last expression is the value of the block
    })
    val movieScores = rating.groupByKey().map(
      data => {