首页 > 代码库 > <Spark Streaming><Flume><Integration>

<Spark Streaming><Flume><Integration>

Overview

  • Flume:一个分布式的,可靠的,可用的服务,用于有效地收集、聚合、移动大规模日志数据
  • 我们搭建一个flume + Spark Streaming的平台来从Flume获取数据,并处理它。
  • 有两种方法实现:使用flume-style的push-based方法,或者使用自定义的sink来实现pull-based方法。

Approach 1: Flume-style Push-based Approach

  • flume被设计用来在Flume agents之间推信息,在这种方式下,Spark Streaming安装一个receiver that acts like an Avro agent for Flume, to which Flume can push the data.

General Requirement

  • 当你启动flume + spark streaming应用时,该机器上必须运行一个Spark workers
  • flume可以向该机器的某一个port push数据。
  • 基于这种push机制,streaming应用必须有一个receiver scheduled and listening on the chosen port.

Configuring Flume

  • 配置flume以向Avro sink发送数据
技术分享
agent.sinks = avroSink
agent.sinks.avroSink.type = avro
agent.sinks.avroSink.channel = memoryChannel
agent.sinks.avroSink.hostname = <chosen machine‘s hostname>
agent.sinks.avroSink.port = <chosen port on the machine>
View Code

Configuring Spark Streaming Application

  1. Linking: 在maven项目中配置依赖
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume-sink_2.10</artifactId>
    <version>2.1.0</version>
</dependency>

  2. Programming:import FlumeUtils, 创建input DStream

 import org.apache.spark.streaming.flume._

 val flumeStream = FlumeUtils.createStream(streamingContext, [chosen machines hostname], [chosen port])
  • 注意:应该与cluster中的resourceManager使用同一个hostname,这样的话资源分配可以匹配names,并在正确的机器上launch receiver
  • 一个简单的Spark Streaming统计Flume event个数的demo代码:
技术分享
object FlumeEventCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(
        "Usage: FlumeEventCount <host> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(host, IntParam(port)) = args

    val batchInterval = Milliseconds(2000)

    // Create the context and set the batch size
    val sparkConf = new SparkConf().setAppName("FlumeEventCount")
    val ssc = new StreamingContext(sparkConf, batchInterval)

    // Create a flume stream
    val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)

    // Print out the count of events received from this server in each batch
    stream.count().map(cnt => "Received " + cnt + " flume events." ).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
View Code

 

<Spark Streaming><Flume><Integration>