首页 > 代码库 > Erlang 编写 Kafka 客户端之最简单入门

Erlang 编写 Kafka 客户端之最简单入门

Erlang 编写 Kafka 客户端之最简单入门

费劲周折,终于测通了 erlang 向kafka 发送消息,使用了ekaf 库,参考:

An advanced but simple to use, Kafka producer written in Erlang

https://github.com/helpshift/ekaf

1 准备kafka客户端

准备2台机器,一台是ekaf运行的kafka客户端(192.168.191.2),一台是kafka服务端(zookeeper+kafka)(192.168.122.199)。我在开发机器 (192.168.191.2)上,安装ekaf。虽然ekaf的机器上不需要安装 kafka ,但是为了测试 kafka 命令行客户端是否可用,我还是安装了 kafka:

$ echo $KAFKA_HOME

/usr/local/apache/kafka_2.9.2-0.8.1.1

打开一个kafka客户端producer:

$ kafka-console-producer.sh --broker-list 192.168.122.199:9092 --sync --topic ekaf

2 准备kafka 服务端

kafka 服务端(broker)的安装参考我的文章,并且已经建立了 ekaf 这个topic:

zookeeper+kafka集群安装之一

zookeeper+kafka集群安装之二

kafka broker安装位置在:192.168.122.199:9092,配置文件 server.properties 部分内容如下:

broker.id=0
port=9092
host.name=192.168.122.199
advertised.host.name=192.168.122.199
advertised.port=9092
num.partitions=2
zookeeper.connect=192.168.122.199:2181,192.168.122.199:2182,192.168.122.199:2183

/etc/profile 配置包含:

export APACHE_HOME=/usr/local/apacheexport KAFKA_HOME=$APACHE_HOME/kafka_2.9.2-0.8.1.1export ZK1_HOME=$APACHE_HOME/zk-cluster/zk1/zookeeper-3.4.6
export ZK2_HOME=$APACHE_HOME/zk-cluster/zk2/zookeeper-3.4.6
export ZK3_HOME=$APACHE_HOME/zk-cluster/zk3/zookeeper-3.4.6

启动 zookeeper:

$ZK1_HOME/bin/zkServer.sh start
$ZK2_HOME/bin/zkServer.sh start
$ZK3_HOME/bin/zkServer.sh start

启动zookeeper后,稍微等一会(30秒)再启动kafka,否则出错:

$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties &

启动kafka broker之后,启动一个consumer,监听消息队列:

$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper 192.168.122.199:2181 --topic ekaf --from-beginning

此时在kafka客户端producer窗口里输入消息,er窗口中显示出来,证明消息队列已经准备好了。

3 kafka客户端开发

下面的操作都在kafka客户端(192.168.191.2)上进行。

1) 获取 ekaf

$ git clone https://github.com/helpshift/ekaf.git
$ cd ekaf

2) 编译 ekaf

$ rebar get-deps clean compile

如何安装和使用 rebar 请参考我的其他文章:

Erlang Rebar 使用指南之一:入门篇

3) 运行 ekaf

$ erl -pa ‘./deps/gproc/ebin‘ -pa ‘./deps/kafkamocker/ebin‘ -pa ‘./ebin‘
1> application:load(gproc).
ok
2> application:load(kafkamocker).
ok
3> application:load(ekaf).
ok
4> application:set_env(ekaf, ekaf_bootstrap_broker, {"192.168.122.199", 9092}).
ok
5> application:start(gproc).
ok
6> application:start(kafkamocker).
ok
7> application:start(ekaf).
ok
8> ekaf:produce_sync(<<"ekaf">>, <<"Hello Kafka">>).
{{sent,0,<0.67.0>},
 {produce_response,1,0,
                   [{topic,<<"ekaf">>,0,[{partition,0,0,0,[],[],0,[]}]}]}}
9> ekaf:produce_sync(<<"ekaf">>, <<"Hello Kafka 99">>).
{{sent,1,<0.196.0>},
 {produce_response,1,0,
                   [{topic,<<"ekaf">>,0,[{partition,0,0,0,[],[],0,[]}]}]}}
10> q().
ok

注意,application:set_env(ekaf, ekaf_bootstrap_broker, {"192.168.122.199", 9092}). 的配置要确保与kafka-console-producer.sh使用的一致。我就是一个使用了IP,一个使用了hostname,导致一直无法成功,浪费了几天时间。


Erlang 编写 Kafka 客户端之最简单入门