首页 > 代码库 > storm入门——本地模式helloworld

storm入门——本地模式helloworld

创建maven项目,在pom.xml中加入以下配置:

    <dependency>            <groupId>org.apache.storm</groupId>            <artifactId>storm-core</artifactId>            <type>jar</type>            <version>0.9.3-rc1</version>        </dependency>

创建SimpleSpout类用于获取数据流:

 1 package com.hirain.storm.helloworld; 2  3 import java.util.Map; 4 import java.util.Random; 5  6 import backtype.storm.spout.SpoutOutputCollector; 7 import backtype.storm.task.TopologyContext; 8 import backtype.storm.topology.OutputFieldsDeclarer; 9 import backtype.storm.topology.base.BaseRichSpout;10 import backtype.storm.tuple.Fields;11 import backtype.storm.tuple.Values;12 13 public class SimpleSpout extends BaseRichSpout{14 15     /**16      * 17      */18     private static final long serialVersionUID = 1L;19     20     //用来发射数据的工具类21     private SpoutOutputCollector collector;22     23     private static String[] info = new String[]{24         "comaple\t,12424,44w46,654,12424,44w46,654,",25         "lisi\t,435435,6537,12424,44w46,654,",26         "lipeng\t,45735,6757,12424,44w46,654,",27         "hujintao\t,45735,6757,12424,44w46,654,",28         "jiangmin\t,23545,6457,2455,7576,qr44453",29         "beijing\t,435435,6537,12424,44w46,654,",30         "xiaoming\t,46654,8579,w3675,85877,077998,",31         "xiaozhang\t,9789,788,97978,656,345235,09889,",32         "ceo\t,46654,8579,w3675,85877,077998,",33         "cto\t,46654,8579,w3675,85877,077998,",34         "zhansan\t,46654,8579,w3675,85877,077998,"};35     36     Random random=new Random();37     38     39     /**40      * 在SpoutTracker类中被调用,每调用一次就可以向storm集群中发射一条数据(一个tuple元组),该方法会被不停的调用41      */42     public void nextTuple() {43         try {44              String msg = info[random.nextInt(11)];45             // 调用发射方法46             collector.emit(new Values(msg));47             // 模拟等待100ms48             Thread.sleep(100);49         } catch (InterruptedException e) {50             e.printStackTrace();51         }52     }53     /**54      * 初始化collector55      */56     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {57         this.collector = collector;58         59     }60     61     62     /**63      * 定义字段id,该id在简单模式下没有用处,但在按照字段分组的模式下有很大的用处。64      * 该declarer变量有很大作用,我们还可以调用declarer.declareStream();来定义stramId,该id可以用来定义更加复杂的流拓扑结构65      */66     public void declareOutputFields(OutputFieldsDeclarer declarer) {67         declarer.declare(new Fields("source")); //collector.emit(new Values(msg));参数要对应68     }69 70 }

创建SimpleBolt类,用于处理数据:

 1 package com.hirain.storm.helloworld; 2  3 import backtype.storm.topology.BasicOutputCollector; 4 import backtype.storm.topology.OutputFieldsDeclarer; 5 import backtype.storm.topology.base.BaseBasicBolt; 6 import backtype.storm.tuple.Fields; 7 import backtype.storm.tuple.Tuple; 8 import backtype.storm.tuple.Values; 9 10 11 12 public class SimpleBolt extends BaseBasicBolt {13 14     /**15      * 16      */17     private static final long serialVersionUID = 1L;18 19     public void execute(Tuple input,BasicOutputCollector collector) {20         try {21             String msg = input.getString(0);22             if (msg != null){23                 //System.out.println("msg="+msg);24                 collector.emit(new Values(msg + "msg is processed!"));25             }26                 27         } catch (Exception e) {28             e.printStackTrace(); 29         }30 31     }32 33     public void declareOutputFields(34             OutputFieldsDeclarer declarer) {35         declarer.declare(new Fields("info"));36 37     }38 39 }

创建main方法配置storm的topology并启动本地模式运行:

 1 package com.hirain.storm.helloworld; 2  3 import backtype.storm.Config; 4 import backtype.storm.LocalCluster; 5 import backtype.storm.StormSubmitter; 6 import backtype.storm.topology.TopologyBuilder; 7  8 public class SimpleTopology { 9     10     11     public static void main(String[] args) {12         try {13             // 实例化TopologyBuilder类。14             TopologyBuilder topologyBuilder = new TopologyBuilder();15             // 设置喷发节点并分配并发数,该并发数将会控制该对象在集群中的线程数。16             topologyBuilder.setSpout("SimpleSpout", new SimpleSpout(), 1);17             // 设置数据处理节点并分配并发数。指定该节点接收喷发节点的策略为随机方式。18             topologyBuilder.setBolt("SimpleBolt", new SimpleBolt(), 3).shuffleGrouping("SimpleSpout");19             Config config = new Config();20             config.setDebug(true);21             if (args != null && args.length > 0) {22                 config.setNumWorkers(1);23                 StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());24             } else {25                 // 这里是本地模式下运行的启动代码。26                 config.setMaxTaskParallelism(1);27                 LocalCluster cluster = new LocalCluster();28                 cluster.submitTopology("simple", config, topologyBuilder.createTopology());29             }30             31         } catch (Exception e) {32             e.printStackTrace(); 33         }34     }35 }

以上为storm的简单的helloworld,仅供参考

storm入门——本地模式helloworld