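4 Kafka cluster deployment and producer Java client programming + Kafka consumer Java client programming
This post covers:
Kafka standalone-mode deployment
Kafka distributed-mode deployment
Producer Java client programming
Consumer Java client programming
Kafka depends on ZooKeeper to run. You can use an existing ZooKeeper cluster or the ZooKeeper instance bundled with Kafka.
Standalone mode uses the ZooKeeper bundled with Kafka.
Distributed mode uses an externally installed ZooKeeper, i.e. a shared ZooKeeper.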
Step 6: Setting up a multi-broker cluster
So far we have been running against a single broker, but that's no fun. For Kafka, a single broker is just a cluster of size one, so nothing much changes other than starting a few more broker instances. But just to get a feel for it, let's expand our cluster to three nodes (still all on our local machine).
First we make a config file for each of the brokers:
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
Now edit these new files and set the following properties:
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
The broker.id property is the unique and permanent name of each node in the cluster. We have to override the port and log directory only because we are running these all on the same machine and we want to keep the brokers from all trying to register on the same port or overwrite each other's data.
We already have Zookeeper and our single node started, so we just need to start the two new nodes:
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...
Now create a new topic with a replication factor of three:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Okay but now that we have a cluster how can we know which broker is doing what? To see that run the "describe topics" command:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Here is an explanation of output. The first line gives a summary of all the partitions, each additional line gives information about one partition. Since we have only one partition for this topic there is only one line.
- "leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
- "replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
- "isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.
Note that in my example node 1 is the leader for the only partition of the topic.
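The leader/replicas/isr fields described above can also be read by a script. The following is a minimal sketch, not part of Kafka: the function names partition_leader and out_of_sync_replicas are my own, and it assumes each partition line has the "Leader: N Replicas: a,b,c Isr: x,y" layout shown in the output above.

```shell
# Hypothetical helpers: each reads `kafka-topics.sh --describe` output on stdin.

# Print the broker id that follows "Leader:" on every partition line.
partition_leader() {
  awk '{ for (i = 1; i < NF; i++) if ($i == "Leader:") print $(i + 1) }'
}

# Print the replicas that are assigned to a partition but missing from its ISR.
out_of_sync_replicas() {
  awk '{
    replicas = ""; isr = ""
    split("", in_sync)                      # clear the lookup table for this line
    for (i = 1; i < NF; i++) {
      if ($i == "Replicas:") replicas = $(i + 1)
      if ($i == "Isr:")      isr = $(i + 1)
    }
    n = split(replicas, r, ",")
    m = split(isr, s, ",")
    for (j = 1; j <= m; j++) in_sync[s[j]] = 1
    for (j = 1; j <= n; j++) if (!(r[j] in in_sync)) print r[j]
  }'
}
```

Piping the describe command into out_of_sync_replicas prints nothing while every replica is in sync, and prints a broker id once that broker has dropped out of the ISR.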
We can run the same command on the original topic we created to see where it is:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
So there is no surprise there—the original topic has no replicas and is on server 0, the only server in our cluster when we created it.
Let's publish a few messages to our new topic:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C
Now let's consume these messages:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
Now let's test out fault-tolerance. Broker 1 was acting as the leader so let's kill it:
> ps | grep server-1.properties
7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564
Leadership has switched to one of the slaves and node 1 is no longer in the in-sync replica set:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
But the messages are still available for consumption even though the leader that took the writes originally is down:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
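In practice, production deployments are almost always Kafka clusters, and setting one up is not hard: just edit the configuration file server.properties under kafka_2.10-0.8.1.1/config/.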
[hadoop@weekend110 config]$ pwd
/home/hadoop/app/kafka_2.10-0.8.1.1/config
[hadoop@weekend110 config]$ ll
total 32
-rw-rw-r--. 1 hadoop hadoop 1202 Apr 23 2014 consumer.properties
-rw-rw-r--. 1 hadoop hadoop 3828 Apr 23 2014 log4j.properties
-rw-rw-r--. 1 hadoop hadoop 2217 Apr 23 2014 producer.properties
-rw-rw-r--. 1 hadoop hadoop 5331 Oct 13 22:08 server.properties
-rw-rw-r--. 1 hadoop hadoop 3326 Apr 23 2014 test-log4j.properties
-rw-rw-r--. 1 hadoop hadoop 995 Apr 23 2014 tools-log4j.properties
-rw-rw-r--. 1 hadoop hadoop 1025 Oct 13 22:06 zookeeper.properties
[hadoop@weekend110 config]$ cp server.properties server-1.properties
[hadoop@weekend110 config]$ cp server.properties server-2.properties
[hadoop@weekend110 config]$ ll
total 48
-rw-rw-r--. 1 hadoop hadoop 1202 Apr 23 2014 consumer.properties
-rw-rw-r--. 1 hadoop hadoop 3828 Apr 23 2014 log4j.properties
-rw-rw-r--. 1 hadoop hadoop 2217 Apr 23 2014 producer.properties
-rw-rw-r--. 1 hadoop hadoop 5331 Oct 14 09:21 server-1.properties
-rw-rw-r--. 1 hadoop hadoop 5331 Oct 14 09:21 server-2.properties
-rw-rw-r--. 1 hadoop hadoop 5331 Oct 13 22:08 server.properties
-rw-rw-r--. 1 hadoop hadoop 3326 Apr 23 2014 test-log4j.properties
-rw-rw-r--. 1 hadoop hadoop 995 Apr 23 2014 tools-log4j.properties
-rw-rw-r--. 1 hadoop hadoop 1025 Oct 13 22:06 zookeeper.properties
[hadoop@weekend110 config]$
[hadoop@weekend110 config]$ vim server-1.properties
broker.id=1
log.dirs=/home/hadoop/data/kafka-logs-1
or
log.dirs=/tmp/kafka-logs-1
port=9093
The file after editing:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
############################# Socket Server Settings #############################
# The port the socket server listens on
port=9093
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
# host.name=localhost
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>
# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>
# The number of threads handling network requests
num.network.threads=2
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=2
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=536870912
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=60000
# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
[hadoop@weekend110 config]$ vim server-2.properties
broker.id=2
log.dirs=/home/hadoop/data/kafka-logs-2
or
log.dirs=/tmp/kafka-logs-2
port=9094
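The manual edits to server-1.properties and server-2.properties can also be scripted. This is a minimal sketch under stated assumptions: the function name make_broker_config and its argument order are my own, and it targets the 0.8.x key names used in this post (broker.id, port, log.dirs), each of which must already be present uncommented in the base file.

```shell
# Hypothetical helper: copy a base server.properties and override the three
# per-broker settings. Usage: make_broker_config BASE ID PORT LOGDIR OUT
make_broker_config() {
  base=$1 id=$2 port=$3 logdir=$4 out=$5
  # "|" is used as the sed delimiter because LOGDIR contains "/".
  sed -e "s|^broker\.id=.*|broker.id=$id|" \
      -e "s|^port=.*|port=$port|" \
      -e "s|^log\.dirs=.*|log.dirs=$logdir|" \
      "$base" > "$out"
}

# Example, matching the two extra brokers configured above:
# make_broker_config config/server.properties 1 9093 /tmp/kafka-logs-1 config/server-1.properties
# make_broker_config config/server.properties 2 9094 /tmp/kafka-logs-2 config/server-2.properties
```

All other settings are copied unchanged from the base file, so the brokers differ only in id, port and log directory.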
Startup
[hadoop@weekend110 kafka_2.10-0.8.1.1]$ jps
2255 Jps
2181 QuorumPeerMain
[hadoop@weekend110 kafka_2.10-0.8.1.1]$ bin/kafka-server-start.sh config/server.properties
[hadoop@weekend110 kafka_2.10-0.8.1.1]$ jps
2338 Jps
2265 Kafka
2181 QuorumPeerMain
[hadoop@weekend110 kafka_2.10-0.8.1.1]$
That is, the broker configured by server.properties started successfully.
[hadoop@weekend110 kafka_2.10-0.8.1.1]$ jps
2387 Jps
2265 Kafka
2181 QuorumPeerMain
[hadoop@weekend110 kafka_2.10-0.8.1.1]$ bin/kafka-server-start.sh config/server-1.properties &
[1] 2396 // this is the process for server-1.properties
[hadoop@weekend110 kafka_2.10-0.8.1.1]$ jps
2265 Kafka
2396 Kafka
2181 QuorumPeerMain
2470 Jps
[hadoop@weekend110 kafka_2.10-0.8.1.1]$
[hadoop@weekend110 kafka_2.10-0.8.1.1]$ jps
2265 Kafka
2396 Kafka
2181 QuorumPeerMain
2515 Jps
[hadoop@weekend110 kafka_2.10-0.8.1.1]$ bin/kafka-server-start.sh config/server-2.properties &
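[1] 2525 // this is the process for server-2.properties
So broker 0 corresponds to server.properties, PID 2265, port 9092.
Broker 1 corresponds to server-1.properties, PID 2396, port 9093.
Broker 2 corresponds to server-2.properties, PID 2525, port 9094.
Against the server.properties node, create the topic and specify its partition count: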
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
This creates the topic my-replicated-topic with a replication factor of 3 and 1 partition.
[hadoop@weekend110 kafka_2.10-0.8.1.1]$ jps
2338 Jps
2265 Kafka
2181 QuorumPeerMain
[hadoop@weekend110 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Created topic "my-replicated-topic".
[hadoop@weekend110 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --list --zookeeper localhost:2181
my-replicated-topic
test
[hadoop@weekend110 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
[hadoop@weekend110 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
[hadoop@weekend110 kafka_2.10-0.8.1.1]$
A point worth noting from this output:
1. test is the topic from my earlier standalone Kafka setup. It lives on broker 0.
2. my-replicated-topic belongs to the current distributed setup (3 brokers, although I simulate them on a single machine with server.properties, server-1.properties and server-2.properties).
It is replicated across broker 0, broker 1 and broker 2.
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Isr: 2,0,1 lists the ids of the brokers whose replicas are currently in sync.
Leader: 2 means the leader for the partition is on broker 2.
Producer: start a producer process to send messages
Let's publish a few messages to our new topic:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C
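The official Kafka quickstart types the following input here:
my test message 1
my test message 2
I will not type anything yet.
Consumer: start a consumer process to consume messages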
[hadoop@weekend110 kafka_2.10-0.8.1.1]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
OK, now the producer sends messages.
[hadoop@weekend110 kafka_2.10-0.8.1.1]$ jps
3048 ConsoleConsumer
2525 Kafka // broker 2
2985 ConsoleProducer
3179 Jps
2265 Kafka // broker 0
2396 Kafka // broker 1
2181 QuorumPeerMain
[hadoop@weekend110 kafka_2.10-0.8.1.1]$
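In the official Kafka quickstart the leader is on broker 1. After broker 1 is killed, leadership moves to broker 2, coordinated through the ZooKeeper bundled with Kafka.
In my test the leader is on broker 2. After broker 2 is killed, leadership moves to broker 0 in the same way.
Of course, this command can also be used to find the process: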
[hadoop@weekend110 kafka_2.10-0.8.1.1]$ ps | grep server-2.properties
2524 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
[hadoop@weekend110 kafka_2.10-0.8.1.1]$ jps
3048 ConsoleConsumer
2525 Kafka
2985 ConsoleProducer
3179 Jps
2265 Kafka
2396 Kafka
2181 QuorumPeerMain
[hadoop@weekend110 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
[hadoop@weekend110 kafka_2.10-0.8.1.1]$ kill -9 2525
[hadoop@weekend110 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 2,0,1 Isr: 0,1
[hadoop@weekend110 kafka_2.10-0.8.1.1]$
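So the Kafka cluster tolerates the failure: a new leader is elected and the cluster keeps running normally.
Now check reads and writes, i.e. the producer and the consumer.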
[hadoop@weekend110 kafka_2.10-0.8.1.1]$ jps
3048 ConsoleConsumer
2985 ConsoleProducer
2265 Kafka
3242 Jps
2396 Kafka
2181 QuorumPeerMain
[hadoop@weekend110 kafka_2.10-0.8.1.1]$
As shown, ConsoleProducer and ConsoleConsumer are both still running.
Step 7: Use Kafka Connect to import/export data
Writing data from the console and writing it back to the console is a convenient place to start, but you'll probably want to use data from other sources or export data from Kafka to other systems. For many systems, instead of writing custom integration code you can use Kafka Connect to import or export data. Kafka Connect is a tool included with Kafka that imports and exports data to Kafka. It is an extensible tool that runs connectors, which implement the custom logic for interacting with an external system. In this quickstart we'll see how to run Kafka Connect with simple connectors that import data from a file to a Kafka topic and export data from a Kafka topic to a file. First, we'll start by creating some seed data to test with:
> echo -e "foo\nbar" > test.txt
Next, we'll start two connectors running in standalone mode, which means they run in a single, local, dedicated process. We provide three configuration files as parameters. The first is always the configuration for the Kafka Connect process, containing common configuration such as the Kafka brokers to connect to and the serialization format for data. The remaining configuration files each specify a connector to create. These files include a unique connector name, the connector class to instantiate, and any other configuration required by the connector.
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
These sample configuration files, included with Kafka, use the default local cluster configuration you started earlier and create two connectors: the first is a source connector that reads lines from an input file and produces each to a Kafka topic and the second is a sink connector that reads messages from a Kafka topic and produces each as a line in an output file. During startup you'll see a number of log messages, including some indicating that the connectors are being instantiated. Once the Kafka Connect process has started, the source connector should start reading lines from test.txt and producing them to the topic connect-test, and the sink connector should start reading messages from the topic connect-test and write them to the file test.sink.txt. We can verify the data has been delivered through the entire pipeline by examining the contents of the output file:
> cat test.sink.txt
foo
bar
Note that the data is being stored in the Kafka topic connect-test, so we can also run a console consumer to see the data in the topic (or use custom consumer code to process it):
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...
The connectors continue to process data, so we can add data to the file and see it move through the pipeline:
> echo "Another line" >> test.txt
You should see the line appear in the console consumer output and in the sink file.
Step 8: Use Kafka Streams to process data
Kafka Streams is a client library of Kafka for real-time stream processing and analyzing data stored in Kafka brokers. This quickstart example will demonstrate how to run a streaming application coded in this library. Here is the gist of the WordCountDemo example code (converted to use Java 8 lambda expressions for easy reading).
KTable<String, Long> wordCounts = textLines
    // Split each text line into lowercase words on non-word characters.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    // Ensure the words are available as record keys for the next aggregate operation.
    .map((key, value) -> new KeyValue<>(value, value))
    // Count the occurrences of each word (record key) and store the results into a table named "Counts".
    .countByKey("Counts");
It implements the WordCount algorithm, which computes a word occurrence histogram from the input text. However, unlike other WordCount examples you might have seen before that operate on bounded data, the WordCount demo application behaves slightly differently because it is designed to operate on an infinite, unbounded stream of data. Similar to the bounded variant, it is a stateful algorithm that tracks and updates the counts of words. However, since it must assume potentially unbounded input data, it will periodically output its current state and results while continuing to process more data because it cannot know when it has processed "all" the input data.
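To make the "output its current state while continuing to process" behaviour concrete, here is a small shell sketch that emits an updated count after every word it reads rather than one final total. It is not Kafka Streams code: the function name running_word_counts is made up for this illustration, and a real Streams application may batch or deduplicate its updates.

```shell
# Hypothetical helper: read text on stdin and print "word count" after every
# occurrence of a word, so later lines update earlier ones for the same word.
running_word_counts() {
  tr '[:upper:]' '[:lower:]' |        # lowercase, like value.toLowerCase()
    tr -cs '[:alnum:]_' '\n' |        # split on non-word characters, one word per line
    awk 'NF { count[$0]++; print $0, count[$0] }'
}

# Example:
# printf 'hello kafka streams\njoin kafka summit\n' | running_word_counts
```

Each input word produces one output line immediately, which is why the same word can appear several times with increasing counts.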
We will now prepare input data to a Kafka topic, which will subsequently be processed by a Kafka Streams application.
> echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt
Next, we send this input data to the input topic named streams-file-input using the console producer (in practice, stream data will likely be flowing continuously into Kafka where the application will be up and running):
> bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic streams-file-input
> cat file-input.txt | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input
We can now run the WordCount demo application to process the input data:
> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
There won't be any STDOUT output except log entries as the results are continuously written back into another topic named streams-wordcount-output in Kafka. The demo will run for a few seconds and then, unlike typical stream processing applications, terminate automatically.
We can now inspect the output of the WordCount demo application by reading from its output topic:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
with the following output data being printed to the console:
all 1
streams 1
lead 1
to 1
kafka 1
hello 1
kafka 2
streams 2
join 1
kafka 3
summit 1
Here, the first column is the Kafka message key, and the second column is the message value, both in java.lang.String format. Note that the output is actually a continuous stream of updates, where each data record (i.e. each line in the original output above) is an updated count of a single word, aka record key such as "kafka". For multiple records with the same key, each later record is an update of the previous one.
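Because each later record for a key replaces the earlier one, the update stream can be collapsed to one row per word by keeping only the last value seen. A minimal awk sketch of that idea follows. The name latest_counts is hypothetical, and it assumes whitespace-separated "key value" lines like the console output above.

```shell
# Hypothetical helper: reduce a stream of "key value" updates on stdin to the
# latest value per key, printed in the order the keys first appeared.
latest_counts() {
  awk '{
    if (!($1 in latest)) order[++n] = $1   # remember first-seen order
    latest[$1] = $2                        # a later record overwrites the earlier one
  } END {
    for (i = 1; i <= n; i++) print order[i], latest[order[i]]
  }'
}
```

Feeding the eleven update lines shown above through latest_counts leaves eight rows, with kafka at 3 and streams at 2.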
Now you can write more input messages to the streams-file-input topic and observe additional messages added to streams-wordcount-output topic, reflecting updated word counts (e.g., using the console producer and the console consumer, as described above).
You can stop the console consumer via Ctrl-C.
I will come back to these two steps (Kafka Connect and Kafka Streams) later.
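For a real three-node cluster (distributed, not simulated on one machine) that uses the ZooKeeper bundled with Kafka:
Suppose the nodes are weekend01, weekend02 and weekend03.
On the first node, edit server.properties:
broker.id=0
zookeeper.connect=weekend01:2181,weekend02:2181,weekend03:2181
On the second node, edit server.properties:
broker.id=1
zookeeper.connect=weekend01:2181,weekend02:2181,weekend03:2181
On the third node, edit server.properties:
broker.id=2
zookeeper.connect=weekend01:2181,weekend02:2181,weekend03:2181
The log directory does not have to be the temporary directory; a newly created directory works too. I will not repeat the details here.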
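Since every broker needs the same zookeeper.connect value, it can help to build the string once from the host list instead of typing it three times. A minimal sketch, assuming all ZooKeeper nodes listen on the same port; the function name zk_connect is my own.

```shell
# Hypothetical helper: build a ZooKeeper connection string.
# Usage: zk_connect PORT HOST [HOST...]
zk_connect() {
  port=$1; shift
  result=""
  for host in "$@"; do
    # Append ",host:port", omitting the comma before the first entry.
    result="${result:+$result,}$host:$port"
  done
  printf '%s\n' "$result"
}

# Example:
# echo "zookeeper.connect=$(zk_connect 2181 weekend01 weekend02 weekend03)"
```

The generated string contains no spaces, so it can be pasted into server.properties as is.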
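Startup
Start a ZooKeeper instance with the script Kafka provides.
From the Kafka installation directory, run:
$ bin/zookeeper-server-start.sh config/zookeeper.properties &
Start the Kafka server.
From the Kafka installation directory, run:
$ bin/kafka-server-start.sh config/server.properties &
Then open another terminal to use Kafka.
Shutdown
Stop the Kafka server first, so the broker can shut down cleanly while ZooKeeper is still available:
$ bin/kafka-server-stop.sh
Then stop the ZooKeeper instance started above:
$ bin/zookeeper-server-stop.sh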
Using kafka_2.10-0.8.1.1.tgz:
See http://liyonghui160com.iteye.com/blog/2105824
Create a topic
List topics
Producer
Consumer
For a real three-node cluster (distributed, not simulated on one machine) that uses an external ZooKeeper:
[hadoop@weekend110 config]$ pwd
/home/hadoop/app/kafka_2.10-0.8.1.1/config
[hadoop@weekend110 config]$ ls
consumer.properties producer.properties test-log4j.properties zookeeper.properties
log4j.properties server.properties tools-log4j.properties
[hadoop@weekend110 config]$ vim zookeeper.properties
If you created other directories yourself, the settings can be:
dataDir=/home/hadoop/data/zookeeper
dataLogDir=/home/hadoop/data/zkdatalog
[hadoop@weekend110 config]$ vim server.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
############################# Socket Server Settings #############################
# The port the socket server listens on
port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
# host.name=localhost
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>
# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>
# The number of threads handling network requests
num.network.threads=2
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma seperated list of directories under which to store log files
# (alternatively, use a directory you created yourself, e.g. /home/hadoop/data/kafka-logs)
log.dirs=/tmp/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=2
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=536870912
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=60000
# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma-separated list of host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=weekend01:2181,weekend02:2181,weekend03:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
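To make the units in the file above easier to read, here is a minimal sketch in plain Java (no Kafka jars needed) that loads a few of those settings with java.util.Properties and converts them. The class BrokerConfigSketch and its helper methods are names I made up for this illustration; they are not part of Kafka.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration only; not part of Kafka.
final class BrokerConfigSketch {

    // A few settings copied from the server.properties shown above.
    static final String SAMPLE =
            "log.retention.hours=168\n"
          + "log.segment.bytes=536870912\n"
          + "zookeeper.connect=weekend01:2181,weekend02:2181,weekend03:2181\n";

    // Parses properties text; an I/O failure on an in-memory string is unexpected.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("could not parse properties", e);
        }
        return props;
    }

    // log.retention.hours expressed in whole days.
    static long retentionDays(Properties props) {
        return Long.parseLong(props.getProperty("log.retention.hours")) / 24;
    }

    // log.segment.bytes expressed in MiB.
    static long segmentMiB(Properties props) {
        return Long.parseLong(props.getProperty("log.segment.bytes")) / (1024L * 1024L);
    }

    // Splits zookeeper.connect into host:port entries, dropping an optional
    // chroot suffix (for example "/kafka") and trimming stray spaces.
    static List<String> zookeeperHosts(Properties props) {
        String connect = props.getProperty("zookeeper.connect");
        int slash = connect.indexOf('/');
        String hostPart = slash >= 0 ? connect.substring(0, slash) : connect;
        List<String> hosts = new ArrayList<String>();
        for (String entry : hostPart.split(",")) {
            hosts.add(entry.trim());
        }
        return hosts;
    }

    public static void main(String[] args) {
        Properties props = load(SAMPLE);
        System.out.println("retention days: " + retentionDays(props));
        System.out.println("segment MiB: " + segmentMiB(props));
        System.out.println("zookeeper hosts: " + zookeeperHosts(props));
    }
}
```

So 168 hours is a 7-day retention window, and 536870912 bytes is a 512 MiB segment file.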
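Start the externally installed zookeeper (the shared zookeeper) first, then start kafka.
Start
1. Start the external zookeeper
Under /home/hadoop/app/zookeeper-3.4.6, run bin/zkServer.sh start
See my blog post
1 Installing zookeeper on week110 + using zookeeper to store small amounts of data
Then go to the kafka installation directory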
$ bin/kafka-server-start.sh config/server.properties &
[hadoop@weekend110 kafka_2.10-0.8.1.1]$ pwd
/home/hadoop/app/kafka_2.10-0.8.1.1
[hadoop@weekend110 kafka_2.10-0.8.1.1]$ bin/kafka-server-start.sh config/server.properties &
[1] 2692
[hadoop@weekend110 kafka_2.10-0.8.1.1]$ [2016-10-13 22:11:51,302] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2016-10-13 22:11:51,586] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)
[2016-10-13 22:11:51,593] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)
[2016-10-13 22:11:51,597] INFO Property log.dirs is overridden to /home/hadoop/data/kafka-logs (kafka.utils.VerifiableProperties)
[2016-10-13 22:11:51,597] INFO Property log.retention.check.interval.ms is overridden to 60000 (kafka.utils.VerifiableProperties)
[2016-10-13 22:11:51,605] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
[2016-10-13 22:11:51,608] INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)
[2016-10-13 22:11:51,610] INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)
[2016-10-13 22:11:51,610] INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties)
[2016-10-13 22:11:51,626] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
[2016-10-13 22:11:51,627] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
[2016-10-13 22:11:51,628] INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2016-10-13 22:11:51,629] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
[2016-10-13 22:11:51,635] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2016-10-13 22:11:51,636] INFO Property zookeeper.connect is overridden to localhost:2181 (kafka.utils.VerifiableProperties)
[2016-10-13 22:11:51,637] INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties)
[2016-10-13 22:11:51,800] INFO [Kafka Server 0], starting (kafka.server.KafkaServer)
[2016-10-13 22:11:51,805] INFO [Kafka Server 0], Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2016-10-13 22:11:51,839] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2016-10-13 22:11:51,869] INFO Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.ZooKeeper)
[2016-10-13 22:11:51,870] INFO Client environment:host.name=weekend110 (org.apache.zookeeper.ZooKeeper)
[2016-10-13 22:11:51,870] INFO Client environment:java.version=1.7.0_65 (org.apache.zookeeper.ZooKeeper)
[2016-10-13 22:11:51,870] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2016-10-13 22:11:51,870] INFO Client environment:java.home=/home/hadoop/app/jdk1.7.0_65/jre (org.apache.zookeeper.ZooKeeper)
[2016-10-13 22:11:51,870] INFO Client environment:java.class.path=:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../core/build/dependant-libs-2.8.0/*.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../perf/build/libs//kafka-perf_2.8.0*.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../clients/build/libs//kafka-clients*.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../examples/build/libs//kafka-examples*.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/jopt-simple-3.2.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1-javadoc.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1-scaladoc.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1-sources.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/log4j-1.2.15.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/metrics-core-2.2.0.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/scala-library-2.10.1.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/slf4j-api-1.7.2.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/snappy-java-1.0.5.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/zkclient-0.3.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../libs/zookeeper-3.3.4.jar:/home/hadoop/app/kafka_2.10-0.8.1.1/bin/../core/build/libs/kafka_2.8.0*.jar (org.apache.zookeeper.ZooKeeper)
[2016-10-13 22:11:51,870] INFO Client environment:java.library.path=/usr/java/packages/lib/i386:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2016-10-13 22:11:51,871] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2016-10-13 22:11:51,871] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2016-10-13 22:11:51,871] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2016-10-13 22:11:51,872] INFO Client environment:os.arch=i386 (org.apache.zookeeper.ZooKeeper)
[2016-10-13 22:11:51,872] INFO Client environment:os.version=2.6.32-431.el6.x86_64 (org.apache.zookeeper.ZooKeeper)
[2016-10-13 22:11:51,872] INFO Client environment:user.name=hadoop (org.apache.zookeeper.ZooKeeper)
[2016-10-13 22:11:51,872] INFO Client environment:user.home=/home/hadoop (org.apache.zookeeper.ZooKeeper)
[2016-10-13 22:11:51,872] INFO Client environment:user.dir=/home/hadoop/app/kafka_2.10-0.8.1.1 (org.apache.zookeeper.ZooKeeper)
[2016-10-13 22:11:51,875] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@7b7258 (org.apache.zookeeper.ZooKeeper)
[2016-10-13 22:11:51,955] INFO Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181 (org.apache.zookeeper.ClientCnxn)
[2016-10-13 22:11:51,992] INFO Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2016-10-13 22:11:52,101] INFO Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x157be612ba00000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2016-10-13 22:11:52,107] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2016-10-13 22:11:52,587] INFO Starting log cleanup with a period of 60000 ms. (kafka.log.LogManager)
[2016-10-13 22:11:52,597] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[2016-10-13 22:11:52,707] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2016-10-13 22:11:52,709] INFO [Socket Server on Broker 0], Started (kafka.network.SocketServer)
[2016-10-13 22:11:53,049] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2016-10-13 22:11:53,223] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2016-10-13 22:11:53,771] INFO Registered broker 0 at path /brokers/ids/0 with address weekend110:9092. (kafka.utils.ZkUtils$)
[2016-10-13 22:11:53,825] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2016-10-13 22:11:53,999] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
Then open another terminal, and kafka is ready to use.
[hadoop@weekend110 ~]$ jps
2769 Jps
2692 Kafka
2634 QuorumPeerMain
[hadoop@weekend110 ~]$
Stop
Stop the Kafka server first, so the broker can still reach zookeeper while it shuts down:
$ bin/kafka-server-stop.sh
Then stop the external zookeeper:
Under /home/hadoop/app/zookeeper-3.4.6, run bin/zkServer.sh stop
Using kafka_2.10-0.8.1.1.tgz:
See http://liyonghui160com.iteye.com/blog/2105824
Create a topic
List topics
Producer
Consumer
Kafka's Java API
Kafka client programming
There are two kafka clients to write: the producer and the consumer.
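weekend110-kafka -> Build Path -> Configure Build Path
Here I followed an example from the web: http://download.csdn.net/download/alexander_zhou/9192011
I added the jars by hand. To avoid hunting down and adding missing jars later, I strongly recommend Maven.
Creating a Maven project in Eclipse with dependency jars pulled in automatically
Really, Maven makes this so much easier.
Reference: Accessing Kafka with the Java client
3. Create the package cn.itcast.kafka
4. Create ProducerDemo.java
Create ConsumerDemo.java
The code below uses the distributed cluster configuration, which is how it is done in real work.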
ProducerDemo.java
package cn.itcast.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // The 0.8 producer finds the cluster through metadata.broker.list.
        // The "zk.connect" property belongs to the 0.7 producer and is not used here, so it is omitted.
        props.put("metadata.broker.list", "weekend01:9092,weekend02:9092,weekend03:9092");
        // Serialize message values as strings
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        try {
            // Send business messages. In a real application they would be read
            // from a file, an in-memory database, or a socket.
            for (int i = 1; i <= 100; i++) {
                Thread.sleep(500);
                producer.send(new KeyedMessage<String, String>("wordcount",
                        "i said i love you baby for " + i + " times, will you have a nice day with me tomorrow"));
            }
        } finally {
            // Release the producer's connections to the brokers
            producer.close();
        }
    }
}
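The comment in the loop above says that real messages would come from a file, an in-memory database, or a socket rather than from a counter. Here is a minimal sketch of that idea in plain Java, so it runs without the Kafka jars: read lines from any Reader and hand each one to a sender. LineForwarder, MessageSink and CollectingSink are hypothetical names invented for this sketch. In the real program the sink's send method would presumably wrap producer.send(new KeyedMessage<String, String>(topic, line)).

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; MessageSink stands in for the Kafka producer.
final class LineForwarder {

    // Anything that can accept a message for a topic.
    interface MessageSink {
        void send(String topic, String message);
    }

    // Test double that records what it was asked to send.
    static final class CollectingSink implements MessageSink {
        final List<String> messages = new ArrayList<String>();

        @Override
        public void send(String topic, String message) {
            messages.add(topic + ": " + message);
        }
    }

    // Forwards every non-blank line of the source to the sink and
    // returns the number of messages sent.
    static int forward(Reader source, String topic, MessageSink sink) {
        int sent = 0;
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue; // skip blank lines instead of sending empty messages
                }
                sink.send(topic, line);
                sent++;
            }
        } catch (IOException e) {
            throw new IllegalStateException("could not read message source", e);
        }
        return sent;
    }

    public static void main(String[] args) {
        CollectingSink sink = new CollectingSink();
        int sent = forward(new StringReader("first line\n\nsecond line\n"), "wordcount", sink);
        System.out.println(sent + " messages: " + sink.messages);
    }
}
```

Swapping the StringReader for a FileReader or an InputStreamReader over a socket changes the data source without touching the sending loop.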
ConsumerDemo.java
package cn.itcast.kafka;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ConsumerDemo {
    private static final String topic = "mysons";
    private static final Integer threads = 1;

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "weekend01:2181,weekend02:2181,weekend03:2181");
        props.put("group.id", "1111");
        // If this group has no stored offset yet, start from the earliest available message
        props.put("auto.offset.reset", "smallest");
        ConsumerConfig config = new ConsumerConfig(props);
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);

        // Topic name -> number of streams to create (one consuming thread per stream)
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, threads);
        topicCountMap.put("mygirls", 1);
        topicCountMap.put("myboys", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

        // Only the "mygirls" streams are read here. To see the messages sent by ProducerDemo,
        // add its topic ("wordcount") to topicCountMap and read those streams instead.
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get("mygirls");
        for (final KafkaStream<byte[], byte[]> kafkaStream : streams) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // Iterating a KafkaStream blocks until the next message arrives
                    for (MessageAndMetadata<byte[], byte[]> mm : kafkaStream) {
                        String msg = new String(mm.message(), StandardCharsets.UTF_8);
                        System.out.println(msg);
                    }
                }
            }).start();
        }
    }
}
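ConsumerDemo starts one thread per KafkaStream and decodes each message from bytes. The same pattern can be tried without a running cluster. The sketch below is plain Java under a big assumption: an ordinary finite Iterable<byte[]> stands in for a KafkaStream (a real stream never ends, so the real threads never finish). StreamWorkersSketch and drain are names invented for this illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch; Iterable<byte[]> stands in for KafkaStream<byte[], byte[]>.
final class StreamWorkersSketch {

    // Starts one worker thread per stream, decodes every payload as UTF-8,
    // waits for all workers, and returns what they received.
    static List<String> drain(List<? extends Iterable<byte[]>> streams) {
        // Several threads append concurrently, so the list must be thread-safe.
        final List<String> received = Collections.synchronizedList(new ArrayList<String>());
        List<Thread> workers = new ArrayList<Thread>();
        for (final Iterable<byte[]> stream : streams) {
            Thread worker = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (byte[] payload : stream) {
                        received.add(new String(payload, StandardCharsets.UTF_8));
                    }
                }
            });
            worker.start();
            workers.add(worker);
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                break;
            }
        }
        return received;
    }

    public static void main(String[] args) {
        List<byte[]> first = Arrays.asList("hello".getBytes(StandardCharsets.UTF_8));
        List<byte[]> second = Arrays.asList("kafka".getBytes(StandardCharsets.UTF_8));
        // The order of the two messages depends on thread scheduling.
        System.out.println(drain(Arrays.asList(first, second)));
    }
}
```

Messages within one stream keep their order, but nothing orders messages across streams, which is worth remembering when raising the stream count in topicCountMap.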
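Start zookeeper. I won't repeat the steps here; they really are simple.
Start the kafka service
Reference blog: http://www.aboutyun.com/forum.php?mod=viewthread&tid=12847
Kafka standalone and cluster mode installation in detail (1)
Kafka standalone and cluster mode installation in detail (2)
Accessing Kafka with the Java client
Reference blog: http://www.jianshu.com/p/425a7d8735e2
Kafka study notes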