
Summary of using the low-level API createDirectStream in Spark Streaming with Kafka (Spark 1.4.1)

Reposted from: http://blog.csdn.net/ligt0610/article/details/47311771

       We currently consume roughly 2 billion messages per day from Kafka, which puts the cluster under considerable pressure and causes jobs to exit abnormally to varying degrees. We originally used the createStream function from Spark 1.1.0, but when processing cannot keep up with consumption and a job exits abnormally, a large amount of data can be lost. Fortunately, later Spark releases improved this situation considerably: version 1.2 added the WAL (write-ahead log) feature, though presumably at some performance cost (I have not tested it), and version 1.3 introduced a low-level API that consumes messages directly from Kafka topics. The direct API no longer updates consumer offsets in ZooKeeper, however, so monitoring tools that rely on ZooKeeper-based consumer offsets stop working.
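       For completeness, enabling the WAL for the older receiver-based approach mentioned above is mostly a configuration matter. A minimal sketch, assuming Spark 1.2+ and a checkpoint directory of your choosing (the path below is a placeholder):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Write-ahead log for receiver-based streams; not needed for the direct API.
    val conf = new SparkConf()
      .setAppName("ReceiverStreamWithWAL")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(2))
    // The WAL is written under the checkpoint directory, so checkpointing must be enabled.
    ssc.checkpoint("hdfs:///tmp/streaming-checkpoint") // placeholder path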

       The official documentation only very briefly shows how to get hold of each batch's offset ranges, which is the starting point for updating the consumer offsets in ZooKeeper yourself (see http://spark.apache.org/docs/1.4.1/streaming-kafka-integration.html):

    // Hold a reference to the current offset ranges, so it can be used downstream
    var offsetRanges = Array[OffsetRange]()

    directKafkaStream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.map {
      ...
    }.foreachRDD { rdd =>
      for (o <- offsetRanges) {
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      }
      ...
    }
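
 The snippet above assumes that directKafkaStream has already been created. With the standard createDirectStream overload from the 1.4.1 Kafka integration, that would look roughly like the following sketch (the broker list and topic set are placeholders, and ssc is an existing StreamingContext):

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Placeholder broker list and topics; replace with your own.
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "broker1:9092,broker2:9092")
    val topicsSet = Set("my-topic")

    // Consumes directly from the brokers; it neither reads nor writes consumer offsets
    // in ZooKeeper, which is exactly the gap the KafkaManager class below fills.
    val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)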

 So updating the consumer offsets in ZooKeeper has to be implemented by hand, and the two createDirectStream overloads provided out of the box did not quite meet my needs, so I wrapped them a bit further. The KafkaManager class below does this; note that it is declared inside the org.apache.spark.streaming.kafka package so that it can use KafkaCluster, which is not public in this Spark version:

    package org.apache.spark.streaming.kafka  
      
    import kafka.common.TopicAndPartition  
    import kafka.message.MessageAndMetadata  
    import kafka.serializer.Decoder  
    import org.apache.spark.SparkException  
    import org.apache.spark.rdd.RDD  
    import org.apache.spark.streaming.StreamingContext  
    import org.apache.spark.streaming.dstream.InputDStream  
    import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset  
      
    import scala.reflect.ClassTag  
      
    /** 
     * Created by knowpigxia on 15-8-5. 
     */  
    class KafkaManager(val kafkaParams: Map[String, String]) extends Serializable {  
      
      private val kc = new KafkaCluster(kafkaParams)  
      
      /**
       * Create a direct Kafka stream whose starting offsets are read from ZooKeeper.
       * @param ssc the StreamingContext
       * @param kafkaParams Kafka configuration; must contain group.id
       * @param topics the set of topics to consume from
       * @tparam K message key type
       * @tparam V message value type
       * @tparam KD key decoder type
       * @tparam VD value decoder type
       * @return an InputDStream of (key, value) pairs
       */  
      def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
          ssc: StreamingContext,
          kafkaParams: Map[String, String],
          topics: Set[String]): InputDStream[(K, V)] = {  
        val groupId = kafkaParams.get("group.id").get  
        // Before reading offsets from ZooKeeper, bring them up to date based on the actual consumption state  
        setOrUpdateOffsets(topics, groupId)  
      
        // Read the offsets from ZooKeeper and start consuming messages from there  
        val messages = {  
          val partitionsE = kc.getPartitions(topics)  
          if (partitionsE.isLeft)  
            throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")  
          val partitions = partitionsE.right.get  
          val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)  
          if (consumerOffsetsE.isLeft)  
            throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")  
          val consumerOffsets = consumerOffsetsE.right.get  
          KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](  
            ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))  
        }  
        messages  
      }  
      
      /**
       * Before creating the stream, update the consumer offsets in ZooKeeper according to the actual consumption state.
       * @param topics the set of topics to consume from
       * @param groupId the consumer group id
       */  
      private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {  
        topics.foreach(topic => {  
          var hasConsumed = true  
          val partitionsE = kc.getPartitions(Set(topic))  
          if (partitionsE.isLeft)  
            throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")  
          val partitions = partitionsE.right.get  
          val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)  
          if (consumerOffsetsE.isLeft) hasConsumed = false  
          if (hasConsumed) { // this group has consumed the topic before  
            /**
             * If the streaming job hits kafka.common.OffsetOutOfRangeException,
             * the offsets saved in ZooKeeper are stale: Kafka's log retention policy has
             * already deleted the segments that contained them.
             * In that case, compare the consumerOffsets in ZooKeeper with earliestLeaderOffsets;
             * if consumerOffsets is smaller than earliestLeaderOffsets, it is out of date,
             * and consumerOffsets is reset to earliestLeaderOffsets.
             */  
            val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)  
            if (earliestLeaderOffsetsE.isLeft)  
              throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")  
            val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get  
            val consumerOffsets = consumerOffsetsE.right.get  
      
            // Only some partitions may have stale offsets, so reset just those partitions to earliestLeaderOffsets  
            var offsets: Map[TopicAndPartition, Long] = Map()  
            consumerOffsets.foreach({ case(tp, n) =>  
              val earliestLeaderOffset = earliestLeaderOffsets(tp).offset  
              if (n < earliestLeaderOffset) {  
                println("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +  
                  " offsets已经过时,更新为" + earliestLeaderOffset)  
                offsets += (tp -> earliestLeaderOffset)  
              }  
            })  
            if (offsets.nonEmpty) {  
              kc.setConsumerOffsets(groupId, offsets)  
            }  
          } else { // this group has never consumed the topic  
            val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)  
            var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null  
            if (reset == Some("smallest")) {  
              val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)  
              if (leaderOffsetsE.isLeft)  
                throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")  
              leaderOffsets = leaderOffsetsE.right.get  
            } else {  
              val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)  
              if (leaderOffsetsE.isLeft)  
                throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")  
              leaderOffsets = leaderOffsetsE.right.get  
            }  
            val offsets = leaderOffsets.map {  
              case (tp, offset) => (tp, offset.offset)  
            }  
            kc.setConsumerOffsets(groupId, offsets)  
          }  
        })  
      }  
      
      /**
       * Update the consumer offsets stored in ZooKeeper after a batch has been processed.
       * @param rdd the batch RDD produced by the direct stream
       */  
      def updateZKOffsets(rdd: RDD[(String, String)]) : Unit = {  
        val groupId = kafkaParams.get("group.id").get  
        val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges  
      
        for (offsets <- offsetsList) {  
          val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)  
          val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))  
          if (o.isLeft) {  
            println(s"Error updating the offset to Kafka cluster: ${o.left.get}")  
          }  
        }  
      }  
    }  

   Next, here is a simple example of how the class is used. Note that each batch is processed first and the offsets are committed afterwards, which gives at-least-once semantics: if the job fails between the two steps, that batch will be reprocessed after a restart:

    import kafka.serializer.StringDecoder  
    import org.apache.log4j.{Level, Logger}  
    import org.apache.spark.SparkConf  
    import org.apache.spark.rdd.RDD  
    import org.apache.spark.streaming.kafka._  
    import org.apache.spark.streaming.{Seconds, StreamingContext}  
      
    /** 
     * Created by knowpigxia on 15-8-4. 
     */  
    object DirectKafkaWordCount {  
      
      def dealLine(line: String): String = {  
        val list = AnalysisUtil.dealString(line, ',', '"') // AnalysisUtil.dealString is the author's own helper; treat it as a split on ',' with '"' as the quote character  
        list.get(0).substring(0, 10) + "-" + list.get(26)  
      }  
      
      def processRdd(rdd: RDD[(String, String)]): Unit = {  
        val lines = rdd.map(_._2)  
        val words = lines.map(dealLine(_))  
        val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)  
        wordCounts.foreach(println)  
      }  
      
      def main(args: Array[String]) {  
        if (args.length < 3) {  
          System.err.println( s"""  
            |Usage: DirectKafkaWordCount <brokers> <topics> <groupid>  
            |  <brokers> is a list of one or more Kafka brokers  
            |  <topics> is a list of one or more kafka topics to consume from  
            |  <groupid> is a consumer group id  
            |  
            """.stripMargin)  
          System.exit(1)  
        }  
      
        Logger.getLogger("org").setLevel(Level.WARN)  
      
        val Array(brokers, topics, groupId) = args  
      
        // Create context with 2 second batch interval  
        val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")  
        sparkConf.setMaster("local[*]")  
        sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "5")  
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  
      
        val ssc = new StreamingContext(sparkConf, Seconds(2))  
      
        // Create direct kafka stream with brokers and topics  
        val topicsSet = topics.split(",").toSet  
        val kafkaParams = Map[String, String](  
          "metadata.broker.list" -> brokers,  
          "group.id" -> groupId,  
          "auto.offset.reset" -> "smallest"  
        )  
      
        val km = new KafkaManager(kafkaParams)  
      
        val messages = km.createDirectStream[String, String, StringDecoder, StringDecoder](  
          ssc, kafkaParams, topicsSet)  
      
        messages.foreachRDD(rdd => {  
          if (!rdd.isEmpty()) {  
            // process the messages first  
            processRdd(rdd)  
            // then commit the offsets to ZooKeeper  
            km.updateZKOffsets(rdd)  
          }  
        })  
      
        ssc.start()  
        ssc.awaitTermination()  
      }  
    }  

 
