首页 > 代码库 > Flume概述

Flume概述

常见的开源数据收集系统有:

非结构数据(日志)收集

  Flume

结构化数据收集(传统数据库与 Hadoop 同步)

  Sqoop:全量导入

  Canal(alibaba):增量导入

  Databus(linkedin):增量导入

Flume是什么:

由Cloudera公司开源

分布式、可靠、高可用的海量日志采集系统

数据源可定制,可扩展

数据存储系统可定制,可扩展

中间件:屏蔽了数据源和数据存储系统的异构性

Flume的两个版本

技术分享

OG版本因为有一个巨大的BUG,所以进行升级改造,产出NG版本

Flume NG的基本架构

技术分享

一个Agent就代表一个Flume

Flume 有三个组件,Sourc、Channel、Sink,分别对应了Producer、Buffer、Customer

Flume NG的核心概念

Event

  Event是Flume数据传输的基本单元,Event由可选的header和载有数据的一个byte array构成,载有的数据对flume是不透明的,Flume 不关心数据是什么样子的,Header是容纳了key-value字符串对的无序集合,key在集合内是唯一的

Client

  Client 一般不需要配置在 Flume 中,只需要配置 Agent 即可,Client是一个将原始log包装成events并且发送它们到一个或多个agent的实体,目的是从数据源系统中解耦Flume,在flume的拓扑结构中不是必须的。

  Client的实例可以是 Flume log4j 的 Appender,或者可以使用Client SDK (org.apache.flume.api)定制特定的Client

Agent

  一个Agent包含Source, Channel, Sink和其他组件,它利用这些组件将events从一个节点传输到另一个节点或最终目的,agent是flume流的基础部分

技术分享

  Source 将数据写入到 Channel 的尾部,然后 Sink 从 Channel 中获取头部 Event

  Agent.Source

  Source 负责接收日志数据,并将数据包装成Event,并将events批量的放到一个或多个Channel

  不同类型的Source:

    1、与系统集成的Source: Syslog, Netcat

    2、自动生成事件的Source: Exec

        可执行任意Unix命令,无容错性,比如将 tail -f 的标准输出作为输入源,但是有问题,因为再次运行 tail -f 不一定会接着上次的

        a1.sources = r1
        a1.channels = c1
        a1.sources.r1.type = exec
        a1.sources.r1.command = tail -F /var/log/secure
        a1.sources.r1.channels = c1

    3、监听文件夹下文件变化:

      3.1、Spooling Directory Source(是暴露出来的,能直接使用的)

        监听一个文件夹下新产生的文件,并读取内容,发至 channel

        已经产生的文件不能进行任意修改,不然会停止处理

        建议将文件(唯一文件名)写到一个临时目录下,之后move到监听目录下

        技术分享

        a1.channels = ch-1
        a1.sources = src-1
        a1.sources.src-1.type = spooldir
        a1.sources.src-1.channels = ch-1
        a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
        a1.sources.src-1.fileHeader = true

      3.2、Taildir Source(没有暴露出来,不可以直接使用)

        监听文件内容,一旦新写入一行新数据,则读取之

        支持断点续读,定期将最新读取数据的偏移量写入json 文件

        根据文件修改时间决定读取优先级,最新的文件优先读取

        读取完的文件不会做任何处理(比如删除,重命名等)

        目前仅支持文本文件

技术分享        a1.sources = r1
        a1.channels = c1
        a1.sources.r1.type = TAILDIR
        a1.sources.r1.channels = c1
        a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
        a1.sources.r1.filegroups = f1 f2
        a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
        a1.sources.r1.headers.f1.headerKey1 = value1
        a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
        a1.sources.r1.headers.f2.headerKey1 = value2
        a1.sources.r1.headers.f2.headerKey2 = value2-2
        a1.sources.r1.fileHeader = true

    4、用于Agent和Agent之间通信的RPC Source: Avro、Thrift,下面以Avro 为例

技术分享

        a1.sources = r1
        a1.channels = c1
        a1.sources.r1.type = avro
        a1.sources.r1.channels = c1
        a1.sources.r1.bind = 0.0.0.0
        a1.sources.r1.port = 4141

      5、Source必须至少和一个channel关联

  Agent.Channel

    Channel位于Source和Sink之间,用于缓存event

    当Sink成功将event发送到下一跳的channel或最终目的,event从Channel移除

    不同的Channel提供的持久化水平也是不一样的

      1、Memory Channel: volatile,基于内存的这种方式优点肯定是快,缺点是可能会丢失

技术分享        a1.channels = c1
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 10000
        a1.channels.c1.transactionCapacity = 10000
        a1.channels.c1.byteCapacityBufferPercentage = 20
        a1.channels.c1.byteCapacity = 800000

      2、File Channel: 基于WAL(预写式日志Write-Ahead Logging)实现,缺点就是慢,优点是不会丢失,和memory权衡使用吧

技术分享        a1.channels = c1
        a1.channels.c1.type = file
        a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
        a1.channels.c1.dataDirs = /mnt/flume/data

      3、JDBC Channel: 基于嵌入Database实现

    Channel支持事务,提供较弱的顺序保证

    可以和任何数量的Source和Sink工作

   Agent.Sink

    Sink负责将event传输到下一跳或最终目的,成功完成后将event从channel移除。

    不同类型的Sink:

      存储event到最终目的的终端Sink. 比如: HDFS, HBase

      自动消耗的Sink. 比如: Null Sink,生产环境中很少用,主要用于测试环境

      用于Agent间通信的RPC sink: Avro

    必须作用于一个确切的channel

Flume概述