首页 > 代码库 > Erlang 编写 Kafka 客户端之最简单入门
Erlang 编写 Kafka 客户端之最简单入门
Erlang 编写 Kafka 客户端之最简单入门
费劲周折,终于测通了 erlang 向kafka 发送消息,使用了ekaf 库,参考:
An advanced but simple to use, Kafka producer written in Erlanghttps://github.com/helpshift/ekaf
1 准备kafka客户端
准备2台机器,一台是ekaf运行的kafka客户端(192.168.191.2),一台是kafka服务端(zookeeper+kafka)(192.168.122.199)。我在开发机器 (192.168.191.2)上,安装ekaf。虽然ekaf的机器上不需要安装 kafka ,但是为了测试 kafka 命令行客户端是否可用,我还是安装了 kafka:
$ echo $KAFKA_HOME
/usr/local/apache/kafka_2.9.2-0.8.1.1
打开一个kafka客户端producer:
$ kafka-console-producer.sh --broker-list 192.168.122.199:9092 --sync --topic ekaf
2 准备kafka 服务端
kafka 服务端(broker)的安装参考我的文章,并且已经建立了 ekaf 这个topic:
zookeeper+kafka集群安装之一
zookeeper+kafka集群安装之二
kafka broker安装位置在:192.168.122.199:9092,配置文件 server.properties 部分内容如下:
broker.id=0 port=9092 host.name=192.168.122.199 advertised.host.name=192.168.122.199 advertised.port=9092 num.partitions=2 zookeeper.connect=192.168.122.199:2181,192.168.122.199:2182,192.168.122.199:2183
/etc/profile 配置包含:
export APACHE_HOME=/usr/local/apacheexport KAFKA_HOME=$APACHE_HOME/kafka_2.9.2-0.8.1.1export ZK1_HOME=$APACHE_HOME/zk-cluster/zk1/zookeeper-3.4.6 export ZK2_HOME=$APACHE_HOME/zk-cluster/zk2/zookeeper-3.4.6 export ZK3_HOME=$APACHE_HOME/zk-cluster/zk3/zookeeper-3.4.6
启动 zookeeper:
$ZK1_HOME/bin/zkServer.sh start $ZK2_HOME/bin/zkServer.sh start $ZK3_HOME/bin/zkServer.sh start
启动zookeeper后,稍微等一会(30秒)再启动kafka,否则出错:
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties &
启动kafka broker之后,启动一个consumer,监听消息队列:
$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper 192.168.122.199:2181 --topic ekaf --from-beginning
此时在kafka客户端producer窗口里输入消息,er窗口中显示出来,证明消息队列已经准备好了。
3 kafka客户端开发
下面的操作都在kafka客户端(192.168.191.2)上进行。
1) 获取 ekaf
$ git clone https://github.com/helpshift/ekaf.git $ cd ekaf
2) 编译 ekaf
$ rebar get-deps clean compile
如何安装和使用 rebar 请参考我的其他文章:
Erlang Rebar 使用指南之一:入门篇
3) 运行 ekaf
$ erl -pa ‘./deps/gproc/ebin‘ -pa ‘./deps/kafkamocker/ebin‘ -pa ‘./ebin‘ 1> application:load(gproc). ok 2> application:load(kafkamocker). ok 3> application:load(ekaf). ok 4> application:set_env(ekaf, ekaf_bootstrap_broker, {"192.168.122.199", 9092}). ok 5> application:start(gproc). ok 6> application:start(kafkamocker). ok 7> application:start(ekaf). ok 8> ekaf:produce_sync(<<"ekaf">>, <<"Hello Kafka">>). {{sent,0,<0.67.0>}, {produce_response,1,0, [{topic,<<"ekaf">>,0,[{partition,0,0,0,[],[],0,[]}]}]}} 9> ekaf:produce_sync(<<"ekaf">>, <<"Hello Kafka 99">>). {{sent,1,<0.196.0>}, {produce_response,1,0, [{topic,<<"ekaf">>,0,[{partition,0,0,0,[],[],0,[]}]}]}} 10> q(). ok
注意,application:set_env(ekaf, ekaf_bootstrap_broker, {"192.168.122.199", 9092}). 的配置要确保与kafka-console-producer.sh使用的一致。我就是一个使用了IP,一个使用了hostname,导致一直无法成功,浪费了几天时间。
Erlang 编写 Kafka 客户端之最简单入门
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。