首页 > 代码库 > storm入门——本地模式helloworld
storm入门——本地模式helloworld
创建maven项目,在pom.xml中加入以下配置:
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <type>jar</type> <version>0.9.3-rc1</version> </dependency>
创建SimpleSpout类用于获取数据流:
1 package com.hirain.storm.helloworld; 2 3 import java.util.Map; 4 import java.util.Random; 5 6 import backtype.storm.spout.SpoutOutputCollector; 7 import backtype.storm.task.TopologyContext; 8 import backtype.storm.topology.OutputFieldsDeclarer; 9 import backtype.storm.topology.base.BaseRichSpout;10 import backtype.storm.tuple.Fields;11 import backtype.storm.tuple.Values;12 13 public class SimpleSpout extends BaseRichSpout{14 15 /**16 * 17 */18 private static final long serialVersionUID = 1L;19 20 //用来发射数据的工具类21 private SpoutOutputCollector collector;22 23 private static String[] info = new String[]{24 "comaple\t,12424,44w46,654,12424,44w46,654,",25 "lisi\t,435435,6537,12424,44w46,654,",26 "lipeng\t,45735,6757,12424,44w46,654,",27 "hujintao\t,45735,6757,12424,44w46,654,",28 "jiangmin\t,23545,6457,2455,7576,qr44453",29 "beijing\t,435435,6537,12424,44w46,654,",30 "xiaoming\t,46654,8579,w3675,85877,077998,",31 "xiaozhang\t,9789,788,97978,656,345235,09889,",32 "ceo\t,46654,8579,w3675,85877,077998,",33 "cto\t,46654,8579,w3675,85877,077998,",34 "zhansan\t,46654,8579,w3675,85877,077998,"};35 36 Random random=new Random();37 38 39 /**40 * 在SpoutTracker类中被调用,每调用一次就可以向storm集群中发射一条数据(一个tuple元组),该方法会被不停的调用41 */42 public void nextTuple() {43 try {44 String msg = info[random.nextInt(11)];45 // 调用发射方法46 collector.emit(new Values(msg));47 // 模拟等待100ms48 Thread.sleep(100);49 } catch (InterruptedException e) {50 e.printStackTrace();51 }52 }53 /**54 * 初始化collector55 */56 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {57 this.collector = collector;58 59 }60 61 62 /**63 * 定义字段id,该id在简单模式下没有用处,但在按照字段分组的模式下有很大的用处。64 * 该declarer变量有很大作用,我们还可以调用declarer.declareStream();来定义stramId,该id可以用来定义更加复杂的流拓扑结构65 */66 public void declareOutputFields(OutputFieldsDeclarer declarer) {67 declarer.declare(new Fields("source")); //collector.emit(new Values(msg));参数要对应68 }69 70 }
创建SimpleBolt类,用于处理数据:
1 package com.hirain.storm.helloworld; 2 3 import backtype.storm.topology.BasicOutputCollector; 4 import backtype.storm.topology.OutputFieldsDeclarer; 5 import backtype.storm.topology.base.BaseBasicBolt; 6 import backtype.storm.tuple.Fields; 7 import backtype.storm.tuple.Tuple; 8 import backtype.storm.tuple.Values; 9 10 11 12 public class SimpleBolt extends BaseBasicBolt {13 14 /**15 * 16 */17 private static final long serialVersionUID = 1L;18 19 public void execute(Tuple input,BasicOutputCollector collector) {20 try {21 String msg = input.getString(0);22 if (msg != null){23 //System.out.println("msg="+msg);24 collector.emit(new Values(msg + "msg is processed!"));25 }26 27 } catch (Exception e) {28 e.printStackTrace(); 29 }30 31 }32 33 public void declareOutputFields(34 OutputFieldsDeclarer declarer) {35 declarer.declare(new Fields("info"));36 37 }38 39 }
创建main方法配置storm的topology并启动本地模式运行:
1 package com.hirain.storm.helloworld; 2 3 import backtype.storm.Config; 4 import backtype.storm.LocalCluster; 5 import backtype.storm.StormSubmitter; 6 import backtype.storm.topology.TopologyBuilder; 7 8 public class SimpleTopology { 9 10 11 public static void main(String[] args) {12 try {13 // 实例化TopologyBuilder类。14 TopologyBuilder topologyBuilder = new TopologyBuilder();15 // 设置喷发节点并分配并发数,该并发数将会控制该对象在集群中的线程数。16 topologyBuilder.setSpout("SimpleSpout", new SimpleSpout(), 1);17 // 设置数据处理节点并分配并发数。指定该节点接收喷发节点的策略为随机方式。18 topologyBuilder.setBolt("SimpleBolt", new SimpleBolt(), 3).shuffleGrouping("SimpleSpout");19 Config config = new Config();20 config.setDebug(true);21 if (args != null && args.length > 0) {22 config.setNumWorkers(1);23 StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());24 } else {25 // 这里是本地模式下运行的启动代码。26 config.setMaxTaskParallelism(1);27 LocalCluster cluster = new LocalCluster();28 cluster.submitTopology("simple", config, topologyBuilder.createTopology());29 }30 31 } catch (Exception e) {32 e.printStackTrace(); 33 }34 }35 }
以上为storm的简单的helloworld,仅供参考
storm入门——本地模式helloworld
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。