首页 > 代码库 > Kafka使用Java客户端进行访问

Kafka使用Java客户端进行访问

添加maven依赖包

<dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka-clients</artifactId>    <version>0.9.0.1</version></dependency><dependency>    <groupId>log4j</groupId>    <artifactId>log4j</artifactId>    <version>1.2.17</version></dependency><dependency>    <groupId>org.slf4j</groupId>    <artifactId>slf4j-api</artifactId>    <version>1.7.25</version></dependency><dependency>      <groupId>org.apache.kafka</groupId>      <artifactId>kafka_2.10</artifactId>      <version>0.8.2.0</version>  </dependency>

建立包结构

  建立包结构如下图所示为例:

技术分享

  在log4j.properties中输入:

log4j.rootLogger=INFO, stdoutlog4j.appender.stdout=org.apache.log4j.ConsoleAppenderlog4j.appender.stdout.Target=System.outlog4j.appender.stdout.layout=org.apache.log4j.PatternLayoutlog4j.appender.stdout.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n

生产者代码

技术分享
 1 package com.juyun.kafka; 2  3 import java.util.Properties; 4  5 import org.apache.log4j.PropertyConfigurator; 6  7 import kafka.javaapi.producer.Producer;   8 import kafka.producer.KeyedMessage;   9 import kafka.producer.ProducerConfig;  10 import kafka.serializer.StringEncoder;  11   12 public class KafkaProducerExample extends Thread {  13     private String topic;  14       15     public KafkaProducerExample(String topic){  16         super();  17         this.topic=topic;  18     }  19       20     @Override  21     public void run() {  22         Producer<Integer, String> producer=CreateProducer();  23         for (int i = 1; i < 10; i++) {  24             String message="message"+i;  25             producer.send(new KeyedMessage<Integer, String>(topic, message)); // 调用producer的send方法发送数据  26             System.out.println("发送:"+message);  27             try {  28                 sleep(1000);  29             } catch (InterruptedException e) {  30                 e.printStackTrace();  31             }  32         }  33     }  34       35     public Producer<Integer, String> CreateProducer(){  36         Properties props=new Properties();  37         props.setProperty("zookeeper.connect", "172.16.0.157:2181"); // 与zookeeper建立连接  38         props.setProperty("serializer.class", StringEncoder.class.getName()); // key.serializer.class默认为serializer.class39         props.setProperty("metadata.broker.list", "172.16.0.157:9092"); // kafka broker对应的主机,格式为host1:port1,host2:port240         props.put("request.required.acks","1"); // 等待topic中某个partition leader保存成功的状态反馈41         Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(props)); // 通过配置文件,创建生产者  42         return producer;  43     }  44       45     public static void main(String[] args){46         PropertyConfigurator.configure("C:/Users/juyun/workspace/Kafka/src/main/java/com/juyun/logs/log4j.properties"); // 加载.properties文件47         new KafkaProducerExample("test").start(); // 输入topic,启动线程  48     }  49       50 }  
KafkaProducerExample.java

消费者代码

技术分享
 1 package com.juyun.kafka; 2  3 import java.util.HashMap;   4 import java.util.List;   5 import java.util.Map;   6 import java.util.Properties; 7  8 import org.apache.log4j.PropertyConfigurator; 9 10 import kafka.consumer.Consumer;  11 import kafka.consumer.ConsumerConfig;  12 import kafka.consumer.ConsumerIterator;  13 import kafka.consumer.KafkaStream;  14 import kafka.javaapi.consumer.ConsumerConnector;  15   16   17 public class KafkaConsumerExample extends Thread{  18     private String topic; 19       20     private KafkaConsumerExample(String topic) {  21         super();  22         this.topic=topic;  23     }  24       25     @Override  26     public void run() {  27         ConsumerConnector consumer = createConsumer(); // 创建消费者连接  28         Map<String,Integer> topicCountMap=new HashMap<String, Integer>(); // 定义一个map  29         topicCountMap.put(topic, 1);  30         // Map<String, List<KafkaStream<byte[], byte[]>> 中String是topic, List<KafkaStream<byte[], byte[]>是对应的流  31         Map<String, List<KafkaStream<byte[], byte[]>>> MessageStreams = consumer.createMessageStreams(topicCountMap);  32         // 取出 topic1对应的 streams33         KafkaStream<byte[], byte[]> kafkaStream = MessageStreams.get(topic).get(0);34         // 迭代获取到的流35         ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();36         while (iterator.hasNext()) {  37             String message = new String(iterator.next().message());  38             System.out.println("接收到:"+message);  39         }  40     }  41       42     public ConsumerConnector createConsumer(){  43         Properties properties = new Properties();  44         properties.setProperty("zookeeper.connect", "172.16.0.157:2181"); 45         properties.put("zookeeper.connectiontimeout.ms", "6000");46         properties.setProperty("group.id", "group1"); // 设置这个消费者所在的group47         // 只要ConsumerConnector还在的话,consumer会一直等待新消息,不会自己退出48         ConsumerConnector createJavaConsumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));  49         return createJavaConsumerConnector;  50     }  51       52     public static void main(String[] args) { 53         PropertyConfigurator.configure("C:/Users/juyun/workspace/Kafka/src/main/java/com/juyun/logs/log4j.properties"); // 加载.properties文件54         new KafkaConsumerExample("test").start();  55     }  56 }  
KafkaConsumerExample.java

执行程序

  需要先启动zookeeper

#进入到Zookeeper的bin目录下cd /opt/zookeeper-3.4.8/bin#启动服务./zkServer.sh start

  再启动Kafka

#进入到Kafka安装目录bin/kafka-server-start.sh config/server.properties

  并可以同时在命令终端启动生产者和消费者进行检测

#启动生产者bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
#启动消费者bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic --from-beginning

 

Kafka使用Java客户端进行访问