
Spark Integration with Kafka 0.10.0 New Features (Part 2)

Picking up where Spark Integration with Kafka 0.10.0 New Features (Part 1) left off:


import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.map(record => (record.key, record.value))
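
Because enable.auto.commit is set to false in the parameters above, offsets are not stored anywhere automatically. Below is a minimal sketch of one common pattern: commit each batch's offsets back to Kafka only after the batch has been processed (processRecord is a hypothetical placeholder for user logic, not part of the API):

stream.foreachRDD { rdd =>
  // The RDDs produced by the direct stream carry their Kafka offset ranges.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  rdd.foreach { record =>
    processRecord(record.key, record.value)  // hypothetical processing function
  }

  // Commit only after the batch has been handled; commitAsync is exposed by the stream itself.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}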

Having analyzed the location strategy and the consumer strategy, let's first look at the concrete implementation of org.apache.spark.streaming.kafka010.KafkaUtils$#createDirectStream:


  @Experimental
  def createDirectStream[K, V](
      ssc: StreamingContext,
      locationStrategy: LocationStrategy,
      consumerStrategy: ConsumerStrategy[K, V]): InputDStream[ConsumerRecord[K, V]] = {
    val ppc = new DefaultPerPartitionConfig(ssc.sparkContext.getConf)
    createDirectStream[K, V](ssc, locationStrategy, consumerStrategy, ppc)
  }

It returns an InputDStream[ConsumerRecord[K, V]]; let's take a look at the ConsumerRecord type:

/**
 * A key/value pair to be received from Kafka. This consists of a topic name and a partition number, from which the
 * record is being received and an offset that points to the record in a Kafka partition.
 */
public final class ConsumerRecord<K, V> {
    public static final long NO_TIMESTAMP = Record.NO_TIMESTAMP;
    public static final int NULL_SIZE = -1;
    public static final int NULL_CHECKSUM = -1;

    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;
    private final TimestampType timestampType;
    private final long checksum;
    private final int serializedKeySize;
    private final int serializedValueSize;
    private final K key;
    private final V value;
		
	// ... remaining fields, constructors and accessors omitted
}
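
So besides the key and value, each ConsumerRecord carries its own metadata, and the stream can expose where every message came from. A small sketch:

// Sketch: pull out topic, partition and offset alongside the payload.
stream.map(record => (record.topic, record.partition, record.offset, record.value))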


The details of InputDStream are omitted here; let's just look at the class hierarchy:

(class hierarchy diagram: DirectKafkaInputDStream extends InputDStream)


So the concrete type returned by createDirectStream is DirectKafkaInputDStream.

Next, createDirectStream creates a DefaultPerPartitionConfig. DefaultPerPartitionConfig simply configures the maximum rate at which messages are fetched from each partition, controlled by the spark.streaming.kafka.maxRatePerPartition parameter. Its source is as follows:


package org.apache.spark.streaming.kafka010

import org.apache.kafka.common.TopicPartition

import org.apache.spark.SparkConf
import org.apache.spark.annotation.Experimental

/**
 * :: Experimental ::
 * Interface for user-supplied configurations that can't otherwise be set via Spark properties,
 * because they need tweaking on a per-partition basis.
 *
 * In other words: these settings cannot be expressed as ordinary Spark properties because they
 * must be tunable per partition; the default rate is still read from SparkConf
 * (see DefaultPerPartitionConfig below).
 */
@Experimental
abstract class PerPartitionConfig extends Serializable {
  /**
   *  Maximum rate (number of records per second) at which data will be read
   *  from each Kafka partition.
   */
  def maxRatePerPartition(topicPartition: TopicPartition): Long

/**
 * Default per-partition configuration
 */
private class DefaultPerPartitionConfig(conf: SparkConf)
    extends PerPartitionConfig {
  val maxRate = conf.getLong("spark.streaming.kafka.maxRatePerPartition", 0)
  // maximum rate (records per second) at which data is read from each Kafka partition; 0 (the default) means unlimited
  def maxRatePerPartition(topicPartition: TopicPartition): Long = maxRate
}
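
Since DefaultPerPartitionConfig simply reads spark.streaming.kafka.maxRatePerPartition from SparkConf, capping the per-partition fetch rate only requires setting that property when the context is built. A sketch (the rate and app name are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Cap each Kafka partition at (for example) 1000 records per second.
val conf = new SparkConf()
  .setAppName("direct-kafka-demo")  // hypothetical app name
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
val streamingContext = new StreamingContext(conf, Seconds(5))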


Once the PerPartitionConfig has been created, the overloaded createDirectStream is invoked:

  def createDirectStream[K, V](
      ssc: StreamingContext,
      locationStrategy: LocationStrategy,
      consumerStrategy: ConsumerStrategy[K, V],
      perPartitionConfig: PerPartitionConfig
    ): InputDStream[ConsumerRecord[K, V]] = {
    new DirectKafkaInputDStream[K, V](ssc, locationStrategy, consumerStrategy, perPartitionConfig)
  }
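
This four-argument overload is also the hook for plugging in a custom PerPartitionConfig when a single global rate is not enough, for instance to throttle one busy topic harder than the others. A sketch under that assumption (the topic name and rates are made up):

import org.apache.kafka.common.TopicPartition

// Hypothetical per-topic rate limits.
class TopicAwareRateConfig extends PerPartitionConfig {
  override def maxRatePerPartition(topicPartition: TopicPartition): Long =
    if (topicPartition.topic == "topicA") 500L else 2000L
}

val throttledStream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams),
  new TopicAwareRateConfig
)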

Next, let's focus on the constructor of DirectKafkaInputDStream (note: in a Scala class, everything in the class body from the opening { to the closing } belongs to the primary constructor).
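
A tiny example of that rule, unrelated to Spark: every statement in the class body runs when an instance is constructed.

class Demo(name: String) {
  // Executed as part of the primary constructor on `new Demo("kafka")`.
  println(s"constructing $name")
  val upper: String = name.toUpperCase  // field initializers are constructor statements too
}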


package org.apache.spark.streaming.kafka010

import java.{util => ju}
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicReference

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.{PartitionInfo, TopicPartition}

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
import org.apache.spark.streaming.scheduler.rate.RateEstimator

/**
  * A DStream where
  * each given Kafka topic/partition corresponds to an RDD partition.
  * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
  * of messages
  * per second that each '''partition''' will accept.
  *
  * @param locationStrategy    In most cases, pass in [[PreferConsistent]],
  *                            see [[LocationStrategy]] for more details.
  * @param executorKafkaParams Kafka
  *                            <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
  *                            configuration parameters</a>
  * ... (remaining Scaladoc and the class declaration are omitted here)
  */
This completes the construction of the DirectKafkaInputDStream. Everything so far is still a transformation: no action has been encountered, so nothing has actually executed yet.

The compute method of DirectKafkaInputDStream is what StreamingContext's start method ultimately, if indirectly, ends up invoking:

  override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
    val untilOffsets = clamp(latestOffsets())
    val offsetRanges = untilOffsets.map { case (tp, uo) =>
      val fo = currentOffsets(tp)
      OffsetRange(tp.topic, tp.partition, fo, uo)
    }
    val rdd = new KafkaRDD[K, V](
      context.sparkContext, executorKafkaParams, offsetRanges.toArray, getPreferredHosts, true) // key call: getPreferredHosts applies the location strategy

    // Report the record number and metadata of this batch interval to InputInfoTracker.
    val description = offsetRanges.filter { offsetRange =>
      // Don't display empty ranges.
      offsetRange.fromOffset != offsetRange.untilOffset
    }.map { offsetRange =>
      s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
        s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
    }.mkString("\n")
    // Copy offsetRanges to immutable.List to prevent from being modified by the user
    val metadata = Map(
      "offsets" -> offsetRanges.toList,
      StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
    val inputInfo = StreamInputInfo(id, rdd.count, metadata)
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

    currentOffsets = untilOffsets
    commitAll()
    Some(rdd)
  }
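
The OffsetRange(topic, partition, fromOffset, untilOffset) values built here describe half-open intervals: fromOffset is inclusive, untilOffset is exclusive. The same class can also be handed to the batch API to re-read a known slice of a topic; a sketch with made-up offsets:

import scala.collection.JavaConverters._

// Illustrative only: re-read offsets 0 (inclusive) to 100 (exclusive) of two partitions of topicA.
val ranges = Array(
  OffsetRange("topicA", 0, 0L, 100L),
  OffsetRange("topicA", 1, 0L, 100L)
)

val batchRdd = KafkaUtils.createRDD[String, String](
  streamingContext.sparkContext,
  kafkaParams.asJava,   // createRDD expects a java.util.Map[String, Object]
  ranges,
  PreferConsistent
)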

I have not yet fully worked through the location strategy itself; org.apache.spark.streaming.kafka010.DirectKafkaInputDStream#getPreferredHosts deserves a closer look another day.

Addendum: the generation and execution of Streaming jobs both start from StreamingContext's start method.
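
For completeness, a minimal sketch of the overall driver flow: everything above only describes the pipeline, and nothing runs until ssc.start() is called (the object name and batch interval are arbitrary).

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectKafkaDemo {  // hypothetical driver object
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DirectKafkaDemo")
    val ssc = new StreamingContext(conf, Seconds(5))

    // ... build the direct stream and transformations as shown earlier ...

    ssc.start()            // job generation and execution begin here
    ssc.awaitTermination()
  }
}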
