
spark-Day4

package ling

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object StreamingFileWordCount {
  def main(args: Array[String]): Unit = {
    // Local run with two threads and a 30-second batch interval.
    val sparkConf = new SparkConf().setAppName("HdfsWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(30))

    // Monitor a directory; each new file that appears there is read in the next batch.
    val lines = ssc.textFileStream("/home/hduser/Streamingtext")
    val words = lines.flatMap(_.split(" "))
    val wordcounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordcounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Today is day four of learning Spark; the topic is how to use DStreams.

The code above sets up directory monitoring and file reading: whenever a new file is placed in the directory, the monitor reads its contents in the next 30-second batch.

If you add the following import, and put the two Logger lines inside the object, the console output will contain only the computed results, without the Spark log messages:

import org.apache.log4j.{Level, Logger}

Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
 
After running the job, create a new file under the Streamingtext folder and type some data into it; the word counts will appear in the console.
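
While the job is running, any new file written into the monitored directory is picked up in the next batch. Below is a minimal sketch of a helper that writes such a test file; it assumes the same /home/hduser/Streamingtext directory as above, and the file name and contents are arbitrary examples:

package ling

import java.io.PrintWriter

object WriteTestFile {
  def main(args: Array[String]): Unit = {
    // Write a small test file into the directory monitored by textFileStream.
    // Only files that appear after the streaming job has started are processed.
    val out = new PrintWriter("/home/hduser/Streamingtext/test1.txt")
    out.println("hello world hello spark")
    out.close()
  }
}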
 
II. Running the simulator and the analyzer
1. Local run (this requires one jar, and the run configuration must be set up carefully)
The simulator code is as follows:
package ling

import java.io.PrintWriter
import java.net.ServerSocket
import scala.io.Source

object SocketSimulation {
  // Pick a random line index within the file.
  def index(length: Int) = {
    import java.util.Random
    val rdm = new Random
    rdm.nextInt(length)
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 3) {
      System.err.println("Usage: <filename> <port> <millisecond>")
      System.exit(1)
    }

    // Load the whole source file into memory.
    val filename = args(0)
    val lines = Source.fromFile(filename).getLines().toList
    val filerow = lines.length

    // Listen on the given port; for each client, send a random line
    // from the file every <millisecond> milliseconds.
    val listener = new ServerSocket(args(1).toInt)
    while (true) {
      val socket = listener.accept()
      new Thread() {
        override def run = {
          println("Got client connected from: " + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream(), true)
          while (true) {
            Thread.sleep(args(2).toLong)
            val content = lines(index(filerow))
            println(content)
            out.write(content + '\n')
            out.flush()
          }
          socket.close()
        }
      }.start()
    }
  }
}
Now package the simulator: create a StreamingNet project and create the SocketSimulation object following the usual steps.
Choose File -> Project Structure -> Artifacts -> + -> set the main class to SocketSimulation -> Apply. Note that the class path in the drop-down has to be entered by hand as follows:

/home/hduser/scala-2.10.4/lib/scala-swing.jar
/home/hduser/scala-2.10.4/lib/scala-library.jar
/home/hduser/scala-2.10.4/lib/scala-actors.jar

Then click OK and build the artifact. In Run -> Edit Configurations, enter cloud01 9999 as the program arguments; this is the host and port used for the run.

That completes packaging the simulator.

Then, in the same project, create another object and enter the following code:

package ling

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel

object NetworkWordCount {

  // Suppress most Spark and Jetty logging so that only the results are printed.
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

  def main(args: Array[String]) = {
    val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))

    // Connect to the socket server at <host> <port> given by args(0) and args(1).
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(","))
    val wordcounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    wordcounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Under the Streamingtext folder in the local home directory there is a file1.txt; since the analyzer splits each line on commas, its contents should be comma-separated words.

Run NetworkWordCount.

Then open a terminal and run:

java -cp StreamingNet.jar ling.SocketSimulation /home/hduser/Streamingtext/file1.txt 9999 1000

You will see the analyzer in IDEA and the simulator in the terminal running at the same time.

 

2. Simulated cluster run over the network (this requires two jars)
Create a new project, NetworkWordCount.
 
Both pieces of code above are still used.
In NetworkWordCount, the master address needs to be changed:
replace setMaster("local[2]") with setMaster("spark://192.168.136.129:7077"), substituting your own cluster's master address, as shown in the sketch below.
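
For reference, a minimal sketch of the changed configuration line (the IP and port are the example values from above; use your own master URL):

// Point the application at the standalone cluster master instead of local mode.
val conf = new SparkConf()
  .setAppName("NetworkWordCount")
  .setMaster("spark://192.168.136.129:7077") // replace with your own master URL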

Once changed, package the project in the ordinary way; there is no need to add the jars listed above.
Copy the two files to cloud02:
scp -r ~/Streamingtext cloud02:~/
scp -r ~/StreamingNet.jar cloud02:~/

Open a terminal on cloud02 and enter:

java -cp StreamingNet.jar ling.SocketSimulation ~/Streamingtext/file1.txt 9999 1000

In a terminal on cloud01, enter:

cd ~/spark-1.4.0-bin-hadoop2.4

bin/spark-submit ~/NetworkCluster.jar cloud02 9999

 
The following two examples can be run following the same procedure as above:
package dong.spark

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._


object StatefulWordCount {
  def main(args: Array[String]): Unit = {

    // For each word, add the counts from the current batch to the previous state.
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
    val conf = new SparkConf().setAppName("StatefulWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    // A checkpoint directory must be set before using updateStateByKey;
    // without this line the job will not run.
    ssc.checkpoint(".")

    val lines = ssc.socketTextStream(args(0), args(1).toInt)

    val words = lines.flatMap(_.split(","))
    val wordcounts = words.map(x => (x, 1))
    val stateDstream = wordcounts.updateStateByKey[Int](updateFunc)

    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

/*
Set the application arguments: Run -> Edit Configurations -> add an Application configuration -> enter the arguments
cloud01 9999
*/

**************** Data source **************
hello,
hbase,
spark,
world,
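
The key idea in StatefulWordCount is updateStateByKey: for every word, Spark passes the counts seen in the current batch together with the previously accumulated state into updateFunc, so the printed counts keep growing across batches. A plain-Scala sketch of that logic, independent of Spark and with made-up batch data, looks like this:

package dong.spark

object UpdateFuncDemo {
  // Same logic as updateFunc above: sum the new values and add the previous state.
  def update(values: Seq[Int], state: Option[Int]): Option[Int] = {
    val currentCount = values.foldLeft(0)(_ + _)
    val previousCount = state.getOrElse(0)
    Some(currentCount + previousCount)
  }

  def main(args: Array[String]): Unit = {
    // Batch 1: "spark" appears twice, no previous state yet.
    val afterBatch1 = update(Seq(1, 1), None)     // Some(2)
    // Batch 2: "spark" appears once more; the previous state is the running count.
    val afterBatch2 = update(Seq(1), afterBatch1) // Some(3)
    println(s"after batch 1: $afterBatch1, after batch 2: $afterBatch2")
  }
}
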
package dong.spark

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel

object WindowWordCount {
  def main(args: Array[String]) = {
    val conf = new SparkConf().setAppName("WindowWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    // Window operations also require a checkpoint directory.
    ssc.checkpoint(".")

    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER)
    val words = lines.flatMap(_.split(","))

    // Count words over a window of args(2) seconds, sliding every args(3) seconds.
    val wordcounts = words.map(x => (x, 1))
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(args(2).toInt), Seconds(args(3).toInt))

    wordcounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
************* Data source *************
hello,
hbase,
world,
spark,
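
In WindowWordCount, args(2) is the window length and args(3) is the slide interval, both in seconds; both should be multiples of the 5-second batch interval. For example, with the program arguments cloud01 9999 30 10 (the 30 and 10 are just illustrative values), the windowed count is equivalent to the sketch below: it counts the words received in the last 30 seconds and recomputes every 10 seconds.

// Fixed-parameter form of the windowed count above (illustrative values):
// count words seen in the last 30 seconds, updated every 10 seconds.
val wordcounts = words.map(x => (x, 1))
  .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))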