spark-Day4
package ling

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object StreamingFileWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("HdfsWordCount").setMaster("local[2]")
    // One batch every 30 seconds
    val ssc = new StreamingContext(sparkConf, Seconds(30))
    // Monitor this directory and read any new files that appear in it
    val lines = ssc.textFileStream("/home/hduser/Streamingtext")
    val words = lines.flatMap(_.split(" "))
    val wordcounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordcounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
Today is day four of learning Spark; the topic is how to use DStream.
The code above sets up monitoring of a directory and reads files from it: as long as a new file is placed in the directory within a 30-second batch interval, the monitor reads its contents.
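For example, while the job is running you could drop a new file into the monitored directory (the file name and contents here are only an illustration, not from the original post):
echo "hello spark hello streaming" > /home/hduser/Streamingtext/newfile.txt
In the next 30-second batch the job should print counts such as (hello,2), (spark,1), (streaming,1).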
During this process, if you add import org.apache.log4j.{Level, Logger} to the import section and put the following two lines inside the object, the console output will contain only the computed values and no log messages:
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
Next, build a socket data simulator that replays lines from a text file over a socket at a fixed interval:
package ling

import java.io.PrintWriter
import java.net.ServerSocket
import scala.io.Source

object SocketSimulation {
  // Return a random index within [0, length)
  def index(length: Int) = {
    import java.util.Random
    val rdm = new Random
    rdm.nextInt(length)
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 3) {
      System.err.println("Usage: <filename> <port> <millisecond>")
      System.exit(1)
    }
    val filename = args(0)
    val lines = Source.fromFile(filename).getLines().toList
    val filerow = lines.length
    // Listen on the given port; for every client that connects, start a thread
    // that sends one randomly chosen line from the file every <millisecond> ms
    val listener = new ServerSocket(args(1).toInt)
    while (true) {
      val socket = listener.accept()
      new Thread() {
        override def run = {
          println("Got client connected from: " + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream(), true)
          while (true) {
            Thread.sleep(args(2).toLong)
            val content = lines(index(filerow))
            println(content)
            out.write(content + '\n')
            out.flush()
          }
          socket.close()
        }
      }.start()
    }
  }
}
To package the simulator, add the following jars to the project (in IDEA's artifact settings):
/home/hduser/scala-2.10.4/lib/scala-swing.jar
/home/hduser/scala-2.10.4/lib/scala-library.jar
/home/hduser/scala-2.10.4/lib/scala-actors.jar
Then click OK and build the project. In Run -> Edit Configurations, enter cloud01 9999 as the program arguments; this is the address used when running.
With that, the simulator is packaged.
Then, in the same project, create another object file with the following code:
package ling

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel

object NetworkWordCount {
  // Keep the console readable: only warnings from Spark, no Jetty server logs
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

  def main(args: Array[String]) = {
    val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    // Receive text from the socket at the <host> <port> given on the command line
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(","))
    val wordcounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordcounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
There is a file1.txt under the Streamingtext folder in the local home directory.
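Since NetworkWordCount splits each line on ",", file1.txt is assumed to contain comma-separated words, for example a line like: hello, hbase, spark, world,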
Run NetworkWordCount.
Then open a terminal and run:
java -cp StreamingNet.jar ling.SocketSimulation /home/hduser/Streamingtext/file1.txt 9999 1000
You will see output appearing in IDEA and in the terminal at the same time.
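To sanity-check the simulator independently of Spark, you could also connect to it with netcat (assuming it is installed): nc cloud01 9999 — it should print one randomly chosen line from file1.txt every second.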
To run on the cluster, the master address in NetworkWordCount needs to be changed:
change setMaster("local[2]") to setMaster("spark://192.168.136.129:7077") (use your own master address).
After the change, package the project the normal way; there is no need to add the jars listed above.
Copy the two files to cloud02:
scp -r ~/Streamingtext cloud02:~/
scp -r ~/StreamingNet.jar cloud02:~/
Open a terminal on cloud02 and enter:
java -cp StreamingNet.jar ling.SocketSimulation ~/Streamingtext/file1.txt 9999 1000
In a terminal on cloud01, enter:
cd ~/spark-1.4.0-bin-hadoop2.4
bin/spark-submit ~/NetworkCluster.jar cloud02 9999
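If spark-submit cannot determine the main class from the jar's manifest, the class (and usually the master) has to be given explicitly. A hedged example, assuming the packaged main class is ling.NetworkWordCount:
bin/spark-submit --master spark://192.168.136.129:7077 --class ling.NetworkWordCount ~/NetworkCluster.jar cloud02 9999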
The next example, StatefulWordCount, keeps a running total of word counts across batches with updateStateByKey:

package dong.spark

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object StatefulWordCount {
  def main(args: Array[String]): Unit = {
    // Add this batch's counts for a key to the total accumulated so far
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
    val conf = new SparkConf().setAppName("StatefulWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    // updateStateByKey requires a checkpoint directory; without this line the job fails
    ssc.checkpoint(".")
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(","))
    val wordcounts = words.map(x => (x, 1))
    val stateDstream = wordcounts.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Set the application arguments in Run -> Edit Configurations: cloud01 9999.
Data source fed to the simulator:
hello, hbase, spark, world,
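To see what the update function does in isolation, here is a minimal, self-contained sketch (not part of the original project; the object name UpdateFuncDemo is made up) that applies it to two consecutive batches for a single key:

package dong.spark

object UpdateFuncDemo {
  def main(args: Array[String]): Unit = {
    // Same shape as the update function passed to updateStateByKey
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      Some(values.foldLeft(0)(_ + _) + state.getOrElse(0))
    }
    // Batch 1: the key appeared three times, no previous state yet
    println(updateFunc(Seq(1, 1, 1), None))  // Some(3)
    // Batch 2: the key appeared twice more, previous total was 3
    println(updateFunc(Seq(1, 1), Some(3)))  // Some(5)
  }
}

This accumulation is what updateStateByKey performs per key at every batch interval, which is why the counts printed by StatefulWordCount keep growing instead of resetting each batch.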
The last example, WindowWordCount, counts words over a sliding window with reduceByKeyAndWindow:

package dong.spark

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel

object WindowWordCount {
  def main(args: Array[String]) = {
    val conf = new SparkConf().setAppName("WindowWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    // Windowed operations also need a checkpoint directory
    ssc.checkpoint(".")
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER)
    val words = lines.flatMap(_.split(","))
    // args(2) = window length in seconds, args(3) = slide interval in seconds
    val wordcounts = words.map(x => (x, 1))
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(args(2).toInt), Seconds(args(3).toInt))
    wordcounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Data source fed to the simulator:
hello, hbase, world, spark,
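For example, if the job is started with the (hypothetical) arguments cloud01 9999 30 10, then every 10 seconds it prints the word counts computed over the preceding 30 seconds. Both the window length and the slide interval must be multiples of the 5-second batch interval.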