首页 > 代码库 > flume的基本概念,数据流模型

flume的基本概念,数据流模型

1.flume的基本概念

本文中所有与flume相关术语都采用斜体英文表示,这些术语的含义如下所示。

flume             一个可靠的,分布式的,用于采集,聚合,传输海量日志数据的系统。

Web Server   一个产生 Events 的系统。

Agent            flume 系统中的一个节点,它主要包含三个部件:SourceChannelSink

Event            事件,在 flume-agent 内部传输的数据结构。一个 Event 由 Map<String, String>Headers 和 byte[] body 组成,其中 Headers 保存了 Event 的属性,body 保存了 Event 的内容。

Source         Agent Source 用来接收 WebServer 产生的 Events,以及其他 flume-agent 中的 Sink 产生的 Events。

Channel       Source 将 Events 放在 Channel 中保存,Channel 主要有两种,是 MemoryChannel 和 FileChannel,分别将 Events 存放在内存中和文件中。

Sink              Sink 用来消费 Channel 内保存的 Events,然后将 Events 发送出去。

Sinkgroups   将多个 Sink 组合在一起,形成 Sinkgroups

HDFS           Hadoop分布式文件系统,它用来存储日志数据,也就是 Sinks 发送出来的 Events

2.flume的数据流模型

(1) 单 Agent 数据流模型

如下图1所示,单个 Agent 主要包括三个部件:SourceChannelSink。


  图1  单Agent的数据流模型

整个数据流如下:

Web Server       产生 Events,并将 Events 发送到 Source 中。

Source              接收 Events,并将 Events 发送到 Channel 中。

Channel            存储 Events

Sink                   消费Channel 中存储的 Events,并将 Events 发送到 HDFS

HDFS                磁盘存储 Events

(2)多 Agent 串行传输数据流模型

如图2所示,两个 Agent 组成的数据流传输模型。

                                                           图2: 两个 Agent 串行传输数据流模型

整个数据流如下:

Agent foo     Agent foo 中的 Source 接收外部 Events,存储到 Channel 中,Sink 从 Channel 中获取 Events,再将 Events 传输到 Agent bar 的 Source 中。

Agent bar    Agent bar 中的 Source 接收 Agent foo 的 Sink 发送的 Events,存储到 bar Channel,再由 bar Sink消费。

整个数据流只做了一件事,就是传输数据。

(3)收集数据流模型

如图3所示,Agent1Agent2Agent3负责从不同的 Web Server 中接收 Events,并将 Events 发送到 Agent4Agent4 再将 Events 发送到 HDFS

                                                                图3:收集数据流模型

整个数据流如下:

Agent1      接收 Events,并将 Events 传输到Agent4

Agent2      接收 Events,并将 Events 传输到Agent4

Agent3      接收 Events,并将 Events 传输到Agent4

Agent4      接收 Agent1Agent2Agent3 的 Events,然后将 Events 存储到 HDFS

整个数据流完成的功能:不同的 Agent 收集不同的 Web Server 产生的日志数据,并将所有的日志数据存储到一个目的地 HDFS

(4)多路数据流模型

一个 Agent 中可以由一个 Source ,多个 Channels ,多个 Sinks 组成多路数据流,其多路数据流模型如下图4所示。

一个 Source 接收外部 Events,并将 Events 发送到三路 Channel 中去,然后不同的 Sink 消费不同的 Channel 内的 Events ,再将 Events 进行不同的处理。

Source 如何将 Events 发送到不同的 Channel 中?这里 flume 采用了两种不同的策略,是 replicating 和 multiplexing 。

其中 replicating 是 Source 将每个 Event 都发送到 Channel 中,这样就将 Events 复制成 3 份发到不同的地方去。

其中 multiplexing 是 Source 根据一些映射关系,将不同种类的 Event 发送到不同的 Channel 中去,即将所有 Events 分成3份,分别发送到三个 Channels

                                                     图4 多路数据流模型

整个数据流如下:

Agent foo         Source 将接收到的 Events 发送到 Channel1--Sink1--HDFSChannel2--Sink2--JMSChannel3--Sink3--Agent bar

Agent bar         Source 接收 Agent foo 中 Sink3 发送的 Events,然后发送到 Channel4--Sink

(5) Sinkgroups数据流模型

现在考虑这样两个问题,一是 某 Sink 负责消费某 Channel 中的 Events,那么如果该 Sink 挂掉之后, 该 Channel 则会堵死。

二是 某 Sink 负责消费某 Channel 中的 Events,那么如果该 Sink 速度慢,或者该 Sink 的消费能力达不到 Source 的接收能力呢? 大量的 Events 会在 Channel 中堆积,造成堵塞。

为了解决这两种情况,flume 中存在一种数据流模型,将多个 Sinks 绑定在一起,形成 Sinkgroups,它们共同负责消费某个 Channel 内的 Events

但是在某个时刻,只有一个 Sink 消费 Channel 内的 Events,于是有两种策略保证从 Sinkgroups 中选择出一个 Sink 来消费 Channel 中的 Events

这两种策略分别是:failover 和 load_balance。其中 failover 机制,会将所有 Sinks 标识一个优先级,一个以优先级为序的 Map 保存着 活着的 Sink,一个队列保存着 失败的 Sink

每次都会选择优先级最高的活着的 Sink 来消费 Channel 的 Events。每过一段时间就对失败队列中的 Sinks 进行检测,如果变活之后,就将其插进 活着的 Sink Map。

