
Creating Kafka Producers and Consumers in Java

    Create a Kafka topic named test111, connected to the ZooKeeper ensemble, with a replication factor of 3 and 3 partitions:
        [root@h5 kafka]# bin/kafka-topics.sh --create --zookeeper h5:2181 --topic test111 --replication-factor 3 --partitions 3
    Show the details of a Kafka topic:
        [root@h5 kafka]# bin/kafka-topics.sh --describe --zookeeper h5:2181 --topic test111
    List all Kafka topics:
        [root@h5 kafka]# bin/kafka-topics.sh --list --zookeeper h5:2181
    Delete a specific Kafka topic:
        [root@h5 kafka]# bin/kafka-topics.sh --delete --zookeeper h5:2181,h6:2181,h7:2181 --topic test111
        If the delete command only prints
        Topic guowang1 is marked for deletion.
        Note: This will have no impact if delete.topic.enable is not set to true.
        then edit Kafka/config/server.properties and add delete.topic.enable=true.
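        For example, you could append the setting on each broker and then restart that broker for it to take effect (a minimal sketch; adjust the path to your installation):
        [root@h5 kafka]# echo "delete.topic.enable=true" >> config/server.properties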
    The Java client code below needs the following jars on the classpath:
        kafka_2.10-0.8.2.0.jar
        kafka-clients-0.8.2.0.jar
        metrics-core-2.2.0.jar
        scala-library-2.10.4.jar
        zkclient-0.3.jar
        zookeeper-3.4.6.jar
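    If you build with Maven instead of copying jars by hand, pulling in the single kafka_2.10 artifact should bring in the other libraries above transitively (a sketch for the 0.8.2.0 version used here):
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.0</version>
        </dependency>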
    
    1. Producer

package storm.test.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

public class TestProducer {

    public static void main(String[] args) throws Exception {
        Properties prop = new Properties();
        // ZooKeeper ensemble and broker list for the old 0.8.x producer API
        prop.put("zookeeper.connect", "h5:2181,h6:2181,h7:2181");
        prop.put("metadata.broker.list", "h5:9092,h6:9092,h7:9092");
        // Encode message values as strings
        prop.put("serializer.class", StringEncoder.class.getName());
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(prop));
        int i = 0;
        // Send one message per second to topic test111
        while (true) {
            producer.send(new KeyedMessage<String, String>("test111", "msg:" + i++));
            Thread.sleep(1000);
        }
    }

}
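To verify that the producer is actually publishing, you can tail the topic with the console consumer that ships with Kafka (same host and topic as above):

        [root@h5 kafka]# bin/kafka-console-consumer.sh --zookeeper h5:2181 --topic test111 --from-beginning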


    2. Consumer

package storm.test.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringEncoder;

public class TestConsumer {

    static final String topic = "test111";

    public static void main(String[] args) {
        Properties prop = new Properties();
        // The high-level consumer finds brokers and coordinates its group via ZooKeeper
        prop.put("zookeeper.connect", "h5:2181,h6:2181,h7:2181");
        prop.put("serializer.class", StringEncoder.class.getName());
        prop.put("metadata.broker.list", "h5:9092,h6:9092,h7:9092");
        prop.put("group.id", "group1");
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));
        // Ask for one stream (one consumer thread) for the topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
        final KafkaStream<byte[], byte[]> kafkaStream = messageStreams.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
        // Block waiting for messages and print each one
        while (iterator.hasNext()) {
            String msg = new String(iterator.next().message());
            System.out.println("Received message: " + msg);
        }
    }

}
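Note that iterator.hasNext() blocks until a new message arrives, so the loop above never exits on its own. If you want the consumer to stop cleanly instead of being killed mid-iteration, one option is to call shutdown() on the ConsumerConnector from a JVM shutdown hook. A minimal sketch, to be placed in main() right after the connector is created (it assumes consumer is declared final or, on Java 8+, is effectively final):

        // Hypothetical addition: on Ctrl+C / JVM exit, shut the connector down so the
        // blocked iterator ends and main() can return.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                consumer.shutdown();
            }
        }));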

 
