首页 > 代码库 > Kafka JAVA客户端代码示例--高级应用
Kafka JAVA客户端代码示例--高级应用
什么时间使用高级应用?
- 针对一个消息读取多次
- 在一个process中,仅仅处理一个topic中的一组partitions
- 使用事务,确保每个消息只被处理一次
使用高级应用(调用较底层函数)的缺点?
SimpleConsumer需要做很多额外的工作(在以groups方式进行消息处理时不需要)
- 在应用程序中跟踪上次消息处理的offset
- 确定一个topic partition的lead broker
- 手工处理broker leander的改变
使用底层函数(SimpleConsumer)开发的步骤
- 通过active broker,确定topic partition的lead broker
- 确定topic partition的replicat brokers
- 根据需要,创建数据请求
- 抓取数据
- 识别lead brokder改变并进行恢复
代码示例
import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import kafka.api.FetchRequest; import kafka.api.FetchRequestBuilder; import kafka.api.PartitionOffsetRequestInfo; import kafka.cluster.Broker; import kafka.common.ErrorMapping; import kafka.common.TopicAndPartition; import kafka.javaapi.FetchResponse; import kafka.javaapi.OffsetResponse; import kafka.javaapi.PartitionMetadata; import kafka.javaapi.TopicMetadata; import kafka.javaapi.TopicMetadataRequest; import kafka.javaapi.TopicMetadataResponse; import kafka.javaapi.consumer.SimpleConsumer; import kafka.message.MessageAndOffset; /** * https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example * @author Fung * */ public class ConsumerSimpleExample { public static void main(String arg[]) { String[] args={"20","page_visits","2","172.168.63.233","9092"}; ConsumerSimpleExample example = new ConsumerSimpleExample(); long maxReads = Long.parseLong(args[0]); String topic = args[1]; int partition = Integer.parseInt(args[2]); List<String> seeds = new ArrayList<String>(); seeds.add(args[3]); int port = Integer.parseInt(args[4]); try { example.run(maxReads, topic, partition, seeds, port); } catch (Exception e) { System.out.println("Oops:" + e); e.printStackTrace(); } } private List<String> m_replicaBrokers = new ArrayList<String>(); public ConsumerSimpleExample() { m_replicaBrokers = new ArrayList<String>(); } public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception { // find the meta data about the topic and partition we are interested in // PartitionMetadata metadata = http://www.mamicode.com/findLeader(a_seedBrokers, a_port, a_topic,>参考
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。