Flume: one source feeding multiple channels and multiple sinks
I. Overview
1. There are three machines: Hadoop1, Hadoop2, and Hadoop3. Hadoop1 acts as the log consolidation node.
2. While consolidating, Hadoop1 simultaneously writes the data out to multiple destinations.
3. Mapping one Flume source to multiple channels and multiple sinks is configured in the consolidation-accepter.conf file; a minimal sketch of this fan-out pattern follows this list.
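To make the fan-out idea concrete before looking at the real configuration, here is a minimal sketch (agent and component names a1, r1, c1, c2, k1, k2 are hypothetical, not the ones used later) of one source replicated into two channels, each drained by its own sink:

# one Avro source copied into two memory channels, each with its own sink
a1.sources  = r1
a1.channels = c1 c2
a1.sinks    = k1 k2

a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1 c2             # fan-out: every event goes to both channels
a1.sources.r1.selector.type = replicating  # the default selector, shown for clarity

a1.channels.c1.type = memory
a1.channels.c2.type = memory

a1.sinks.k1.type = hdfs                    # first sink writes to HDFS
a1.sinks.k1.hdfs.path = hdfs://mycluster/flume/%Y-%m-%d
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.channel = c1

a1.sinks.k2.type = logger                  # second sink just logs events, for illustration
a1.sinks.k2.channel = c2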
II. Deploying Flume to collect and consolidate logs
1. Run the following on Hadoop1:
flume-ng agent --conf ./ -f consolidation-accepter.conf -n agent1 -Dflume.root.logger=INFO,console
The contents of its configuration file (consolidation-accepter.conf) are as follows:
# Tell agent1 which components to activate
agent1.channels = ch1 ch2
agent1.sources = source1
agent1.sinks = hdfssink1 sink2

# Replicating selector: every event from source1 is copied to all of its channels
agent1.sources.source1.selector.type = replicating
agent1.sources.source1.selector.optional = ch1

# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 1000000
agent1.channels.ch1.keep-alive = 10

# Define a memory channel called ch2 on agent1
agent1.channels.ch2.type = memory
agent1.channels.ch2.capacity = 1000000
agent1.channels.ch2.transactionCapacity = 100000
agent1.channels.ch2.keep-alive = 10

# Define an Avro source called source1 on agent1, listening on port 44444,
# and connect it to both channels ch1 and ch2
agent1.sources.source1.channels = ch1 ch2
agent1.sources.source1.type = avro
agent1.sources.source1.bind = con
agent1.sources.source1.port = 44444
agent1.sources.source1.threads = 5

# HDFS sink that drains ch1
# (sink2, which would drain ch2, is declared above but its configuration is not shown here)
agent1.sinks.hdfssink1.channel = ch1
agent1.sinks.hdfssink1.type = hdfs
agent1.sinks.hdfssink1.hdfs.path = hdfs://mycluster/flume/%Y-%m-%d/%H%M
agent1.sinks.hdfssink1.hdfs.filePrefix = S1PA124-consolidation-accesslog-%H-%M-%S
agent1.sinks.hdfssink1.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfssink1.hdfs.writeFormat = Text
agent1.sinks.hdfssink1.hdfs.fileType = DataStream
agent1.sinks.hdfssink1.hdfs.rollInterval = 1800
agent1.sinks.hdfssink1.hdfs.rollSize = 5073741824
agent1.sinks.hdfssink1.hdfs.batchSize = 10000
agent1.sinks.hdfssink1.hdfs.rollCount = 0
agent1.sinks.hdfssink1.hdfs.round = true
agent1.sinks.hdfssink1.hdfs.roundValue = 60

2. Run the following command on Hadoop2 and Hadoop3 respectively:
flume-ng agent --conf ./ --conf-file collect-send.conf --name agent1
The contents of the Flume sender configuration file (collect-send.conf) are as follows:
agent2.sources = source2
agent2.sinks = sink1
agent2.channels = ch2

# source configuration: tail the local log file
agent2.sources.source2.type = exec
agent2.sources.source2.command = tail -F /root/data/flume.log
agent2.sources.source2.channels = ch2

# channel configuration
agent2.channels.ch2.type = memory
agent2.channels.ch2.capacity = 10000
agent2.channels.ch2.transactionCapacity = 10000
agent2.channels.ch2.keep-alive = 3

# sink configuration: forward events to the consolidation host (Hadoop1) over Avro
agent2.sinks.sink1.type = avro
agent2.sinks.sink1.hostname = consolidationIpAddress
agent2.sinks.sink1.port = 44444
agent2.sinks.sink1.channel = ch2
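To check the pipeline end to end, one option is to append a few test lines on Hadoop2 or Hadoop3 to the file that the exec source tails (/root/data/flume.log as configured above); this is only a quick sanity test, not part of the original setup:

# on Hadoop2 / Hadoop3: append test events that tail -F will pick up
for i in 1 2 3; do
  echo "test event $i from $(hostname) at $(date '+%F %T')" >> /root/data/flume.log
done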
III. Summary

1. Start the Flume consolidation agent:
flume-ng agent --conf ./ -f consolidation-accepter.conf -n agent1 -Dflume.root.logger=INFO,console

2. Start the Flume collection agent:
flume-ng agent --conf ./ --conf-file collect-send.conf --name agent1

3. Notes on the roll parameters (the two conditions below are OR'ed: a new file is started as soon as either one is met):
(1) Every half hour the data in the channel is flushed to the sink and a new file is started:
agent1.sinks.hdfssink1.hdfs.rollInterval = 1800
(2) When a file reaches 5073741824 bytes (about 4.7 GiB), a new file is started:
agent1.sinks.hdfssink1.hdfs.rollSize = 5073741824
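To confirm that files are actually being rolled into HDFS, you can list today's output directories (a quick check, assuming an HDFS client on a node that can resolve the mycluster nameservice and the path pattern from consolidation-accepter.conf):

# list today's Flume output directories and files on HDFS
hdfs dfs -ls -R hdfs://mycluster/flume/$(date +%Y-%m-%d)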