首页 > 代码库 > Kafka详解五、Kafka Consumer的底层API- SimpleConsumer
Kafka详解五、Kafka Consumer的底层API- SimpleConsumer
Kafka提供了两套API给Consumer
- The high-level Consumer API
- The SimpleConsumer API
第一种高度抽象的Consumer API,它使用起来简单、方便,但是对于某些特殊的需求我们可能要用到第二种更底层的API,那么先介绍下第二种API能够帮助我们做哪些事情
- 一个消息读取多次
- 在一个处理过程中只消费Partition其中的一部分消息
- 添加事务管理机制以保证消息被处理且仅被处理一次
使用SimpleConsumer有哪些弊端呢?
- 必须在程序中跟踪offset值
- 必须找出指定Topic Partition中的lead broker
- 必须处理broker的变动
使用SimpleConsumer的步骤
- 从所有活跃的broker中找出哪个是指定Topic Partition中的leader broker
- 找出指定Topic Partition中的所有备份broker
- 构造请求
- 发送请求查询数据
- 处理leader broker变更
代码实例:
package bonree.consumer; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import kafka.api.FetchRequest; import kafka.api.FetchRequestBuilder; import kafka.api.PartitionOffsetRequestInfo; import kafka.common.ErrorMapping; import kafka.common.TopicAndPartition; import kafka.javaapi.FetchResponse; import kafka.javaapi.OffsetResponse; import kafka.javaapi.PartitionMetadata; import kafka.javaapi.TopicMetadata; import kafka.javaapi.TopicMetadataRequest; import kafka.javaapi.consumer.SimpleConsumer; import kafka.message.MessageAndOffset; public class SimpleExample { private List<String> m_replicaBrokers = new ArrayList<String>(); public SimpleExample() { m_replicaBrokers = new ArrayList<String>(); } public static void main(String args[]) { SimpleExample example = new SimpleExample(); // 最大读取消息数量 long maxReads = Long.parseLong("3"); // 要订阅的topic String topic = "mytopic"; // 要查找的分区 int partition = Integer.parseInt("0"); // broker节点的ip List<String> seeds = new ArrayList<String>(); seeds.add("192.168.4.30"); seeds.add("192.168.4.31"); seeds.add("192.168.4.32"); // 端口 int port = Integer.parseInt("9092"); try { example.run(maxReads, topic, partition, seeds, port); } catch (Exception e) { System.out.println("Oops:" + e); e.printStackTrace(); } } public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception { // 获取指定Topic partition的元数据 PartitionMetadata metadata = http://www.mamicode.com/findLeader(a_seedBrokers, a_port, a_topic, a_partition);>
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。