首页 > 代码库 > Storm-Kafka模块之写入kafka-KafkaBolt的使用及实现
Storm-Kafka模块之写入kafka-KafkaBolt的使用及实现
storm在0.9.3版本中提供了一个抽象通用的Bolt KafkaBolt用来实现数据写入kafka的目的,我们先来看一个具体的例子,然后再看如何实现的。
我们用代码加注释的方式,来看下如何使用
//1. KafkaBolt的前置组件emit出来的(可以是spout也可以是bolt) Spout spout = new Spout(new Fields("key", "message")); builder.setSpout("spout", spout); //2. 给KafkaBolt配置topic及前置tuple消息到kafka的mapping关系 KafkaBolt bolt = new KafkaBolt(); bolt.withTopicSelector(new DefaultTopicSelector("tony-S2K")) .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper()); builder.setBolt("forwardToKafka", bolt, 1).shuffleGrouping("spout"); Config conf = new Config(); //3. 设置kafka producer的配置 Properties props = new Properties(); props.put("metadata.broker.list", "10.100.90.203:9092"); props.put("producer.type","async"); props.put("request.required.acks", "0"); // 0 ,-1 ,1 props.put("serializer.class", "kafka.serializer.StringEncoder"); conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props); conf.put("topic","tony-S2K"); if(args.length > 0){ // cluster submit. try { StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology()); } catch (AlreadyAliveException e) { e.printStackTrace(); } catch (InvalidTopologyException e) { e.printStackTrace(); } }else{ new LocalCluster().submitTopology("kafkaboltTest", conf, builder.createTopology()); }
完整的代码参考github:
https://github.com/tonylee0329/storm-example/blob/master/src/main/java/org/tony/storm_kafka/common/KafkaBoltTestTopology.java
Storm-Kafka模块之写入kafka-KafkaBolt的使用及实现
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。