首页 > 代码库 > kafka producer consumer

kafka producer consumer

package demo;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Minimal demo that publishes a fixed batch of string messages to the
 * {@link #TOPIC} topic using the legacy (Scala-based) Kafka producer API.
 *
 * <p>The class name is kept in lower case because the companion consumer
 * demo refers to {@code producer.TOPIC}.
 */
public class producer {

    /** Topic that the demo messages are published to. */
    public final static String TOPIC = "test";

    /** First message number sent (inclusive). */
    private static final int FIRST_MESSAGE_NO = 1000;

    /** Upper bound of the message numbers sent (exclusive). */
    private static final int COUNT = 10000;

    private final Producer<String, String> kafkaProducer;

    /** Builds the producer configuration and opens the underlying Kafka producer. */
    private producer() {
        Properties props = new Properties();

        // Broker address list (host:port) the producer uses to fetch cluster metadata.
        props.put("metadata.broker.list", "192.168.152.20:9092");

        // Serializer class for message values.
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        // Serializer class for message keys.
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");

        // request.required.acks:
        //   0  - the producer never waits for an acknowledgement from the broker.
        //        Lowest latency, weakest durability (some data will be lost when
        //        a server fails).
        //   1  - the producer gets an acknowledgement after the leader replica has
        //        received the data. Only messages written to a now-dead leader but
        //        not yet replicated will be lost.
        //  -1  - the producer gets an acknowledgement after all in-sync replicas
        //        have received the data. Best durability: no messages are lost as
        //        long as at least one in-sync replica remains.
        props.put("request.required.acks", "-1");

        kafkaProducer = new Producer<String, String>(new ProducerConfig(props));
    }

    /**
     * Sends one keyed message per number in [1000, 10000) and echoes each
     * payload to standard output. The message number is used as the key.
     */
    void produce() {
        for (int messageNo = FIRST_MESSAGE_NO; messageNo < COUNT; messageNo++) {
            String key = String.valueOf(messageNo);
            String data = "hello kafka message " + key;
            kafkaProducer.send(new KeyedMessage<String, String>(TOPIC, key, data));
            System.out.println(data);
        }
    }

    /** Releases the resources held by the underlying Kafka producer. */
    private void close() {
        kafkaProducer.close();
    }

    public static void main(String[] args) {
        producer demo = new producer();
        try {
            demo.produce();
        } finally {
            // Always close the producer, even if a send fails, so that its
            // broker connections are not leaked.
            demo.close();
        }
    }
}

 

 

package demo;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

/**
 * Minimal demo that reads string messages from {@code producer.TOPIC} with the
 * legacy ZooKeeper-based high-level Kafka consumer and prints each message
 * payload to standard output.
 */
public class consumer {

    private final ConsumerConnector connector;

    /** Builds the consumer configuration and creates the consumer connector. */
    private consumer() {
        Properties props = new Properties();

        // ZooKeeper connection string.
        props.put("zookeeper.connect", "192.168.152.20:2181");

        // Consumer group this consumer belongs to.
        props.put("group.id", "jd-group");

        // ZooKeeper session timeout and sync time, in milliseconds.
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");

        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");

        // Serializer class.
        // NOTE(review): "serializer.class" is the setting used by the producer
        // demo; decoding here is done by the StringDecoder instances passed to
        // createMessageStreams. Presumably this entry has no effect on the
        // consumer - confirm before removing it.
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        ConsumerConfig config = new ConsumerConfig(props);
        connector = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }

    /**
     * Opens a single stream on {@code producer.TOPIC} and prints the payload of
     * every message read from it, for as long as the stream iterator reports
     * more messages.
     */
    void consume() {
        // Request exactly one stream for the topic.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(producer.TOPIC, Integer.valueOf(1));

        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

        Map<String, List<KafkaStream<String, String>>> consumerMap =
                connector.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

        // Only one stream was requested above, so index 0 is the only entry.
        KafkaStream<String, String> stream = consumerMap.get(producer.TOPIC).get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println(it.next().message());
        }
    }

    /** Shuts down the underlying consumer connector. */
    private void close() {
        connector.shutdown();
    }

    public static void main(String[] args) {
        consumer demo = new consumer();
        try {
            demo.consume();
        } finally {
            // Always shut the connector down, even if consuming fails, so the
            // consumer's connections and background threads are not leaked.
            demo.close();
        }
    }
}

 技术分享

其中，ZooKeeper 的地址需要在 Kafka 的 config/consumer.properties 文件中进行配置（zookeeper.connect），并与代码中使用的地址保持一致。

kafka producer consumer