首页 > 代码库 > sparkstreaming+kafka

sparkstreaming+kafka

生产者

import java.util.HashMap
import org.apache.kafka.clients.producer._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object spark_kafka_wordcount_producer {
    def main(args: Array[String]) {
       val Array(brokers, topic, wordsPerMessage) = Array("localhost:9092", "sun_first", "3")
       val props = new HashMap[String, Object]()
       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
       props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer")
       props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")

      val producer = new KafkaProducer[String, String](props)

      while(true) {
          val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
                .mkString(" ")
          val message = new ProducerRecord[String, String](topic, null, str)
          producer.send(message)
  
          Thread.sleep(1000)
      }
    }
}

 消费者

import java.util.Properties
import kafka.producer._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf

object spark_kafka_wordcount_customer {
   def main(args: Array[String]) {
        val Array(zkQuorum, group, topics) = Array("localhost:2181", "1", "sun_first")
        val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
        val ssc =  new StreamingContext(sparkConf, Seconds(1))
        ssc.checkpoint("checkpoint")

        val topicpMap = topics.split(",").map((_,2)).toMap
        val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
        val words = lines.flatMap(_.split(" "))
        val pairs = words.map(word => (word, 1))
        val wordCounts = pairs.reduceByKey(_ + _)
        wordCounts.print()
        ssc.start()
        ssc.awaitTermination()
    }
}

 

sparkstreaming+kafka