首页 > 代码库 > flume日志采集框架使用

flume日志采集框架使用

flume日志采集框架使用

  本次学习使用的全部过程均不在集群上,均在本机环境,供学习参考

 先决条件:

  • flume-ng-1.6.0-cdh5.8.3.tar  去cloudrea下载flume框架,笔者是用cdh5.8.3的套餐

  flume的使用环境:

  • 采集特定目录到hdfs环境以供分析离线数据
  • 监听特定端口的socket流数据

  本次将以上两种情况的使用加以记录供以后参考

  • 解压 flume-ng-1.6.0-cdh5.8.3.tar
  • mv flume-ng-1.6.0-cdh5.8.3 flume
  • 准备运行配置文件
    //socket流采集 netcat-logger.conf从网络端口接收数据,下沉到logger采集配置文件,netcat-logger.conf# example.conf: A single-node Flume configuration# Name the components on this agent#给那三个组件取个名字a1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the source#类型, 从网络端口接收数据,在本机启动, 所以localhost, type=spoolDir采集目录源,目录里有就采a1.sources.r1.type = netcata1.sources.r1.bind = localhosta1.sources.r1.port = 44444# Describe the sinka1.sinks.k1.type = logger# Use a channel which buffers events in memory#下沉的时候是一批一批的, 下沉的时候是一个个eventChannel参数解释:#capacity:默认该通道中最大的可以存储的event数量#trasactionCapacity:每次最大可以从source中拿到或者送到sink中的event数量a1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1启动命令:#告诉flum启动一个agent,指定配置参数, --name:agent的名字,$ bin/flume-ng agent --conf conf --conf-file conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console传入数据:$ telnet localhost 44444Trying 127.0.0.1...Connected to localhost.localdomain (127.0.0.1).Escape character is ‘^]‘.Hello world! <ENTER>OK

    技术分享技术分享

    //spooldir配置文件实例  spooldir-hdfs.conf

    监视文件夹启动命令:  bin/flume-ng agent -c ./conf -f ./conf/spooldir-hdfs.conf -n a1 -Dflume.root.logger=INFO,console测试: 往/Users/willian/Public/flume放文件(mv ././xxxFile /Users/willian/Pulic/flume),但是不要在里面生成文件############### Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the source#监听目录,spoolDir指定目录, fileHeader要不要给文件夹前坠名a1.sources.r1.type = spooldira1.sources.r1.spoolDir = /Users/willian/Public/flumea1.sources.r1.fileHeader = true# Describe the sinka1.sinks.k1.type = logger# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1

    技术分享

        可以看到 完成了采集会出现complete后缀

 

注意事项

  •      不能出现重名的文件,不然会报错

 

 

 

 

 

 

flume日志采集框架使用