首页 > 代码库 > Flume-ng-mongodb-sink

Flume-ng-mongodb-sink

  本文主要介绍使用Flume传输数据到MongoDB的过程,内容涉及环境部署和注意事项。

一、环境搭建

1、flune-ng下载地址:http://www.apache.org/dyn/closer.cgi/flume/1.5.2/apache-flume-1.5.2-bin.tar.gz
2、mongodb java driver jar包下载地址:https://oss.sonatype.org/content/repositories/releases/org/mongodb/mongo-java-driver/2.13.0/mongo-java-driver-2.13.0.jar
3、flume-ng-mongodb-sink 源码下载地址:https://github.com/leonlee/flume-ng-mongodb-sink
     flume-ng-mongodb-sink 需要自己编译jar包,从github上下载代码,解压之后执行mvn package,即可生成。需要先安装maven用于编译jar包

二、Flume配置

1、env配置

  将mongo-java-driver和flume-ng-mongodb-sink两个jar包放到flume\lib目录下,并将路径加入到flume-env.sh文件的FLUME_CLASSPATH变量中;
  JAVA_OPTS变量: 加上-Dflume.monitoring.type=http -Dflume.monitoring.port=xxxx,可以在[hostname:xxxx]/metrics 上看到监控信息; -Xms指定JVM初始内存,-Xmx指定JVM最大内存
  FLUME_HOME变量: 设定FLUME根目录
  JAVA_HOME变量: 设定JAVA根目录

2、 log配置

在调试时,将日志设置为debug并打到文件:flume.root.logger=DEBUG,LOGFILE

3、 传输配置

采用 Exec Source、file-channel、flume-ng-mongodb-sink

my_agent.sources.my_source_1.channels = my_channel_1my_agent.sources.my_source_1.type = execmy_agent.sources.my_source_1.command = python  xxx.pymy_agent.sources.my_source_1.shell = /bin/bash -cmy_agent.sources.my_source_1.restartThrottle = 10000my_agent.sources.my_source_1.restart = truemy_agent.sources.my_source_1.logStdErr = truemy_agent.sources.my_source_1.batchSize = 1000my_agent.sources.my_source_1.interceptors = i1 i2 i3my_agent.sources.my_source_1.interceptors.i1.type = staticmy_agent.sources.my_source_1.interceptors.i1.key = dbmy_agent.sources.my_source_1.interceptors.i1.value = http://www.mamicode.com/cswuyg_test>

字段说明:采用exec source,指定执行命令行为python xxx.py,在xxx.py代码中处理日志,并按照跟flume-ng-mongodb-sink的约定,print出json格式的数据,如果update类操作必须带着_id字段,print出来的日志被当作Event的Body,我再通过interceptors给它加上自定义Event Header;

static interceptors用于为Event Header添加信息,这里我为它加上了db=cswuyg_test、collection=cswuyg_test、op=upsert,这三个key是跟flume-ng-mongodb-sink 约定的,用于指定mongodb中的db、collection名以及操作类型为update。

my_agent.channels.my_channel_1.type = filemy_agent.channels.my_channel_1.checkpointDir = /home/work/flume/file-channel/my_channel_1/checkPointmy_agent.channels.my_channel_1.useDualCheckpoints = truemy_agent.channels.my_channel_1.backupCheckpointDir = /home/work/flume/file-channel/my_channel_1/checkPoint2my_agent.channels.my_channel_1.dataDirs = /home/work/flume/file-channel/my_channel_1/datamy_agent.channels.my_channel_1.transactionCapacity = 10000my_agent.channels.my_channel_1.checkpointInterval = 30000my_agent.channels.my_channel_1.maxFileSize = 4292870142my_agent.channels.my_channel_1.minimumRequiredSpace = 524288000my_agent.channels.my_channel_1.capacity = 100000

sink配置:

my_agent.sinks.my_mongo_1.type = org.riderzen.flume.sink.MongoSinkmy_agent.sinks.my_mongo_1.host = xxxhostmy_agent.sinks.my_mongo_1.port = yyyportmy_agent.sinks.my_mongo_1.model = DYNAMIC/SINGLE ---查看源码仅支持此二种方式,并且必须大小
my_agent.sinks.my_mongo_1.db = XXX --mongo表名,默认名称为events
my_agent.sinks.my_mongo_1.username = XXX --mongo用户名
my_agent.sinks.my_mongo_1.password = YYY --mongo密码
my_agent.sinks.my_mongo_1.collecion = logmy_agent.sinks.my_mongo_1.batch = 10my_agent.sinks.my_mongo_1.channel = my_channel_1my_agent.sinks.my_mongo_1.timestampField = _S

参见:http://www.cnblogs.com/cswuyg/p/4498804.html

Flume-ng-mongodb-sink