首页 > 代码库 > <Spark><Spark Streaming>

<Spark><Spark Streaming>

Overview

  • Spark Streaming为用户提供了一套与batch jobs十分相似的API,以编写streaming应用
  • 与Spark的基本概念RDDs类似,Spark Streaming提供了被称为DStreams/discretized streams的抽象
  • DStream is a sequence of data arriving over time. 其本质是,每个DStream被表示成来自每个时间阶段的RDDs的序列,因此被称为离散的
  • DStreams可以从各种输入数据源创建,比如Flume, Kafka, or HDFS
  • 一旦创建,DStream提供两种类型的操作:[DStreams提供很多与RDD上类型的操作,同时还有新的关于时间的操作,比如sliding-windows]
    • transformations: which yield a new DStreams
    • ouput operations: which write data to an external system
  • 与批处理不同的是,Spark Streaming还需要额外的setup来operate 24/7。稍后我们会讨论checkpointing: the main mechanism Spark Streaming provides for this purpose, which lets it store data in a reliable file system such as HDFS.稍后我们会讨论如何在失败时重启,或者如何自动重启。

A Simple Example

  • 首先要创建一个StreamingContext。这也在下层创建了一个SparkContext用来处理数据。这其中我们会指定一个batch interval作为输入来确定处理新数据的频率。
  • 然后使用socketTextStreams()来创建一个基于文本数据的DStream
// Create a Streming filter for printing lines containing "error" in Scala
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream using data received after connecting to port 777 on the local machine
val lines = scc.socketTextStream("localhost", 7777)
// Filter our DStreams for lines with "error"
val errorLines = lines.filter(_.contains("error"))
//Print out the lines with errors
errorLines.print()
  • 以上程序只有在系统接收到数据的时候才会开始计算。为此,我们必须显式地调用ssc.start()。然后SparkStreaming会基于底层的SparkContext开始调度这个Spark jobs。这些是在一个单独的线程发生的。为了防止程序退出,我们还需要调用awaitTermination来等待Streaming计算完成。
// Start our streaming context and wait for it to "finish"
ssc.start()
// Wait for the job to finish
ssc.awaitTermination()
  •  注意一个streaming context只能被start一次,并且必须在所有DStreams设置好了之后启动

Architecture and Abstraction

  • Spark Streaming使用“micro-batch”架构,streaming计算被当做一些列连续的batch computations on small batches of data.
  • Spark Streaming从多种数据源接收数据并把他们分成small batches。新的batches在固定时间间隔内被创建。
  • 每个输入batch组成一个RDD,并通过Spark jobs来处理,被处理好的结果可以被pushed out到外部系统。

技术分享

  • 容错:和RDDs类似,只要输入数据的一个copy有效,那么它可以通过RDDs的lineage重计算。缺省情况下,received data在两个nodes上都有副本,所以SparkStreaming可以容忍single worker failures。但是如果仅仅使用lineage,那么recomputation可能会耗费很多时间。因此,SparkStreaming提供了checkpointing机制来周期地存储状态到reliable文件系统(HDFS or S3)。典型地,你可以每5-10个batches的数据创建一个checkpointing。

Transformations

  • DStreams上的Transformations可以分为stateless和stateful两类:
    • 对stateless transformations,对每个batch的处理不依赖于之间的batches数据。这些包括了普通的RDD transformations:map(), filter(), reduceByKey()
    • Stateful transformations, 相比之下会使用之前batches的数据或中间结果来得到当前batch的结果。这包括了一些基于sliding window和时间状态追踪的transformations。

Stateless Transformations

  • 无状态的Transformations只是简单地被应用到每个batch--也就是DStream中的每个RDD。
  • eg:在如下的log processing程序中,我们使用map()和reduceByKey()来对每个时间阶段统计log events  
// Assumes ApacheAccessLog is utility class for parsing entries from Apache logs
val accessLogDStream = logData.map(line => ApacheAccessLog.parseFromLogLine(line))
val ipDStream = accessLogsDStream.map(entry => (entry.getIpAddress(), 1))
val ipCountsDStream = ipDStream.reduceByKey((x, y) => x +y)

 Stateful Transformations

  • 有状态的transformations主要有两种:
    • windowed operations:which act over a sliding window of time periods
    • updateStateByKey(): which is used to track state across events for each key(eg: to build up an object representing each user session.)
  • 有状态的transformation需要开启checkpointing来容错。

