Kafka Programming Examples

 Programming

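    A Producer is an application that creates messages and sends them to a Kafka broker. Producers differ in nature: for example, front-end applications, back-end services, proxy services, adapters to legacy systems, and producers for Hadoop. These producers can be implemented in different languages, such as Java, C, and Python. The diagram below illustrates the Kafka API for message producers.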


The following describes in detail how to write a simple Producer and Consumer application.

To send simple messages to the Kafka broker, write the class ClusterProducer on the Producer side.

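import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

// PropertiesParser and PropertiesSettings are the author's own helper classes (not shown here).
public class ClusterProducer extends Thread {
    private static final Log log = LogFactory.getLog(ClusterProducer.class);

    public void sendData() {
        Random rnd = new Random();
        Properties props = PropertiesParser.getProperties(PropertiesSettings.PRODUCER_FILE_NAME);
        if (props == null) {
            log.error("can't load specified file " + PropertiesSettings.PRODUCER_FILE_NAME);
            return;
        }
        // Set the producer configuration properties
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        // Send the data: the key is a random IP address, the value a growing run of '*'
        int count = 1;
        KeyedMessage<String, String> data;
        while (count < 100) {
            String sign = "*";
            String ip = "192.168.2." + rnd.nextInt(255);
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < count; i++) {
                sb.append(sign);
            }
            log.info("set data:" + sb);
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            data = new KeyedMessage<String, String>(PropertiesSettings.TOPIC_NAME, ip, sb.toString());
            // Assumed completion: the original listing breaks off after the line above.
            producer.send(data);
            count++;
        }
        producer.close();
    }
}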
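
The listing above loads its settings from a properties file that is not shown. As a rough sketch, assuming the 0.8-era producer API used here, that file would need at least the keys below. The helper class `ProducerProps` and the broker address `localhost:9092` are made up for illustration and are not part of the original code.

```java
import java.util.Properties;

// Hypothetical helper: builds the kind of settings the article reads from its
// (unshown) producer properties file.
final class ProducerProps {
    private ProducerProps() {}

    static Properties build(String brokerList) {
        Properties props = new Properties();
        // Brokers used to fetch topic metadata, as comma-separated host:port pairs.
        props.setProperty("metadata.broker.list", brokerList);
        // Serialize String keys and values to bytes.
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        // Wait for the partition leader to acknowledge each message.
        props.setProperty("request.required.acks", "1");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        build("localhost:9092").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting `Properties` object could be passed to `new ProducerConfig(props)` in place of the file-based lookup.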

Define the Consumer side, which fetches the data for the corresponding topic:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

// KafkaProperties is a constants class (zkConnect, groupId) that is not shown here.
public class Consumer extends Thread {
    private static final Log log = LogFactory.getLog(Consumer.class);
    private final ConsumerConnector consumer;
    private final String topic;

    public Consumer(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");

        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        // Request a single stream for the topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, Integer.valueOf(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        // hasNext() blocks until the next message arrives
        while (it.hasNext()) {
            log.info("+message: " + new String(it.next().message()));
        }
    }

    public static void main(String[] args) {
        Consumer client = new Consumer("cluster_statistics_topic");
        // start() runs the consume loop on the Consumer's own thread
        client.start();
    }
}
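
The consumer above turns each payload into text with `new String(bytes)`, which uses the platform default charset. A minimal sketch of decoding with an explicit charset instead, assuming the producer side encodes strings as UTF-8 (the default of `kafka.serializer.StringEncoder`). The class name `MessageDecoder` is hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: decodes a raw Kafka message payload as UTF-8 text.
final class MessageDecoder {
    private MessageDecoder() {}

    static String decode(byte[] payload) {
        // Naming the charset keeps the result independent of the JVM's default encoding.
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Stand-in for it.next().message() in the consumer loop.
        byte[] payload = "***".getBytes(StandardCharsets.UTF_8);
        System.out.println("+message: " + decode(payload));
    }
}
```

In the consumer loop this would replace `new String(it.next().message())` with `MessageDecoder.decode(it.next().message())`.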


     Run the two programs above separately to send messages to, or receive messages from, the corresponding topic.

     Enjoy yourself!(*^__^*) ……
