首页 > 代码库 > JStorm第一个程序WordCount详解
JStorm第一个程序WordCount详解
- nimbus是整个storm任务的管理者,并不实际进行工作。负责在集群中分发代码,对节点分配任务,并监视主机故障。
- supervisor是实际进行工作的节点,负责监听工作节点上已经分配的主机作业,启动和停止Nimbus已经分配的工作进程。
- Worker是具体处理Spout/Bolt逻辑的进程,worker数量由拓扑中的conf.setNumWorkers来定义,storm会在每个Worker上均匀分配任务,一个Worker只能执行一个topology,但是可以执行其中的多个任务线程。
- 一个worker是一个进程,里面可以同时运行多个线程,这些线程就是task
参考:http://blog.csdn.net/cuihaolong/article/details/52652686(storm各个节点介绍和容错机制)
- 本地模式(Local Mode): 即Topology(相当于一个任务,后续会详细讲解) 运行在本地机器的单一JVM上,这个模式主要用来开发、调试。
- 远程模式(Remote Mode):在这个模式,我们把我们的Topology提交到集群,在这个模式中,Storm的所有组件都是线程安全的,因为它们都会运行在不同的Jvm或物理机器上,这个模式就是正式的生产模式。
- spout随机发送一个准备好的字符串数组里面的一个字符串(sentence)
- 第一层bolt,splitBolt负责对spout发过来的数据(sentence)进行split,分解成独立的单词,并按照一定的规则发往下一层bolt处理
- 第二层bolt,接受第一层bolt传过来的数据,并对各个单词进行数量计算
- spout数据源
- bolt1进行split操作
- bolt2进行count操作
- Topolgy运行程序
1 package act.chenkh.study.jstormPlay; 2 3 import backtype.storm.Config; 4 import backtype.storm.LocalCluster; 5 import backtype.storm.StormSubmitter; 6 import backtype.storm.topology.TopologyBuilder; 7 import backtype.storm.tuple.Fields; 8 9 public class WordCountTopology { 10 public static void main(String[] args) throws Exception { 11 /**第一步,设计一个Topolgy*/ 12 TopologyBuilder builder = new TopologyBuilder(); 13 /* 14 * 设置spout和bolt,完整参数为spout的id(即name),spout对象,并发数(此项没有默认null) 15 */ 16 //setSpout 17 builder.setSpout("sentence-spout",new RandomSentenceSpout(),1); 18 //setBolt:SplitBolt的grouping策略是上层随机分发,CountBolt的grouping策略是按照上层字段分发 19 //如果想要从多个Bolt获取数据,可以继续设置grouping 20 builder.setBolt("split-bolt", new SplitBolt(),1) 21 .shuffleGrouping("sentence-spout"); 22 builder.setBolt("count-bolt", new CountBolt(),1) 23 .fieldsGrouping("split-bolt", new Fields("word")); 24 /**第二步,进行基本配置*/ 25 Config conf = new Config(); 26 //作用和影响??????????? 27 conf.setDebug(true); 28 if (args != null && args.length > 0) { 29 conf.setNumWorkers(1); 30 StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); 31 } 32 else { 33 /* 34 * run in local cluster, for test in eclipse. 35 */ 36 conf.setMaxTaskParallelism(3); 37 LocalCluster cluster = new LocalCluster(); 38 cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology()); 39 Thread.sleep(Integer.MAX_VALUE); 40 cluster.shutdown(); 41 } 42 } 43 }
1 package act.chenkh.study.jstormPlay; 2 3 import backtype.storm.Config; 4 import backtype.storm.LocalCluster; 5 import backtype.storm.StormSubmitter; 6 import backtype.storm.topology.TopologyBuilder; 7 import backtype.storm.tuple.Fields; 8 9 public class WordCountTopology { 10 public static void main(String[] args) throws Exception { 11 /**第一步,设计一个Topolgy*/ 12 TopologyBuilder builder = new TopologyBuilder(); 13 /* 14 * 设置spout和bolt,完整参数为spout的id(即name),spout对象,并发数(此项没有默认null) 15 */ 16 //setSpout 17 builder.setSpout("sentence-spout",new RandomSentenceSpout(),1); 18 //setBolt:SplitBolt的grouping策略是上层随机分发,CountBolt的grouping策略是按照上层字段分发 19 //如果想要从多个Bolt获取数据,可以继续设置grouping 20 builder.setBolt("split-bolt", new SplitBolt(),1) 21 .shuffleGrouping("sentence-spout"); 22 builder.setBolt("count-bolt", new CountBolt(),1) 23 .fieldsGrouping("split-bolt", new Fields("word")); 24 /**第二步,进行基本配置*/ 25 Config conf = new Config(); 26 //作用和影响??????????? 27 conf.setDebug(true); 28 if (args != null && args.length > 0) { 29 conf.setNumWorkers(1); 30 StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); 31 } 32 else { 33 /* 34 * run in local cluster, for test in eclipse. 35 */ 36 conf.setMaxTaskParallelism(3); 37 LocalCluster cluster = new LocalCluster(); 38 cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology()); 39 Thread.sleep(Integer.MAX_VALUE); 40 cluster.shutdown(); 41 } 42 } 43 }
2,SplitBolt类:接收上层tuple,进行split,分发给下一层
1 package act.chenkh.study.jstormPlay; 2 3 import java.util.HashMap; 4 import java.util.Map; 5 6 import org.apache.log4j.Logger; 7 8 import com.alibaba.jstorm.callback.AsyncLoopThread; 9 import com.alibaba.jstorm.callback.RunnableCallback; 10 11 import backtype.storm.task.TopologyContext; 12 import backtype.storm.topology.BasicOutputCollector; 13 import backtype.storm.topology.OutputFieldsDeclarer; 14 import backtype.storm.topology.base.BaseBasicBolt; 15 import backtype.storm.tuple.Fields; 16 import backtype.storm.tuple.Tuple; 17 import clojure.inspector__init; 18 19 public class CountBolt extends BaseBasicBolt { 20 Integer id; 21 String name; 22 Map<String, Integer> counters; 23 String component; 24 private static final Logger LOG = Logger.getLogger(CountBolt.class); 25 private AsyncLoopThread statThread; 26 /** 27 * On create 28 */ 29 @Override 30 public void prepare(Map stormConf, TopologyContext context) { 31 this.counters = new HashMap<String, Integer>(); 32 this.name = context.getThisComponentId(); 33 this.id = context.getThisTaskId(); 34 this.statThread = new AsyncLoopThread(new statRunnable()); 35 36 LOG.info(stormConf.get("abc")+"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"); 37 component = context.getThisComponentId(); 38 } 39 40 public void declareOutputFields(OutputFieldsDeclarer declarer) { 41 declarer.declare(new Fields("word","count")); 42 // declarer.declareStream("coord-"+"word-counter", new Fields("epoch","ebagNum")); 43 // LOG.info("set stream coord-"+component); 44 } 45 46 //接收消息之后被调用的方法 47 public void execute(Tuple input, BasicOutputCollector collector) { 48 // String str = input.getString(0); 49 String str = input.getStringByField("word"); 50 if(!counters.containsKey(str)){ 51 counters.put(str, 1); 52 }else{ 53 Integer c = counters.get(str) + 1; 54 counters.put(str, c); 55 } 56 } 57 class statRunnable extends RunnableCallback { 58 59 @Override 60 public void run() { 61 while(true){ 62 try { 63 Thread.sleep(10000); 64 } catch (InterruptedException e) { 65 66 } 67 LOG.info("\n-- Word Counter ["+name+"-"+id+"] --"); 68 for(Map.Entry<String, Integer> entry : counters.entrySet()){ 69 LOG.info(entry.getKey()+": "+entry.getValue()); 70 } 71 LOG.info(""); 72 } 73 74 } 75 } 76 77 }
3,CountBolt类:接收上层tuple,进行count,展示输出
1 package act.chenkh.study.jstormPlay; 2 3 import java.util.HashMap; 4 import java.util.Map; 5 6 import org.apache.log4j.Logger; 7 8 import com.alibaba.jstorm.callback.AsyncLoopThread; 9 import com.alibaba.jstorm.callback.RunnableCallback; 10 11 import backtype.storm.task.TopologyContext; 12 import backtype.storm.topology.BasicOutputCollector; 13 import backtype.storm.topology.OutputFieldsDeclarer; 14 import backtype.storm.topology.base.BaseBasicBolt; 15 import backtype.storm.tuple.Fields; 16 import backtype.storm.tuple.Tuple; 17 import clojure.inspector__init; 18 19 public class CountBolt extends BaseBasicBolt { 20 Integer id; 21 String name; 22 Map<String, Integer> counters; 23 String component; 24 private static final Logger LOG = Logger.getLogger(CountBolt.class); 25 private AsyncLoopThread statThread; 26 /** 27 * On create 28 */ 29 @Override 30 public void prepare(Map stormConf, TopologyContext context) { 31 this.counters = new HashMap<String, Integer>(); 32 this.name = context.getThisComponentId(); 33 this.id = context.getThisTaskId(); 34 this.statThread = new AsyncLoopThread(new statRunnable()); 35 36 LOG.info(stormConf.get("abc")+"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"); 37 component = context.getThisComponentId(); 38 } 39 40 public void declareOutputFields(OutputFieldsDeclarer declarer) { 41 declarer.declare(new Fields("word","count")); 42 // declarer.declareStream("coord-"+"word-counter", new Fields("epoch","ebagNum")); 43 // LOG.info("set stream coord-"+component); 44 } 45 46 //接收消息之后被调用的方法 47 public void execute(Tuple input, BasicOutputCollector collector) { 48 // String str = input.getString(0); 49 String str = input.getStringByField("word"); 50 if(!counters.containsKey(str)){ 51 counters.put(str, 1); 52 }else{ 53 Integer c = counters.get(str) + 1; 54 counters.put(str, c); 55 } 56 } 57 class statRunnable extends RunnableCallback { 58 59 @Override 60 public void run() { 61 while(true){ 62 try { 63 Thread.sleep(10000); 64 } catch (InterruptedException e) { 65 66 } 67 LOG.info("\n-- Word Counter ["+name+"-"+id+"] --"); 68 for(Map.Entry<String, Integer> entry : counters.entrySet()){ 69 LOG.info(entry.getKey()+": "+entry.getValue()); 70 } 71 LOG.info(""); 72 } 73 74 } 75 } 76 77 }
参考:http://fireinwind.iteye.com/blog/2153699(第一个Storm应用)
三、Grouping的几种方式
四、Bolt的声明周期
1、在定义Topology实例过程中,定义好Spout实例和Bolt实例
2、在提交Topology实例给Nimbus的过程中,会调用TopologyBuilder实例的createTopology()方法,以获取定义的Topology实例。在运行createTopology()方法的过程中,会去调用Spout和Bolt实例上的declareOutputFields()方法和getComponentConfiguration()方法,declareOutputFields()方法配置Spout和Bolt实例的输出,getComponentConfiguration()方法输出特定于Spout和Bolt实例的配置参数值对。Storm会将以上过程中得到的实例,输出配置和配置参数值对等数据序列化,然后传递给Nimbus。
3、在Worker Node上运行的thread,从Nimbus上复制序列化后得到的字节码文件,从中反序列化得到Spout和Bolt实例,实例的输出配置和实例的配置参数值对等数据,在thread中Spout和Bolt实例的declareOutputFields()和getComponentConfiguration()不会再运行。
4、在thread中,反序列化得到一个Bolt实例后,它会先运行Bolt实例的prepare()方法,在这个方法调用中,需要传入一个OutputCollector实例,后面使用该OutputCollector实例输出Tuple
5、接下来在该thread中按照配置数量建立task集合,然后在每个task中就会循环调用thread所持有Bolt实例的execute()方法
6、在关闭一个thread时,thread所持有的Bolt实例会调用cleanup()方法
不过如果是强制关闭,这个cleanup()方法有可能不会被调用到
五、Stream里面的Tuple
1 public List<Integer> emit(List<Object> tuple, Object messageId) { 2 return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId); 3 }
这里的tuple, 实际上是List<Object> 对象,返回的是 List<Integer> 是要发送的tast的IdsList
在bolt接收的时候, 变成一个Tuple对象, 结构应该也是一个list, List<Field1, value1, Field2, value2..>这样的一个结构, FieldList ValueList, 我们根据对应的fieldname就可以取出对应的getIntegerByField方法
JStorm第一个程序WordCount详解