Kafka Basics 01
Summary:

1. Kafka consumers can be split across different consumer groups; the same message is delivered once to each group, so consumers in different groups can each consume it.

2. Inspecting Kafka's metadata in ZooKeeper:

```
[zk: air00:2181(CONNECTED) 8] ls /
[consumers, config, controller, admin, brokers, zookeeper, controller_epoch]
[zk: air00:2181(CONNECTED) 9] ls /consumers
[test01, test02]
[zk: air00:2181(CONNECTED) 10] ls /consumers/test01
[offsets, owners, ids]
[zk: air00:2181(CONNECTED) 11] ls /consumers/test01/offsets
[test]
[zk: air00:2181(CONNECTED) 12] ls /consumers/test01/offsets/test
[1, 0]
```

3. A newly started consumer group does not receive messages produced before it joined.

As the ZooKeeper listing shows, consumer information (group ids, owners, and per-partition offsets) lives in ZooKeeper nodes.

Producer:

```java
package com.kafka.test;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Producer01 {
    public static void main(String[] args) {
        String topic = "test";
        Properties props = new Properties();
        // StringEncoder serializes message payloads as strings
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // broker listens on port 9092
        props.put("metadata.broker.list", "air00:9092");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        producer.send(new KeyedMessage<String, String>(topic, "test"));
        producer.close();
    }
}
```

Consumer:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class Consumer01 {
    static String groupId = "test01";
    static String topic = "test";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "air00:2181,air01:2181,air02:2181");
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        // commit consumed offsets back to ZooKeeper every second
        props.put("auto.commit.interval.ms", "1000");
        ConsumerConnector consumer =
                kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        // request one stream (one consuming thread) for the topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }
}
```
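Point 3 of the summary (a new consumer group does not see old data) follows from the old high-level consumer's `auto.offset.reset` setting: when a group has no committed offsets in ZooKeeper, the default `"largest"` starts it at the end of the log. A minimal sketch, assuming the same cluster addresses as above and a hypothetical group id `test03`, of building the consumer config with `"smallest"` so a fresh group replays the log from the beginning:

```java
import java.util.Properties;

public class ConsumerOffsetReset {
    // Builds the same consumer config as Consumer01, plus auto.offset.reset.
    // "smallest" makes a group with no committed offsets start from the
    // earliest available message instead of only new ones ("largest").
    static Properties buildProps(String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "air00:2181,air01:2181,air02:2181");
        props.put("group.id", groupId);
        props.put("auto.offset.reset", "smallest");
        props.put("auto.commit.interval.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        Properties p = buildProps("test03");
        System.out.println(p.getProperty("auto.offset.reset"));
    }
}
```

Passing these properties to `new ConsumerConfig(props)` in place of the ones in `Consumer01` lets a brand-new group consume the existing messages; the bundled `kafka-console-consumer.sh` offers the same behavior on the command line via its `--from-beginning` flag.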