首页 > 代码库 > Apache Kafka系列(三) Java API使用
Apache Kafka系列(三) Java API使用
Apache Kafka系列(一) 起步
Apache Kafka系列(二) 命令行工具(CLI)
Apache Kafka系列(三) Java API使用
摘要:
Apache Kafka Java Client API
一、基本概念
Kafka集成了Producer/Consumer连接Broker的客户端工具,但是在消息处理方面,这两者主要用于服务端(Broker)的简单操作,如:
1.创建Topic
2.罗列出已存在的Topic
3.对已有Topic的Produce/Consume测试
跟其他的消息系统一样,Kafka提供了多种不用语言实现的客户端API,如:Java,Python,Ruby,Go等。这些API极大的方便用户使用Kafka集群,本文将展示这些API的使用
二、前提
- 在本地虚拟机中安装了Kafka 0.11.0版本,可以参照前一篇文章: Apache Kafka系列(一) 起步
- 本地安装有JDK1.8
- IDEA编译器
- Maven3
三、项目结构
Maven pom.xml如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.randy</groupId> <artifactId>kafka_api_demo</artifactId> <version>1.0-SNAPSHOT</version> <name>Maven</name> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.0</version> </dependency> </dependencies> </project>
四、源码
4.1 Producer的源码
package com.randy; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; /** * Author : RandySun * Date : 2017-08-13 16:23 * Comment : */ public class ProducerDemo { public static void main(String[] args){ Properties properties = new Properties(); properties.put("bootstrap.servers", "192.168.1.110:9092"); properties.put("acks", "all"); properties.put("retries", 0); properties.put("batch.size", 16384); properties.put("linger.ms", 1); properties.put("buffer.memory", 33554432); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = null; try { producer = new KafkaProducer<String, String>(properties); for (int i = 0; i < 100; i++) { String msg = "Message " + i; producer.send(new ProducerRecord<String, String>("HelloWorld", msg)); System.out.println("Sent:" + msg); } } catch (Exception e) { e.printStackTrace(); } finally { producer.close(); } } }
可以使用KafkaProducer类的实例来创建一个Producer,KafkaProducer类的参数是一系列属性值,下面分析一下所使用到的重要的属性:
- bootstrap.servers
properties.put("bootstrap.servers", "192.168.1.110:9092");
bootstrap.servers是Kafka集群的IP地址,如果Broker数量超过1个,则使用逗号分隔,如"192.168.1.110:9092,192.168.1.110:9092"。其中,192.168.1.110是我的其中一台虚拟机的
IP地址,9092是所监听的端口
- key.serializer & value.serializer
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
序列化类型。 Kafka消息是以键值对的形式发送到Kafka集群的,其中Key是可选的,Value可以是任意类型。但是在Message被发送到Kafka集群之前,Producer需要把不同类型的消
息序列化为二进制类型。本例是发送文本消息到Kafka集群,所以使用的是StringSerializer。
- 发送Message到Kafka集群
for (int i = 0; i < 100; i++) { String msg = "Message " + i; producer.send(new ProducerRecord<String, String>("HelloWorld", msg)); System.out.println("Sent:" + msg); }
上述代码会发送100个消息到HelloWorld这个Topic
4.2 Consumer的源码
package com.randy; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; /** * Author : RandySun * Date : 2017-08-13 17:06 * Comment : */ public class ConsumerDemo { public static void main(String[] args){ Properties properties = new Properties(); properties.put("bootstrap.servers", "192.168.1.110:9092"); properties.put("group.id", "group-1"); properties.put("enable.auto.commit", "true"); properties.put("auto.commit.interval.ms", "1000"); properties.put("auto.offset.reset", "earliest"); properties.put("session.timeout.ms", "30000"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); kafkaConsumer.subscribe(Arrays.asList("HelloWorld")); while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, value = http://www.mamicode.com/%s", record.offset(), record.value()); System.out.println(); } } } }
可以使用KafkaConsumer类的实例来创建一个Consumer,KafkaConsumer类的参数是一系列属性值,下面分析一下所使用到的重要的属性:
- bootstrap.servers
和Producer一样,是指向Kafka集群的IP地址,以逗号分隔。
- group.id
Consumer分组ID
- key.deserializer and value.deserializer
发序列化。Consumer把来自Kafka集群的二进制消息反序列化为指定的类型。因本例中的Producer使用的是String类型,所以调用StringDeserializer来反序列化
Consumer订阅了Topic为HelloWorld的消息,Consumer调用poll方法来轮循Kafka集群的消息,其中的参数100是超时时间(Consumer等待直到Kafka集群中没有消息为止):
kafkaConsumer.subscribe(Arrays.asList("HelloWorld")); while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, value = http://www.mamicode.com/%s", record.offset(), record.value()); System.out.println(); } }
五、总结
本文展示了如何创建一个Producer并生成String类型的消息,Consumer消费这些消息。这些都是基于Apache Kafka 0.11.0 Java API。
Apache Kafka系列(三) Java API使用