首页 > 代码库 > kafka集群部署
kafka集群部署
版本:kafka_2.10-0.10.1.1 + zookeeper-3.4.9
说明:kafka自带了zookeeper,但是推荐使用独立的zookeeper集群。
前置环境:(参考Hadoop搭建)
1、节点配分配:192.168.235.138 node1192.168.235.139 node2192.168.235.140 node32、配置静态IP、hosts、hostname、关闭防火墙、安装JDK
1、zookeeper集群部署
(1)安装
上传文件到指定集群上,执行如下步骤
# 解压文件到/usr/local/目录下# tar -zxvf zookeeper-3.4.9.tar.gz -C /usr/local/# 删除无用文件(可选)# cd /usr/local/zookeeper-3.4.9/# find . -name "*.cmd" -exec rm -rf {} \;# rm -rf docs/# 创建数据、日志存放目录及当前节点ID# mkdir data # mkdir dataLog# echo "1" > data/myid
(2)配置
# pwd/usr/local/zookeeper-3.4.9# cp conf/zoo_sample.cfg conf/kafka_zk.cfg # vim conf/kafka_zk.cfg # 在kafka_zk.cfg文件中加入如下配置:tickTime=2000# 数据文件存放位置dataDir=/usr/local/zookeeper-3.4.9/datadataLogDir=/usr/local/zookeeper-3.4.9/dataLog#服务监听端口clientPort=2181#选举等待时间initLimit=5syncLimit=2#集群节点信息server.1=node1:2888:3888server.2=node2:2888:3888server.3=node3:2888:3888
(3)分发文件、修改对应节点的ID
scp -r zookeeper-3.4.9 root@node2:/usr/local/scp -r zookeeper-3.4.9 root@node3:/usr/local/
修改node2和node3节点上/usr/local/zookeeper-3.4.9/data目录下的myid文件内容分别为2、3
(4)查看各节点时间并同步时钟
# date #系统时间,系统重启失效2017年 04月 14日 星期五 11:28:08 CST# 手动更改系统时间# date -s ‘2017/04/14 11:33:50‘# clock -r #读取CMOS时间2017年04月14日 星期五 19时34分33秒 -1.038372 seconds# clock -w #将当前时间写入CMOS
注:集群各节点之间的时间同步很重要,必要时部署本地集群的时钟服务器
(5)启动集群
# /usr/local/zookeeper-3.4.9/bin/zkServer.sh start /usr/local/zookeeper-3.4.9/conf/kafka_zk.cfg #启动(指定配置文件)# jps #查看进程15286 QuorumPeerMain15319 Jps# /usr/local/zookeeper-3.4.9/bin/zkServer.sh status /usr/local/zookeeper-3.4.9/conf/kafka_zk.cfg #查看状态# /usr/local/zookeeper-3.4.9/bin/zkServer.sh stop /usr/local/zookeeper-3.4.9/conf/kafka_zk.cfg #关闭
注意:
i 最好在启动、关闭时都指定配置文件,一方面,集群中可能有多个配置文件,另一方面,暴力kill进程可能会带来意想不到的错误。
ii jps只能查看进程是否存在,在某些情况下进程存在不代表zk集群处于可用状态,可通过查看zk状态或者用自带的客户端连接确认是
否处于可用状态。连接示例如下:
# /usr/local/zookeeper-3.4.9/bin/zkCli.sh -server node1:2181日志。。。。[zk: node1:2181(CONNECTED) 0] ls /[zookeeper][zk: node1:2181(CONNECTED) 1] help #命令参考ZooKeeper -server host:port cmd args connect host:port get path [watch] ls path [watch] set path data [version] rmr path delquota [-n|-b] path quit printwatches on|off create [-s] [-e] path data acl stat path [watch] close ls2 path [watch] history listquota path setAcl path acl getAcl path sync path redo cmdno addauth scheme auth delete path [version] setquota -n|-b val path退出:quit 或 Ctrl + C/D
(6)zk自带工具介绍
zookeeper自带的工具如下:
# ls bin/ zkCleanup.sh zkCli.sh zkEnv.sh zkServer.sh# zkCleanup.sh :zookeeper不会自己删除旧的快照和日志文件,需要人工清除,可以自己写脚本清除或者用zk自带的zkCleanup工具。# zkCli.sh :zk客户端# zkEnv.sh : 主要配置,zookeeper集群启动时配置环境变量的文件# zkServer.sh 管理程序文件
日志管理是后期维护的重要一环,要特别注意。至此,zk集群搭建完毕。
2、kafka集群
(1)安装
上传文件到集群节点
tar -zxvf kafka_2.10-0.10.1.1.tgz -C /usr/local/cd /usr/local/mv kafka_2.10-0.10.1.1 kafka
(2)配置
# cd /usr/local/kafka/# mkdir logs #预设日志存放目录# ll config/ | awk ‘{print $9}‘ #配置文件列表connect-console-sink.propertiesconnect-console-source.propertiesconnect-distributed.propertiesconnect-file-sink.propertiesconnect-file-source.propertiesconnect-log4j.propertiesconnect-standalone.propertiesconsumer.properties #消费者配置选项log4j.propertiesproducer.properties #生产者配置选项server.properties #主要配置文件tools-log4j.propertieszookeeper.properties# vim config/server.properties 修改如下参数,没有则添加broker.id=0 host.name=192.168.235.138 log.dirs=/usr/local/kafka/logs #在log.retention.hours=168 下面新增下面三项message.max.byte=5242880default.replication.factor=2replica.fetch.max.bytes=5242880zookeeper.connect=node1:2181,node2:2181,node3:2181
补充:server.properties可选参数如下
broker.id=0 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样port=19092 #当前kafka对外提供服务的端口默认是9092host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。num.network.threads=3 #这个是borker进行网络处理的线程数num.io.threads=8 #这个是borker进行I/O处理的线程数log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小num.partitions=1 #默认的分区数,一个topic默认1个分区数log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天message.max.byte=5242880 #消息保存的最大值5Mdefault.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务replica.fetch.max.bytes=5242880 #取消息的最大直接数log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #设置zookeeper的连接端口
注意:每个节点对应一个broker id,并和IP一一对应
(3)启动服务
# cd kafka/# bin/kafka-server-start.sh config/server.properties # jps
(4)测试
创建一个主题,并用node1作为消息生产者(发布者),node2、node3作为消息消费者(订阅者)。
在node1节点上:
#启动服务# bin/kafka-server-start.sh -daemon config/server.properties # jps# 创建一个主题test:一个分区,两个副本# bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partition 1 --topic test # 创建一个生产者(消息发布者)# bin/kafka-console-producer.sh --broker-list node1:9092 --topic test等待输入要发布的消息。。。
在node2、node3节点上:
# bin/kafka-server-start.sh -daemon config/server.properties
# 创建一个消费者(消息订阅者)# bin/kafka-console-consumer.sh --zookeeper node1:2181 --topic test --from beginning
等待接收消息。。。
在node1的发布频道里输入消息,可在node2、node3上接收到对应的消息。
其他一些常用命令参考:
查看主题bin/kafka-topics.sh --list --zookeeper localhost:2181查看主题详情bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test删除主题bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test生产者参数查看:bin/kafka-console-producer.sh消费者参数查看:bin/kafka-console-consumer.sh
(5)补充:
日志说明:
server.log #kafka的运行日志state-change.log #kafka他是用zookeeper来保存状态,所以他可能会进行切换,切换的日志就保存在这里controller.log #kafka选择一个节点作为“controller”,当发现有节点down掉的时候它负责在游泳分区的所有节点中选择新的leader,这使得Kafka可以批量的高效的管理所有分区节点的主从关系。如果controller down掉了,活着的节点中的一个会备切换为新的controller.
zookeeper查看:
# bin/zkCli.sh ...[zk: localhost:2181(CONNECTED) 0] ls /[isr_change_notification, zookeeper, admin, consumers, cluster, config, controller, brokers, controller_epoch]每个节点上都可以通过zookeeper查看kafka的节点注册信息
官方文档:http://kafka.apache.org/documentation.html
参考:http://www.cnblogs.com/luotianshuai/p/5206662.html
kafka集群部署