首页 > 代码库 > Flume+Kafka整合
Flume+Kafka整合
Flume+Kafka整合
一、准备工作
准备5台内网服务器创建Zookeeper和Kafka集群
服务器地址:
192.168.2.240
192.168.2.241
192.168.2.242
192.168.2.243
192.168.2.244
服务器系统:Centos 6.5 64位
下载安装包
Zookeeper:http://apache.fayea.com/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
Flume:http://apache.fayea.com/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
Kafka:http://apache.fayea.com/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz
Zookeeper,Flume,kafka需要用到Java环境,所以先安装JDk
yum install java-1.7.0-openjdk-devel
二、安装配置zookeeper
选择3台服务器作为zookeeper集群,他们的IP分别为:
192.168.2.240
192.168.2.241
192.168.2.242
注:先在第一台服务器192.168.2.240上分别执行(1)-(3)步。
(1)解压:将zookeeper-3.4.6.tar.gz放入/opt目录下
tar zxf zookeeper-3.4.6.tar.gz
(2)创建配置文件:将conf/zoo_sample.cfg拷贝一份命名为zoo.cfg,也放在conf目录下。然后按照如下值修改其中的配置:
tickTime=2000
dataDir=/opt/zookeeper/Data
initLimit=5
syncLimit=2
clientPort=2181
server.1=192.168.2.240:2888:3888
server.2=192.168.2.241:2888:3888
server.3=192.168.2.242:2888:3888
各个参数的意义:
tickTime:心跳检测的时间间隔(毫秒),缺省:2000
clientPort:其他应用(比如solr)访问ZooKeeper的端口,缺省:2181
initLimit:初次同步的阶段(followers连接到leader的阶段),允许的时长(tick数量),缺省:10
syncLimit:允许followers同步到ZooKeeper的时长(tick数量),缺省:5
dataDir:数据(比如所管理的配置文件)的存放路径
server.X:X是集群中一个服务器的id,与myid文件中的id是一致的。右边可以配置两个端口,第一个端口用于Fllower和Leader之间的数据同步和其它通信,第二个端口用于Leader选举过程中投票通信。
(3)创建/opt/zookeeper/Data快照目录,并创建my id文件,里面写入1。
mkdir /opt/zookeeper/Data vi /opt/zookeeper/Data/myid 1
(4)将192.168.2.240上已经配置好的/opt/zookeeper/目录分别拷贝至192.168.2.241和192.168.2.242。然后将对应的myid的内容修改为2和3
(5)启动zookeeper集群
分别在3台服务器上执行启动命令
/opt/zookeeper/bin/zkServer.sh start
三、安装配置kafka集群
一共5台服务器,服务器IP地址:
192.168.2.240 node1
192.168.2.241 node2
192.168.2.242 node3
192.168.2.243 node4
192.168.2.244 node5
1、解压安装文件到/opt/目录
cd /opt tar -zxvf kafka_2.10-0.10.0.0.tar.gz mv kafka_2.10-0.10.0.0 kafka
2、修改server. properties文件
#node1 配置
broker.id=0
port=9092
advertised.listeners=PLAINTEXT:// 58.246.xx.xx:9092
advertised.host.name=58.246.xx.xx
#碰到的坑,由于我是从线上把nginx日志拉回公司本地服务器,所以这两选项必须配置成路由器外网IP地址,否则线上flume报无法连接kafka节点,报无法传送日志消息
advertised.port=9092
num.network.threads=3
num.io.threads=8
num.partitions=5
zookeeper.connect=192.168.2.240:2181,192.168.2.241:2181,192.168.2.242:2181
#node2 配置
broker.id=1
port=9093
advertised.listeners=PLAINTEXT://58.246.xx.xx:9093
advertised.host.name=58.246.xx.xx
advertised.port=9093
num.network.threads=3
num.io.threads=8
num.partitions=5
zookeeper.connect=192.168.2.240:2181,192.168.2.241:2181,192.168.2.242:2181
#node3 配置
broker.id=2
port=9094
advertised.listeners=PLAINTEXT:// 58.246.xx.xx:9094
advertised.host.name=58.246.xx.xx
advertised.port=9094
num.network.threads=3
num.io.threads=8
num.partitions=5
zookeeper.connect=192.168.2.240:2181,192.168.2.241:2181,192.168.2.242:2181
#node4 配置
broker.id=2
port=9095
advertised.listeners=PLAINTEXT:// 58.246.xx.xx:9095
advertised.host.name=58.246.xx.xx
advertised.port=9095
num.network.threads=3
num.io.threads=8
num.partitions=5
zookeeper.connect=192.168.2.240:2181,192.168.2.241:2181,192.168.2.242:2181
#node5 配置
broker.id=2
port=9096
advertised.listeners=PLAINTEXT:// 58.246.xx.xx:9096
advertised.host.name=58.246.xx.xx
advertised.port=9096
num.network.threads=3
num.io.threads=8
num.partitions=5
zookeeper.connect=192.168.2.240:2181,192.168.2.241:2181,192.168.2.242:2181
启动卡夫卡集群
分别在所有节点执行以下命令来启动服务
/opt/kafka/bin/kafka-server-start.sh/opt/kafka/config/server.properties &
四、安装配置Flume
安装两台flume,一台安装在线上,把线上的日志传回本地kafka,另一台安装在本地,把kafka集群的日志信息转存到HDFS
4.1、线上服务器安装Flume
收集nginx日志传给公司内部kafka
1、 解压安装包
cd /opt
tar –zxvf apache-flume-1.7.0-bin.tar.gz
2、 创建配置文件
Vi flume-conf.properties 添加以下内容
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F/unilifeData/logs/nginx/access.log
a1.sources.r1.channels = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 100000
#sinks
a1.sinks.k1.type =org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = unilife_nginx_production
a1.sinks.k1.kafka.bootstrap.servers = 58.246.xx.xx:9092,58.246.xx.xx:9093,58.246.xx.xx:9094
a1.sinks.k1.brokerList = 58.246.xx.xx:9092,58.246.xx.xx:9093,58.246.xx.xx:9094
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.flumeBatchSize = 2000
a1.sinks.k1.channel = c1
启动flume服务
/opt/flume/bin/flume-ng agent --conf /opt/flume/conf/--conf-file /opt/flume/conf/flume-conf.properties --name a1-Dflume.root.logger=INFO,LOGFILE &
4.2、本地安装flume
转存日志到HDFS
1、解压安装包
cd /opt
tar –zxvf apache-flume-1.7.0-bin.tar.gz
3、 创建配置文件
nginx.sources = source1
nginx.channels = channel1
nginx.sinks = sink1
nginx.sources.source1.type =org.apache.flume.source.kafka.KafkaSource
nginx.sources.source1.zookeeperConnect =master:2181,slave1:2181,slave2:2181
nginx.sources.source1.topic =unilife_nginx_production
nginx.sources.source1.groupId =flume_unilife_nginx_production
nginx.sources.source1.channels = channel1
nginx.sources.source1.interceptors = i1
nginx.sources.source1.interceptors.i1.type =timestamp
nginx.sources.source1.kafka.consumer.timeout.ms = 100
nginx.channels.channel1.type = memory
nginx.channels.channel1.capacity = 10000000
nginx.channels.channel1.transactionCapacity = 1000
nginx.sinks.sink1.type = hdfs
nginx.sinks.sink1.hdfs.path =hdfs://192.168.2.240:8020/user/hive/warehouse/nginx_log
nginx.sinks.sink1.hdfs.writeFormat=Text
nginx.sinks.sink1.hdfs.inUsePrefix=_
nginx.sinks.sink1.hdfs.rollInterval = 3600
nginx.sinks.sink1.hdfs.rollSize = 0
nginx.sinks.sink1.hdfs.rollCount = 0
nginx.sinks.sink1.hdfs.fileType = DataStream
nginx.sinks.sink1.hdfs.minBlockReplicas=1
nginx.sinks.sink1.channel = channel1
启动服务
/opt/flume/bin/flume-ng agent --conf /opt/flume/conf/--conf-file /opt/flume/conf/flume-nginx-log.properties --name nginx-Dflume.root.logger=INFO,LOGFILE &
Flume+Kafka整合