New Features of the Spark and Kafka 0.10.0 Integration (Part 2)
Continuing from New Features of the Spark and Kafka 0.10.0 Integration (Part 1), we start from the standard direct-stream example:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092,anotherhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => (record.key, record.value))
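The snippet above assumes a streamingContext that already exists. A minimal driver sketch follows; the application name, master URL and batch interval are illustrative assumptions, not values from the original example:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative driver setup; app name, master and batch interval are placeholder values.
val sparkConf = new SparkConf().setAppName("KafkaDirectExample").setMaster("local[2]")
val streamingContext = new StreamingContext(sparkConf, Seconds(5))

// ... build the direct stream and its transformations as shown above ...

streamingContext.start()            // job generation and execution begin here
streamingContext.awaitTermination() // block until the streaming application is stopped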
Having analyzed the location strategy and the consumer strategy, let's look at the implementation of org.apache.spark.streaming.kafka010.KafkaUtils#createDirectStream:
@Experimental
def createDirectStream[K, V](
ssc: StreamingContext,
locationStrategy: LocationStrategy,
consumerStrategy: ConsumerStrategy[K, V]): InputDStream[ConsumerRecord[K, V]] = {
val ppc = new DefaultPerPartitionConfig(ssc.sparkContext.getConf)
createDirectStream[K, V](ssc, locationStrategy, consumerStrategy, ppc)
}
It returns an InputDStream[ConsumerRecord[K, V]]. Let's take a look at the ConsumerRecord type:
/**
 * A key/value pair to be received from Kafka. This consists of a topic name and a partition number,
 * from which the record is being received, and an offset that points to the record in a Kafka partition.
 * (A key/value pair received from Kafka, carrying the topic name, the partition number, and the
 * record's offset within that partition.)
 */
public final class ConsumerRecord<K, V> {
    public static final long NO_TIMESTAMP = Record.NO_TIMESTAMP;
    public static final int NULL_SIZE = -1;
    public static final int NULL_CHECKSUM = -1;

    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;
    private final TimestampType timestampType;
    private final long checksum;
    private final int serializedKeySize;
    private final int serializedValueSize;
    private final K key;
    private final V value;
    // ... remaining members omitted
}
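Since each element of the stream is a full ConsumerRecord, the topic, partition and offset are available alongside the key and value. A small sketch of reading them (the println is purely illustrative):

stream.foreachRDD { rdd =>
  rdd.foreach { record =>
    // topic/partition/offset come straight from the ConsumerRecord
    println(s"${record.topic}-${record.partition}@${record.offset}: key=${record.key}, value=${record.value}")
  }
}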
The details of InputDStream are omitted here. Looking at the class hierarchy, DirectKafkaInputDStream extends InputDStream, so the concrete type returned by createDirectStream is a DirectKafkaInputDStream.
Inside createDirectStream, a DefaultPerPartitionConfig is created first. DefaultPerPartitionConfig simply defines the maximum rate at which messages are fetched from each partition, configured via spark.streaming.kafka.maxRatePerPartition. Its source is as follows:
package org.apache.spark.streaming.kafka010

import org.apache.kafka.common.TopicPartition

import org.apache.spark.SparkConf
import org.apache.spark.annotation.Experimental

/**
 * :: Experimental ::
 * Interface for user-supplied configurations that can't otherwise be set via Spark properties,
 * because they need tweaking on a per-partition basis.
 * (A user-facing configuration interface; these values cannot be expressed as ordinary Spark
 * properties because they must be tuned per partition. The rate itself is read from SparkConf.)
 */
@Experimental
abstract class PerPartitionConfig extends Serializable {
  /**
   * Maximum rate (number of records per second) at which data will be read
   * from each Kafka partition.
   */
  def maxRatePerPartition(topicPartition: TopicPartition): Long
}

/**
 * Default per-partition configuration
 */
private class DefaultPerPartitionConfig(conf: SparkConf) extends PerPartitionConfig {
  val maxRate = conf.getLong("spark.streaming.kafka.maxRatePerPartition", 0)

  // Maximum rate (records per second) at which data is read from each Kafka partition
  def maxRatePerPartition(topicPartition: TopicPartition): Long = maxRate
}
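Since DefaultPerPartitionConfig just reads spark.streaming.kafka.maxRatePerPartition from SparkConf, per-partition throttling is a one-line setting. A hedged sketch (the value 1000 is an arbitrary example):

import org.apache.spark.SparkConf

// Cap each Kafka partition at 1000 records per second (illustrative value).
val throttledConf = new SparkConf()
  .setAppName("KafkaDirectExample")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")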
After the PerPartitionConfig is created, the overloaded createDirectStream is called:
def createDirectStream[K, V](
    ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V],
    perPartitionConfig: PerPartitionConfig
  ): InputDStream[ConsumerRecord[K, V]] = {
  new DirectKafkaInputDStream[K, V](ssc, locationStrategy, consumerStrategy, perPartitionConfig)
}
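Because this overload is public, a caller can supply its own PerPartitionConfig instead of relying on the single global rate. A minimal sketch of a per-topic rate limit; the class name, topic names and rates are made-up examples, not part of the Spark API:

import org.apache.kafka.common.TopicPartition

// Hypothetical per-topic rate limiter; topic names and rates are illustrative only.
class PerTopicRateConfig extends PerPartitionConfig {
  override def maxRatePerPartition(tp: TopicPartition): Long = tp.topic match {
    case "topicA" => 500L
    case "topicB" => 2000L
    case _        => 1000L
  }
}

val customStream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams),
  new PerTopicRateConfig)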
Next, let's focus on the constructor of DirectKafkaInputDStream (note: in Scala, the entire class body, from the opening { to the closing }, forms the primary constructor):
package org.apache.spark.streaming.kafka010

import java.{util => ju}
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicReference

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.{PartitionInfo, TopicPartition}

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
import org.apache.spark.streaming.scheduler.rate.RateEstimator

/**
 * A DStream where each given Kafka topic/partition corresponds to an RDD partition.
 * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
 * of messages per second that each '''partition''' will accept.
 * (Each partition of each topic maps to one RDD partition; spark.streaming.kafka.maxRatePerPartition
 * caps how many messages per second are fetched from each partition.)
 *
 * @param locationStrategy In most cases, pass in [[PreferConsistent]],
 *   see [[LocationStrategy]] for more details.
 * @param executorKafkaParams Kafka
 *   <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">configuration parameters</a>
 */
private[spark] class DirectKafkaInputDStream[K, V](
    _ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V],
    ppc: PerPartitionConfig
  ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets {
  // ... class body omitted ...
}
Once the DirectKafkaInputDStream has been created, everything up to this point is still part of the transformation phase; no action operator has triggered execution yet.
The compute method of DirectKafkaInputDStream is the one that is eventually invoked, indirectly, once StreamingContext's start method kicks off job generation:

override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
  val untilOffsets = clamp(latestOffsets())
  val offsetRanges = untilOffsets.map { case (tp, uo) =>
    val fo = currentOffsets(tp)
    OffsetRange(tp.topic, tp.partition, fo, uo)
  }
  val rdd = new KafkaRDD[K, V](
    context.sparkContext, executorKafkaParams, offsetRanges.toArray, getPreferredHosts, true) // key call: applies the location strategy

  // Report the record number and metadata of this batch interval to InputInfoTracker.
  val description = offsetRanges.filter { offsetRange =>
    // Don't display empty ranges.
    offsetRange.fromOffset != offsetRange.untilOffset
  }.map { offsetRange =>
    s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
      s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
  }.mkString("\n")
  // Copy offsetRanges to immutable.List to prevent from being modified by the user
  val metadata = Map(
    "offsets" -> offsetRanges.toList,
    StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
  val inputInfo = StreamInputInfo(id, rdd.count, metadata)
  ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

  currentOffsets = untilOffsets
  commitAll()
  Some(rdd)
}

I have not fully digested the location strategy yet; the org.apache.spark.streaming.kafka010.DirectKafkaInputDStream#getPreferredHosts method deserves a closer look another day.
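As for the offsets that compute() puts into each KafkaRDD: they travel with the RDD, so driver code can read them back and commit them to Kafka explicitly. The foreachRDD body below is only a sketch of that standard kafka010 pattern:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // Every KafkaRDD carries the offset ranges that compute() assigned to it.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
  // Commit the offsets back to Kafka asynchronously after the batch's work is done.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}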
Note: both the generation and the execution of Streaming jobs are driven by StreamingContext's start method.