首页 > 代码库 > kafka 基础01

kafka 基础01

总结:
   1. Kafka 中的消费者可以分布在不同的组(consumer group)里,同一条消息可以被不同组里的消费者分别消费,即被多次消费
   2. 观察 ZooKeeper 中保存的 Kafka 信息:
[zk: air00:2181(CONNECTED) 8] ls /
[consumers, config, controller, admin, brokers, zookeeper, controller_epoch]
[zk: air00:2181(CONNECTED) 9] ls /consumers
[test01, test02]
[zk: air00:2181(CONNECTED) 10] ls /consumers/test01
[offsets, owners, ids]
[zk: air00:2181(CONNECTED) 11] ls /consumers/test01/offsets
[test]
[zk: air00:2181(CONNECTED) 12] ls /consumers/test01/offsets/test
[1, 0]
[zk: air00:2181(CONNECTED) 13] 
    3. 新来的消费者(新的消费者组)默认不能获取老的数据:auto.offset.reset 的默认值是 largest,新组会从最新的 offset 开始消费

可以看出,消费者的信息(offsets、owners、ids)保存在 ZooKeeper 的 /consumers/<组名> 节点下面
生产者:
package com.kafka.test;
import java.util.*;  
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;  
import kafka.javaapi.producer.Producer;  
public class Producer01 {

	public static void main(String[] args) {  
		String topic="test";
        Properties props = new Properties();  //9092
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "air00:9092");
  
        ProducerConfig config = new ProducerConfig(props);  
        Producer<String, String> producer = new Producer<String, String>(config);  
        producer.send(new KeyedMessage<String, String>(topic, "test" ));
        producer.close();  
    }  
}

消费者:
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class Consumer01 {
	
	static String groupId="test01";
	static String topic="test";
	public static void main(String[] args) {
	Properties props = new Properties();
    props.put("zookeeper.connect","air00:2181,air01:2181,air02:2181");
    props.put("group.id", groupId);
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    kafka.javaapi.consumer.ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
    		new ConsumerConfig(props));
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    KafkaStream<byte[], byte[]> stream =  consumerMap.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while(it.hasNext())
      System.out.println(new String(it.next().message()));
    }   
}

kafka 基础01