Windowed transformation

  • 基于窗口的操作所处理的时间段长于StreamingContext的batch interval --> 通过合并多个batches的结果。
  • 所有基于窗口的操作需要两个参数:【这两个值都要是batch interval的倍数
    • window duration:控制考虑多少个previous batches of data
    • sliding duration: 控制一个新的DStream计算结果的频率

技术分享

UpdateStateByKey transformation

  • 有时候需要在一个DStream的batches之间维护一个状态
  • updateStateByKey()提供一个访问DStreams状态变量的键值对。
  • eg:对于日志文件,我们可以追踪每个userID(key)的最后访问的10个页面。也就是说这最后10个页面组成的list使我们的"state"对象,我们在每次事件到达的时候update。
  • 使用方式:调用update(events, oldState),返回newState。【
    • events: a list of events that arrived in the current batch(may be empty)
    • oldState: an optional state object, stored within an Option; it might be missing if there was no previous state for the key.
    • newState: 函数返回值,也是一个Option。我们可以返回一个空Option,来表示我们要删除该状态。
  • eg:key是HTTP response code, state是integer表示count,events是page views。这里我们保留的是从程序开始的所有count。
def updateRunningSum(values: Seq[Long], state: Option[Long]) = {
    Some(State.getOrElse(0L) + values.size)
}

val responseCodeDSrteam = accessLogsDStream.map(log => (log.getResponseCode(), 1L))
val responseCodeCountDStream = responseCodeDStream.updateStateByKey(updateRunningSum _)

Output Operations

ipAddressRequeseCount.foreach { rdd =>
    rdd.foreachPartition { partition =>
        // Open connection to storage system(e.g. a database connection)
        partition.foreach{ item =>
            // Use connection to push item to system
        }
       // Close connection
    }
}

 Input Sources

Core Sources

  • 可以从core sources创建DStream的方法are all available on the StreamingContext.

Stream of files

  • 因为Spark支持从任意Hadoop-compatible文件系统中读取,所以Spark Streaming天然地允许从这些FS中创建stream。这种情况很常见,因为它支持variety of backends,尤其是我们会赋值到HDFS的log data。
  • TBD...

Akka actor stream

  • TBD...

Additional Sources

  • 包括Twitter,Apache Kafka,Amazon Kinesis, Apache Flume以及ZeroMQ

Apache Kakfa

  • TBD...

Apache Flume

  • Spark对Flume有两种不同的receiver:
    • Push-based receiver:the receiver acts as Avro sink that Flume pushes data to.
    • Pull-based receiver: the receiver can pull data from an intermediate custom sink, to which other processes are pushing data with Flume.

技术分享

Push-based receiver

  • 这种push-based方法可以快速安装,但是没有使用事务来接收数据
  • 缺点很明显:就是没有使用事务,那么很有可能在worker node失效的时候丢失数据。并且如果配置了receiver的worker彻底fail掉的话,系统需要试图在另一个位置launch a receiver,这种配置是十分困难的。

Pull-based receiver

  • 这种方法is preferred for resiliency(弹性),并且数据保存在sink直到SparkStreaming读取并复制掉。整个过程会用事务来维护
  • TBD...

Multiple Sources and Cluster Sizing

  • 我们可以使用union()方法来combine多个DStreams的数据
  • TBD...

24/7 Operation

  • Spark Streaming的一个主要优点就是它提供了很强的容错保证
  • 只要输入数据stored reliably,Spark Streaming总可以计算得到正确的结果 --> 即使有worker or dirver fail掉,也会像没有nodes fail一样得到结果。
  • 为了使Spark Streaming 24/7, 你需要进行一些特殊的setup。
    • 第一步就是setting up checkpointing to a reliable storage system, such as HDFS or Amazon S3.
    • 除此之外,你还要考虑driver program的容错问题,以及输入源的unreliable。

Checkpointing

  • checkpointing是Spark Streaming最主要的容错机制。
  • 它允许Spark Streaming周期地将数据存储到可信赖的存储系统。
  • 具体地,checkpointing有两个目的:
    • 减少在失败时的recompute。像之前介绍的,Streaming会使用transformation的lineage图来recompute,但是checkpointing可以控制它必须要go back多远
    • 为driver提供容错。如果driver program crashes,你可以再启动它,并且告诉它去从一个checkpoint恢复。这样Spark Streaming就会读取该program已经处理了多少了,并且从这里继续。
    ssc.chepoint("hdfs://...")
  • 注: 在local模式你可以使用本地文件系统,但是对于生产环境,你需要使用a replicated system, 比如HDFS,S3,NFS filer。 

 

<Spark><Spark Streaming>