首页 > 代码库 > Kafka编程实例
Kafka编程实例
编程
Producer是一个应用程序,它创建消息并发送它们到Kafka broker中。这些producer在本质上是不同。比如,前端应用程序,后端服务,代理服务,适配器对于潜在的系统,Hadoop对于的Producer。这些不同的Producer能够使用不同的语言实现,比如java、C和Python。下面的这部图表解释了消息producer的Kafka API.
下面将详细介绍如果编写一个简单的Producer和Consumer应用程序。
发送简单消息给Kafka broker,Producer端编写类ClusterProducer。
public classClusterProducer extends Thread { private static final Log log =LogFactory.getLog(ClusterProducer.class); public void sendData() { Random rnd = new Random(); Properties props =PropertiesParser.getProperties(PropertiesSettings.PRODUCER_FILE_NAME); if (props == null) { log.error("can't loadspecified file " + PropertiesSettings.PRODUCER_FILE_NAME); return; } //set the producer configurationproperties ProducerConfig config = newProducerConfig(props); Producer<String, String> producer= new Producer<String, String>(config); //Send the data int count = 1; KeyedMessage<String, String>data; while (count < 100) { String sign = "*"; String ip = "192.168.2."+ rnd.nextInt(255); StringBuffer sb = newStringBuffer(); for (int i = 0; i < count; i++){ sb.append(sign); } log.info("set data:" +sb); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } data = http://www.mamicode.com/new KeyedMessage(PropertiesSettings.TOPIC_NAME, ip, sb.toString());> 定于Consumer获取端,获取对应topic的数据:
public class Consumerextends Thread { private static final Log log =LogFactory.getLog(Consumer.class); private final ConsumerConnector consumer; private final String topic; public Consumer(String topic) { consumer =kafka.consumer.Consumer.createJavaConsumerConnector( createConsumerConfig()); this.topic = topic; } private static ConsumerConfigcreateConsumerConfig() { Properties props = new Properties(); props.put("zookeeper.connect", KafkaProperties.zkConnect); props.put("group.id",KafkaProperties.groupId); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); return new ConsumerConfig(props); } public void run() { Map<String, Integer>topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, newInteger(1)); Map<String,List<KafkaStream<byte[], byte[]>>> consumerMap =consumer.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]>stream = consumerMap.get(topic).get(0); ConsumerIterator<byte[], byte[]>it = stream.iterator(); while (it.hasNext()) { log.info("+message: " +new String(it.next().message())); } } public static void main(String[] args) { Consumer client = new Consumer("cluster_statistics_topic"); client.run(); } }分别执行上面的代码,可以发送或者得到对应topic信息。
Enjoy yourself!(*^__^*) ……
Kafka编程实例
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。