另一种 load_balance机制,在这种机制下,还有两种不同的策略,分别是 round_robin 和 random。则 round_robin 就是不断地轮询 Sinkgroups 内的 Sinks,已保证均衡。

random 则是从 Sinkgroups 中的 Sinks 随机选择一个。

该数据流模型如下图5所示。

                                          图5 Sinkgroups数据流模型

整个数据流如下:

Source          负责接收 Events,并将其发送到 Channel 中。

Channel        负责存储 Events

Sinkgroups   负责消费 Channel 中的 Events,并将 Events 发送到 HDFS 存储。

(6)单 Agent,多条数据流

如下图6所示,在单个 Agent 中,可以由多个 SourcesChannelsSinks 组成多条完全不相交的数据流。

                                          图6 

整个数据流如下:

Source1 Channel1 Sink1 HDFS1  组成数据流1

Source2 Channel2 Sink2 HDFS2 组成数据流2

数据流1和数据流2完全不相关。

(7)各种各样的数据流模型

从上面介绍的6种不同的数据流模型中,我们可以得知,模型1和模型2相当于程序设计中的顺序执行。

模型3中 Agent1Agent2Agent3 收集 Events 处于并行状态,向 Agent4 发送 Events 处于并发状态。

模型4 相当于程序设计中的 if--else,选择模型。

模型5中 三个 Sinks 也相当于处于并发状态。

模型6 相当于程序设计中的 并行模型。

则通过这6种不同的数据流模型,我们可以将它们进行不同的组合,形成各种各样的数据流模型,以应付我们的需求。

3. 解析数据流模型

不同的数据流模型,具有不同的功能,但是这些数据流模型是靠哪些组件,哪些策略来构成的。本节将分析不同的数据流模型在 Agent 内部是如何实现的。

(1)  Agent 内部组件架构

如下图6所示,这是 Agent 内部一个比较完整的架构图,它不仅仅包含了 SourceChannelSink,还包含了 SinkRunnerInterceptorChannelSelectorTransaction,

SinkRunnerSinkProcessorSinkSelector。下面我们将详细介绍每个部件在 Agent 内部所承担的责任。

                                                                图6 Agent 内部组件架构图

从途中可以看出,将整个数据流分成两阶段,分别是第一阶段:Source --> Channel, 第二阶段: Channel --> Sink。

下面就从这两个阶段来详细介绍各个组件在数据流过程中承担的责任。

(2) 第一阶段

图6既是数据流图,也是对象结构图。从图中可以看出,一个 SourceRunner 对象包含一个 Source 对象,一个 Source 对象包含一个 ChannelProcessor对象,

一个 ChannelProcessor 对象包含 多个 Interceptor 对象和一个 ChannelSelector 对象。

首先 SourceRunner 负责启动 Source, 则 Source 监控是否有 Events 发送过来,如果有,则接收 Events

其次 Events 被 ChannelProcessor 中的 Interceptor 进行过滤,Interceptor 的功能有三种,分别是 丢弃 Event,修改 Event 再返回,直接返回 Event(不做任何操作)。

举例:Interceptor 有两种比较好理解的,分别是 Timestamp Interceptor 和 host InterceptorTimestamp Interceptor 会对每个 Event 的添加属性时间戳, host Interceptor会为

每个 Event 添加属性 host 

然后ChannelSelector 的主要功能是完成上面的多路数据流模型,分别有两种,replicating 和 multiplexing。也就是说 ChannelSelector 为每个 Event 选择它所要发送到的 Channel

在 replicating 模式下,每个 Event 都被发送到 多个 Channel 中;在 multiplexing 模式下,不同的 Events 会被发送到不同的 Channel 中。

最后Source 与每个 Channel 通过 Transaction 建立连接,将 Events 发送到 Channel 中去。

(3)第二阶段

从图中可以看出,一个 SinkRunner 对象包含一个 SinkProcessor 对象,一个 SinkProcessor 对象包含多个 Sinks 和/或 一个 SinkSelector

首先 SinkRunner 启动一个 SinkProcessor 对象,SinkProcessor 有三种,分别是 DefaultSinkProcessorFailoverSinkProcessorLoadBalancingSinkProcessor

看到这里你是不是有点印象了,对,这就是我们上面提到的 Sinkgroups 数据流模型。如果单个 Sink 的话,则使用 DefaultSinkProcessor,它负责启动 Sink

如果多个 Sinks 组成一组的话,则可以设置 SinkProcessor 为 failover 或 loadBalance

其中 FailoverSinkProcessor 会将各个 Sink 设置优先级。保存了一个 SortedMap<Integer, Sink> liveSinks,活着的 Sinks,一个 Queue<FailedSink> failedSinks,保存死的 Sinks。

每次会从 liveSinks 中选择一个优先级最高的 Sink 来消费 Events。如果某个 Sink 挂掉,则将其放在 failedSinks里,并且每次都尝试 failedSinks中的第一个 Sink,如果它能变活,则将其

转到 liveSinks 中。其中 LoadBalancingSinkProcessor 里有一个对象 SinkSelector,该 SinkSelector 有两种,分别是 round_robin 和 random。这里你又有印象啦。则 SinkSelector 就是在 Sinkgroups 

选择某个 Sink 来消费 Events。 round_robin 是轮询 Sinkgroups 中的所有 Sinksrandom 是从 Sinkgroups 中随机选择 某个 Sink

其次SinkProcessor 会选择某个 Sink,启动 Sink

最后Sink 与 Channel 通过 Transaction 建立连接。消费 Channel 内的 Events