首页 > 代码库 > kafka语句示例
kafka语句示例
1.从http://kafka.apache.org/下载kafka安装包;
2.tar zxvf kafka_2.8.0.tar.gz,修改配置文件conf/server.properties:
broker.id=0;host.name=10.100.5.9;zookeeper.connect=10.100.5.9:2181可逗号分隔配置多个
3.启动服务:bin/zkServer.sh start; bin/kafka-server-start.sh config/server.properties &
4.创建topic主题:
bin/kafka-create-topic.sh --zookeeper 10.100.5.9:2181 --replica 3 --partition 1 --topic mykafka
//启动报错Unrecognized VM option ‘+UseCompressedOops‘
查看 bin/kafka-run-class.sh
找到
if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"
fi
去掉-XX:+UseCompressedOops
5.测试主题:
bin/kafka-list-topic.sh --zookeeper 10.100.5.9:2181
6.生产者发送消息:
bin/kafka-console-producer.sh --broker-list 10.100.5.9:9092 --topic mykafka
7.消费者接收消息:
bin/kafka-console-consumer.sh --zookeeper 10.100.5.9:2181 --topic mykafka --from-beginning
8.多broker测试
之前是一台机器上当做一个node,现在尝试在一台机器上放3个node,即broker
修改配置文件
# cp config/server.properties config/server-1.properties
# cp config/server.properties config/server-2.properties
config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2
启动broker1和broker2
# JMX_PORT=9997 bin/kafka-server-start.sh config/server-1.properties &
# JMX_PORT=9998 bin/kafka-server-start.sh config/server-2.properties &
创建一个topic
# bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 3 --partition 1 --topic my-replicated-topiclist一下
# bin/kafka-list-topic.sh --zookeeper localhost:2181
topic: my-replicated-topic partition: 0 leader: 2 replicas: 2,0,1 isr: 2,0,1
topic: test partition: 0 leader: 0 replicas: 0 isr: 0partition:同一个topic下可以设置多个partition,将topic下的message存储到不同的partition下,目的是为了提高并行性
leader:负责此partition的读写操作,每个broker都有可能成为某partition的leader
replicas:副本,即此partition在那几个broker上有备份,不管broker是否存活
isr:存活的replicas
kafka语句示例