首页 > 代码库 > Kafka 0.8 配置参数解析

Kafka 0.8 配置参数解析

http://kafka.apache.org/08/configuration.html

 

Broker Configs

4个必填参数,

broker.id
Each broker is uniquely identified by a non-negative integer id
broker唯一标识,broker可以在不同的host或port,但必须保证id唯一

log.dirs (/tmp/kafka-logs)
日志文件存放的目录
可以用逗号隔开多个目录,当创建partitions时,会自动挑一个已创建partition最少的目录创建
因为Kafka必须充分利用磁盘资源,所以要让partititons均匀分布在所有disks上,至少每个disk创建一个目录

port (6667)
broker server所在端口

zookeeper.connect
zk的地址,hostname1:port1,hostname2:port2

 

可选的重要参数,

message.max.bytes (1000000)
broker可以接收的message的最大size
注意要和consumer的maximum fetch size相匹配

num.io.threads (8)
处理I/O的线程数,即读写磁盘的线程数,所以至少等于磁盘数

queued.max.requests (500)
可以queue的最大请求数,包括producer的写请求和consumer的读请求

log.segment.bytes (1024 * 1024 * 1024), log.segment.bytes.per.topic (topic1:value1,topic2:value2)
log.roll.hours, log.roll.hours.per.topic
上面4个配置用于设置,何时为partition产生新的segment文件
为了防止partition文件过大,所以partition是由一组segment文件组成
可以通过设置size或时间来决定何时roll
可以设置全局值或为每个topic设置不同的值

log.retention.hours (24 * 7), log.retention.hours.per.topic
log.retention.bytes (-1), log.retention.bytes.per.topic
log.cleanup.interval.mins (10), 多久check
上面的配置用于设置,保留多久的数据在磁盘
默认是7天数据
当然你可以通过partition大小来设置删除threshold,默认是-1,即关掉的,注意这里设置的是partition大小

log.flush.interval.messages (10000), log.flush.interval.ms.per.topic
log.flush.interval.ms
log.flush.scheduler.interval.ms,多久check
为了效率,broker会缓存部分log,然后再flush到磁盘
这里可以通过message数或时间来控制flush策略

还有大量和replica相关的参数,暂时不考虑

 

Producer Configs

参考,Kafka Producer接口

 

Consumer Configs

首先只有在用high level consumer时才需要管这个配置
如果用low level,自己做代码里面写,这个配置不起作用的

2个必填参数,

group.id
A string that uniquely identifies the group of consumer processes to which this consumer belongs.

zookeeper.connect
zk地址,hostname1:port1,hostname2:port2,hostname3:port3/chroot/path
可以看到zk地址其实是可以设置到,目录级别的(比如用于一个zk管理多个kafka集群),但是如果不用默认目录,需要自己创建和维护这个zk目录
并且必须要和broker中的zk配置保持一致

 

可选配置,

fetch.message.max.bytes (1024 * 1024)
The number of byes of messages to attempt to fetch for each topic-partition in each fetch request
为了效率,consumer从kafka fetch数据,也是批量fetch的,虽然你处理的时候是onebyone逻辑,但后台是预读的
这个值至少要等于broker里面设置的maximum message size,否则有可能连一条message都取不下来
默认是1m,设太大会爆内存,而且会增大读重的可能性,因为当consumer发生变化时,会发生rebalance,这时被新分配到这个partition的consumer仍然会读到预读但没有commit的数据

auto.commit.enable (true), auto.commit.interval.ms (60 * 1000)
默认是会自动commit offset到zk的,如果要自己commit,设为false
默认除非自动commit的时间是1分钟
用手工commit的简单的理由是,防止consumer crash丢失数据,可以在确认数据被正确处理后,再手工commit offset

queued.max.message.chunks (10)
上面说的fetch.message.max.bytes是针对一个partition的,但是一个consumer是可以同时读多个partition,所以对每个partition都可以读一个这样的chunk
所以这个配置是,同时可以读多少大小为fetch.message.max.bytes的chunk
默认是10,个人认为至少要大于读的partition个数吧,但避免耗尽内存

fetch.wait.max.ms (100), fetch.min.bytes (1)
当取不到fetch.message.max.bytes时,最大block时间,并且返回fetch.min.bytes大小数据

auto.offset.reset
* smallest : automatically reset the offset to the smallest offset
* largest : automatically reset the offset to the largest offset
* anything else: throw exception to the consumer.

consumer.timeout.ms (-1)
Throw a timeout exception to the consumer if no message is available for consumption after the specified interval
默认是block的,当consumer取不到数据

Kafka 0.8 配置参数解析