
Storm Study Notes: A Complete Record (Part 1)

Storm has two run modes: local mode and cluster mode.

1. First, create a simple HelloWorld-style program as a way into Storm. The package structure is as follows:

[Figure: package structure of the project]

2. As the package structure shows, this is a Maven project. The contents of pom.xml are:

        

    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                                 http://maven.apache.org/xsd/maven-4.0.0.xsd">

      <modelVersion>4.0.0</modelVersion>
      <groupId>storm.book</groupId>
      <artifactId>Getting-Started</artifactId>
      <version>0.0.1-SNAPSHOT</version>

      <build>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
              <source>1.7</source>
              <target>1.7</target>
              <compilerVersion>1.7</compilerVersion>
            </configuration>
          </plugin>
        </plugins>
      </build>

      <repositories>
        <!-- Repository where the Storm dependencies can be found -->
        <repository>
          <id>clojars.org</id>
          <url>http://clojars.org/repo</url>
        </repository>
      </repositories>

      <dependencies>
        <!-- Storm dependency -->
        <dependency>
          <groupId>storm</groupId>
          <artifactId>storm</artifactId>
          <version>0.7.1</version>
        </dependency>
      </dependencies>

    </project>

 

The code of WordReader.java is as follows:

    package spouts;

    import java.io.BufferedReader;
    import java.io.FileNotFoundException;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.Map;
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;

    public class WordReader extends BaseRichSpout {

        private SpoutOutputCollector collector;
        private FileReader fileReader;
        private boolean completed = false;

        /** Called when the tuple emitted with this message id has been fully processed. */
        @Override
        public void ack(Object msgId) {
            System.out.println("OK:" + msgId);
        }

        /** Release the file handle when the spout shuts down. */
        @Override
        public void close() {
            try {
                if (fileReader != null) {
                    fileReader.close();
                }
            } catch (IOException e) {
                // Nothing useful to do while shutting down
            }
        }

        /** Called when the tuple emitted with this message id failed or timed out. */
        @Override
        public void fail(Object msgId) {
            System.out.println("FAIL:" + msgId);
        }

        /**
         * The only thing this method does is emit each line of the file.
         */
        @Override
        public void nextTuple() {
            /*
             * Storm calls nextTuple() over and over. Once the file has been read,
             * sleep for a while and return without emitting anything.
             */
            if (completed) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Do nothing
                }
                return;
            }

            String str;
            // Open the reader
            BufferedReader reader = new BufferedReader(fileReader);
            try {
                // Read all lines
                while ((str = reader.readLine()) != null) {
                    // Emit one tuple per line; the line itself is also the message id
                    this.collector.emit(new Values(str), str);
                }
            } catch (Exception e) {
                throw new RuntimeException("Error reading tuple", e);
            } finally {
                completed = true;
            }
        }

        /**
         * Open the file named by the "wordsFile" config entry and keep the collector.
         */
        @Override
        public void open(Map conf, TopologyContext context,
                         SpoutOutputCollector collector) {
            try {
                this.fileReader = new FileReader(conf.get("wordsFile").toString());
            } catch (FileNotFoundException e) {
                throw new RuntimeException("Error reading file [" + conf.get("wordsFile") + "]", e);
            }
            this.collector = collector;
        }

        /**
         * Declare the output field "line".
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line"));
        }
    }
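WordReader passes the line itself as the message id (the second argument of emit), and the sample run later in this post acknowledges "storm", "is" and "great" twice each. Storm tracks tuples with its own internal ids, so acking still works, but ack(msgId) and fail(msgId) cannot tell two identical lines apart. That matters if fail() is ever extended to replay lines. The following is a minimal plain-Java sketch with no Storm dependency; the pending map and the counter-based id are hypothetical additions for illustration, not part of the original spout.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical bookkeeping a spout could keep in order to replay lines from
// fail(msgId). This is NOT part of the original WordReader, which only prints ids.
class PendingByMsgIdSketch {

    // Pending tuples keyed by the line text, the way WordReader picks its msgId.
    static Map<String, String> keyedByLine(List<String> lines) {
        Map<String, String> pending = new HashMap<String, String>();
        for (String line : lines) {
            pending.put(line, line); // a repeated line overwrites the earlier entry
        }
        return pending;
    }

    // Pending tuples keyed by a running counter, so every emit gets its own id.
    static Map<Long, String> keyedByCounter(List<String> lines) {
        Map<Long, String> pending = new HashMap<Long, String>();
        long nextId = 0;
        for (String line : lines) {
            pending.put(nextId++, line);
        }
        return pending;
    }

    public static void main(String[] args) {
        // The 17 lines acknowledged ("OK:...") in the sample run; the last one is empty.
        List<String> lines = Arrays.asList("storm", "test", "are", "great", "is", "an",
                "storm", "simple", "application", "but", "very", "powerfull",
                "really", "StOrm", "is", "great", "");
        System.out.println(keyedByLine(lines).size());    // prints 14
        System.out.println(keyedByCounter(lines).size()); // prints 17
    }
}
```

With the line as key, three of the 17 pending entries are lost; with a counter, each emitted line stays individually addressable.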

The code of WordNormalizer.java is as follows:

    

    package bolts;

    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class WordNormalizer extends BaseBasicBolt {

        @Override
        public void cleanup() {}

        /**
         * The bolt receives one line of the words file and normalizes it:
         * it splits the line into words, drops empty tokens and emits
         * each remaining word in lower case.
         */
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String sentence = input.getString(0);
            String[] words = sentence.split(" ");
            for (String word : words) {
                word = word.trim();
                if (!word.isEmpty()) {
                    word = word.toLowerCase();
                    collector.emit(new Values(word));
                }
            }
        }

        /**
         * The bolt emits a single field, "word".
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }
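The normalization steps in execute() can be pulled out into a plain static method and tried without Storm. NormalizeSketch is a hypothetical helper for illustration only; it applies the same split, trim, empty-check and lower-case steps as the bolt. The second call shows why the empty last line of the words file produces no word for the counter.

```java
import java.util.ArrayList;
import java.util.List;

// The normalization steps of WordNormalizer.execute(), extracted into a plain
// static method (hypothetical helper, not part of the project).
class NormalizeSketch {

    static List<String> normalize(String sentence) {
        List<String> words = new ArrayList<String>();
        // split(" ") yields empty tokens for runs of spaces, hence the isEmpty() check
        for (String word : sentence.split(" ")) {
            word = word.trim();
            if (!word.isEmpty()) {
                words.add(word.toLowerCase());
            }
        }
        return words;
    }

    public static void main(String[] args) {
        System.out.println(normalize("  StOrm is  GREAT ")); // prints [storm, is, great]
        System.out.println(normalize(""));                   // prints []
    }
}
```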

 

The code of WordCounter.java is as follows:
    

    package bolts;

    import java.util.HashMap;
    import java.util.Map;

    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Tuple;

    public class WordCounter extends BaseBasicBolt {

        Integer id;
        String name;
        Map<String, Integer> counters;

        /**
         * When the topology shuts down (here, when the local cluster is shut down),
         * print the word counts. Storm only guarantees that cleanup() runs in local
         * mode; on a real cluster, workers may be killed without calling it.
         */
        @Override
        public void cleanup() {
            System.out.println("-- Word Counter [" + name + "-" + id + "] --");
            for (Map.Entry<String, Integer> entry : counters.entrySet()) {
                System.out.println(entry.getKey() + ": " + entry.getValue());
            }
        }

        /**
         * Called once when the bolt is initialized: create the counter map and
         * remember the component id and task id for the report in cleanup().
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context) {
            this.counters = new HashMap<String, Integer>();
            this.name = context.getThisComponentId();
            this.id = context.getThisTaskId();
        }

        /** This bolt is the end of the stream, so it declares no output fields. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }

        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String str = input.getString(0);
            // If the word is not in the map yet, start it at 1; otherwise add 1
            if (!counters.containsKey(str)) {
                counters.put(str, 1);
            } else {
                Integer c = counters.get(str) + 1;
                counters.put(str, c);
            }
        }
    }
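The containsKey/put branches in execute() are the standard counting pattern. On Java 8 or later the same update can be written with Map.merge. Note that the pom above compiles at the 1.7 level, where method references such as Integer::sum are not available, so the project itself would need a higher source level to use this form. CountSketch is a hypothetical, Storm-free illustration; it uses a TreeMap only so the printout is sorted, whereas the bolt's HashMap prints in no particular order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Word counting as in WordCounter.execute(), written with Map.merge (Java 8+).
class CountSketch {

    static Map<String, Integer> count(List<String> words) {
        // TreeMap keeps keys sorted so the output is stable between runs
        Map<String, Integer> counters = new TreeMap<String, Integer>();
        for (String word : words) {
            // Puts 1 for a new word, otherwise adds 1 to the existing count
            counters.merge(word, 1, Integer::sum);
        }
        return counters;
    }

    public static void main(String[] args) {
        // The normalized words of the sample run ("StOrm" is already lower-cased)
        List<String> words = Arrays.asList("storm", "test", "are", "great", "is", "an",
                "storm", "simple", "application", "but", "very", "powerfull",
                "really", "storm", "is", "great");
        System.out.println(count(words));
    }
}
```

For these words the totals agree with the counter report in the log: storm 3, great 2, is 2, every other word 1.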

 

The code of TopologyMain.java is as follows:

    import spouts.WordReader;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;
    import bolts.WordCounter;
    import bolts.WordNormalizer;

    public class TopologyMain {
        public static void main(String[] args) throws InterruptedException {
            if (args.length < 1) {
                System.err.println("Usage: TopologyMain <path to words file>");
                return;
            }

            // Topology definition: word-reader -> word-normalizer -> word-counter
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("word-reader", new WordReader());
            builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
            builder.setBolt("word-counter", new WordCounter(),1).fieldsGrouping("word-normalizer", new Fields("word"));

            // Configuration: the spout reads the file name from "wordsFile"
            Config conf = new Config();
            conf.put("wordsFile", args[0]);
            conf.setDebug(false);
            conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

            // Topology run: in-process local cluster, shut down after about one second
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
            Thread.sleep(1000);
            cluster.shutdown();
        }
    }
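TopologyMain sets Config.TOPOLOGY_MAX_SPOUT_PENDING to 1. This setting caps how many tuples from a spout task may be pending (emitted but not yet acked or failed); while the cap is reached, Storm does not call nextTuple() again. Because WordReader emits the whole file inside a single nextTuple() call, the cap does not throttle the emits within that call. The following is a rough, single-threaded model of that gating, not Storm code; the round-based acking and all names are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of topology.max.spout.pending (hypothetical, not Storm code).
// Assumption: the pending count is checked before each nextTuple() call,
// not before each individual emit.
class MaxSpoutPendingModel {

    // emitsPerCall[i] = number of tuples the (i+1)-th nextTuple() call emits.
    // acksPerRound    = how many pending tuples get acked after each round.
    // Returns the pending count right after each nextTuple() call that was allowed.
    static List<Integer> run(int maxPending, int[] emitsPerCall, int acksPerRound, int rounds) {
        List<Integer> trace = new ArrayList<Integer>();
        int pending = 0;
        int call = 0;
        for (int r = 0; r < rounds; r++) {
            if (pending < maxPending && call < emitsPerCall.length) {
                pending += emitsPerCall[call++]; // nextTuple() is allowed to run
                trace.add(pending);
            }
            pending = Math.max(0, pending - acksPerRound); // acks arrive
        }
        return trace;
    }

    public static void main(String[] args) {
        // WordReader-like spout: the first call emits the whole file (17 lines were
        // acknowledged in the sample run), later calls emit nothing.
        System.out.println(run(1, new int[] {17, 0, 0}, 5, 10)); // prints [17, 0, 0]
        // A spout that emits one line per call never exceeds the limit of 1.
        System.out.println(run(1, new int[] {1, 1, 1}, 1, 10));  // prints [1, 1, 1]
    }
}
```

In the first trace the pending count jumps to 17 despite the limit of 1; the limit only delays the following nextTuple() calls until the acks drain the backlog.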

3. Download and install Maven

Download: http://mirrors.hust.edu.cn/apache/maven/maven-3/3.2.2/

Install: extract apache-maven-3.2.2-bin.tar.gz, then add apache-maven-3.2.2/bin to the PATH environment variable.

 

4. Compile the project with Maven

First change into the directory that contains the project's pom.xml, then compile the project code with:

        mvn compile

5. Run the project

            mvn exec:java -Dexec.mainClass="TopologyMain" -Dexec.args="src/main/resources/words.txt"

 

The output is as follows:

            [root@Master storm-book]# mvn exec:java -Dexec.mainClass="TopologyMain" -Dexec.args="src/main/resources/words.txt"
[INFO] Scanning for projects...
[INFO]                                                                        
[INFO] ------------------------------------------------------------------------
[INFO] Building Getting-Started 0.0.1-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- exec-maven-plugin:1.3.1:java (default-cli) @ Getting-Started ---
[WARNING] Warning: killAfter is now deprecated. Do you need it ? Please comment on MEXEC-6.
0    [TopologyMain.main()] INFO  backtype.storm.zookeeper  - Starting inprocess zookeeper at port 2181 and dir /tmp/dbe36a62-a375-40dc-a2a1-32cba2a06fe3
220  [TopologyMain.main()] INFO  backtype.storm.daemon.nimbus  - Starting Nimbus with conf {"topology.fall.back.on.java.serialization" true, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "nimbus.monitor.freq.secs" 10, "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "storm.local.dir" "/tmp/02c0aa1b-8f89-40db-9631-9e905cea9115", "supervisor.worker.start.timeout.secs" 240, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "storm.zookeeper.port" 2181, "transactional.zookeeper.port" nil, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "supervisor.enable" true, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.worker.childopts" nil, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "task.heartbeat.frequency.secs" 3, "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, "supervisor.slots.ports" [6700 6701 6702 6703], "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx1024m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "nimbus.task.timeout.secs" 30, "drpc.invocations.port" 3773, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "topology.ackers" 1, "storm.cluster.mode" "local", "topology.optimize" true, "topology.max.task.parallelism" nil}
285  [TopologyMain.main()] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl  - Starting
323  [TopologyMain.main()-EventThread] INFO  backtype.storm.zookeeper  - Zookeeper state update: :connected:none
429  [TopologyMain.main()] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl  - Starting
470  [TopologyMain.main()] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl  - Starting
474  [TopologyMain.main()-EventThread] INFO  backtype.storm.zookeeper  - Zookeeper state update: :connected:none
477  [TopologyMain.main()] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl  - Starting
477  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181] WARN  org.apache.zookeeper.server.NIOServerCnxn  - EndOfStreamException: Unable to read additional data from client sessionid 0x146fa8f71280002, likely client has closed socket
481  [TopologyMain.main()] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl  - Starting
489  [TopologyMain.main()-EventThread] INFO  backtype.storm.zookeeper  - Zookeeper state update: :connected:none
491  [TopologyMain.main()] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl  - Starting
507  [TopologyMain.main()] INFO  backtype.storm.daemon.supervisor  - Starting Supervisor with conf {"topology.fall.back.on.java.serialization" true, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "nimbus.monitor.freq.secs" 10, "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "storm.local.dir" "/tmp/71c1d996-8a85-44a5-a52a-0538f6318a7f", "supervisor.worker.start.timeout.secs" 240, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "storm.zookeeper.port" 2181, "transactional.zookeeper.port" nil, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "supervisor.enable" true, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.worker.childopts" nil, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "task.heartbeat.frequency.secs" 3, "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, "supervisor.slots.ports" (1 2 3), "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx1024m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "nimbus.task.timeout.secs" 30, "drpc.invocations.port" 3773, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "topology.ackers" 1, "storm.cluster.mode" "local", "topology.optimize" true, "topology.max.task.parallelism" nil}
509  [TopologyMain.main()] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl  - Starting
513  [TopologyMain.main()-EventThread] INFO  backtype.storm.zookeeper  - Zookeeper state update: :connected:none
515  [TopologyMain.main()] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl  - Starting
554  [TopologyMain.main()] INFO  backtype.storm.daemon.supervisor  - Starting supervisor with id 0d0f0caf-511a-4458-b1ef-ee4f11399a8a at host Master
558  [TopologyMain.main()] INFO  backtype.storm.daemon.supervisor  - Starting Supervisor with conf {"topology.fall.back.on.java.serialization" true, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "nimbus.monitor.freq.secs" 10, "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "storm.local.dir" "/tmp/451a5c40-29a7-4c9a-b075-17fe26205a36", "supervisor.worker.start.timeout.secs" 240, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "storm.zookeeper.port" 2181, "transactional.zookeeper.port" nil, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "supervisor.enable" true, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.worker.childopts" nil, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "task.heartbeat.frequency.secs" 3, "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, "supervisor.slots.ports" (4 5 6), "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx1024m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "nimbus.task.timeout.secs" 30, "drpc.invocations.port" 3773, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "topology.ackers" 1, "storm.cluster.mode" "local", "topology.optimize" true, "topology.max.task.parallelism" nil}
569  [TopologyMain.main()] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl  - Starting
591  [TopologyMain.main()-EventThread] INFO  backtype.storm.zookeeper  - Zookeeper state update: :connected:none
594  [TopologyMain.main()] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl  - Starting
624  [TopologyMain.main()] INFO  backtype.storm.daemon.supervisor  - Starting supervisor with id 0dc9916a-36a8-4796-92df-e965b53ae5b0 at host Master
706  [TopologyMain.main()] INFO  backtype.storm.daemon.nimbus  - Received topology submission for Getting-Started-Toplogie with conf {"topology.ackers" 1, "topology.kryo.register" nil, "topology.name" "Getting-Started-Toplogie", "storm.id" "Getting-Started-Toplogie-1-1404363043", "wordsFile" "src/main/resources/words.txt", "topology.debug" false, "topology.max.spout.pending" 1}
820  [TopologyMain.main()] INFO  backtype.storm.daemon.nimbus  - Task Getting-Started-Toplogie-1-1404363043:1 timed out
822  [TopologyMain.main()] INFO  backtype.storm.daemon.nimbus  - Task Getting-Started-Toplogie-1-1404363043:2 timed out
823  [TopologyMain.main()] INFO  backtype.storm.daemon.nimbus  - Task Getting-Started-Toplogie-1-1404363043:3 timed out
824  [TopologyMain.main()] INFO  backtype.storm.daemon.nimbus  - Task Getting-Started-Toplogie-1-1404363043:4 timed out
831  [TopologyMain.main()] INFO  backtype.storm.daemon.nimbus  - Reassigning Getting-Started-Toplogie-1-1404363043 to 1 slots
831  [TopologyMain.main()] INFO  backtype.storm.daemon.nimbus  - Reassign ids: [1 2 3 4]

The line above shows that four ids (1 to 4) are assigned, each corresponding to one task thread.

831  [TopologyMain.main()] INFO  backtype.storm.daemon.nimbus  - Available slots: (["0dc9916a-36a8-4796-92df-e965b53ae5b0" 4] ["0dc9916a-36a8-4796-92df-e965b53ae5b0" 5] ["0dc9916a-36a8-4796-92df-e965b53ae5b0" 6] ["0d0f0caf-511a-4458-b1ef-ee4f11399a8a" 1] ["0d0f0caf-511a-4458-b1ef-ee4f11399a8a" 2] ["0d0f0caf-511a-4458-b1ef-ee4f11399a8a" 3])
837  [TopologyMain.main()] INFO  backtype.storm.daemon.nimbus  - Setting new assignment for storm id Getting-Started-Toplogie-1-1404363043: #:backtype.storm.daemon.common.Assignment{:master-code-dir "/tmp/02c0aa1b-8f89-40db-9631-9e905cea9115/nimbus/stormdist/Getting-Started-Toplogie-1-1404363043", :node->host {"0dc9916a-36a8-4796-92df-e965b53ae5b0" "Master"}, :task->node+port {4 ["0dc9916a-36a8-4796-92df-e965b53ae5b0" 4], 3 ["0dc9916a-36a8-4796-92df-e965b53ae5b0" 4], 2 ["0dc9916a-36a8-4796-92df-e965b53ae5b0" 4], 1 ["0dc9916a-36a8-4796-92df-e965b53ae5b0" 4]}, :task->start-time-secs {1 1404363043, 2 1404363043, 3 1404363043, 4 1404363043}}
863  [TopologyMain.main()] INFO  backtype.storm.daemon.nimbus  - Activating Getting-Started-Toplogie: Getting-Started-Toplogie-1-1404363043
871  [Thread-9] INFO  backtype.storm.daemon.supervisor  - Downloading code for storm id Getting-Started-Toplogie-1-1404363043 from /tmp/02c0aa1b-8f89-40db-9631-9e905cea9115/nimbus/stormdist/Getting-Started-Toplogie-1-1404363043
871  [Thread-4] INFO  backtype.storm.daemon.supervisor  - Downloading code for storm id Getting-Started-Toplogie-1-1404363043 from /tmp/02c0aa1b-8f89-40db-9631-9e905cea9115/nimbus/stormdist/Getting-Started-Toplogie-1-1404363043
892  [Thread-9] INFO  backtype.storm.daemon.supervisor  - Finished downloading code for storm id Getting-Started-Toplogie-1-1404363043 from /tmp/02c0aa1b-8f89-40db-9631-9e905cea9115/nimbus/stormdist/Getting-Started-Toplogie-1-1404363043
897  [Thread-4] INFO  backtype.storm.daemon.supervisor  - Finished downloading code for storm id Getting-Started-Toplogie-1-1404363043 from /tmp/02c0aa1b-8f89-40db-9631-9e905cea9115/nimbus/stormdist/Getting-Started-Toplogie-1-1404363043
929  [Thread-10] INFO  backtype.storm.daemon.supervisor  - Launching worker with assignment #:backtype.storm.daemon.supervisor.LocalAssignment{:storm-id "Getting-Started-Toplogie-1-1404363043", :task-ids (4 3 2 1)} for this supervisor 0dc9916a-36a8-4796-92df-e965b53ae5b0 on port 4 with id 4a5e8f63-d4c0-4dcd-9a02-95edcdd9f4fb
931  [Thread-10] INFO  backtype.storm.daemon.worker  - Launching worker for Getting-Started-Toplogie-1-1404363043 on 0dc9916a-36a8-4796-92df-e965b53ae5b0:4 with id 4a5e8f63-d4c0-4dcd-9a02-95edcdd9f4fb and conf {"topology.fall.back.on.java.serialization" true, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "nimbus.monitor.freq.secs" 10, "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "storm.local.dir" "/tmp/451a5c40-29a7-4c9a-b075-17fe26205a36", "supervisor.worker.start.timeout.secs" 240, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "storm.zookeeper.port" 2181, "transactional.zookeeper.port" nil, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "supervisor.enable" true, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.worker.childopts" nil, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "task.heartbeat.frequency.secs" 3, "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, "supervisor.slots.ports" (4 5 6), "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx1024m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "nimbus.task.timeout.secs" 30, "drpc.invocations.port" 3773, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "topology.ackers" 1, "storm.cluster.mode" "local", "topology.optimize" true, "topology.max.task.parallelism" nil}
932  [Thread-10] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl  - Starting
936  [Thread-10-EventThread] INFO  backtype.storm.zookeeper  - Zookeeper state update: :connected:none
938  [Thread-10] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl  - Starting
991  [Thread-10] INFO  backtype.storm.daemon.task  - Loading task word-reader:4
1022 [Thread-10] INFO  backtype.storm.daemon.task  - Opening spout word-reader:4
1022 [Thread-10] INFO  backtype.storm.daemon.task  - Opened spout word-reader:4
1028 [Thread-10] INFO  backtype.storm.daemon.task  - Finished loading task word-reader:4
1028 [Thread-18] INFO  backtype.storm.daemon.task  - Activating spout word-reader:4
1034 [Thread-10] INFO  backtype.storm.daemon.task  - Loading task word-normalizer:3
1057 [Thread-10] INFO  backtype.storm.daemon.task  - Preparing bolt word-normalizer:3
1057 [Thread-10] INFO  backtype.storm.daemon.task  - Prepared bolt word-normalizer:3
1070 [Thread-10] INFO  backtype.storm.daemon.task  - Finished loading task word-normalizer:3
1072 [Thread-10] INFO  backtype.storm.daemon.task  - Loading task word-counter:2
1081 [Thread-10] INFO  backtype.storm.daemon.task  - Preparing bolt word-counter:2
1081 [Thread-10] INFO  backtype.storm.daemon.task  - Prepared bolt word-counter:2
1085 [Thread-10] INFO  backtype.storm.daemon.task  - Finished loading task word-counter:2
1086 [Thread-10] INFO  backtype.storm.daemon.task  - Loading task __acker:1
1092 [Thread-10] INFO  backtype.storm.daemon.task  - Preparing bolt __acker:1
1099 [Thread-10] INFO  backtype.storm.daemon.task  - Prepared bolt __acker:1
1099 [Thread-10] INFO  backtype.storm.daemon.task  - Finished loading task __acker:1

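The four tasks are word-reader:4, word-normalizer:3, word-counter:2 and __acker:1 (the acker is a system bolt that Storm adds to track tuple acknowledgements).

The spout task word-reader:4 goes through five logged states (Loading, Opening, Opened, Finished loading, Activating), while each bolt task goes through four (Loading, Preparing, Prepared, Finished loading).

The tasks are loaded one after another on the same thread (Thread-10): each task finishes loading before the next one starts. This is only the loading order, not a hand-off of execution: the spout is already activated on Thread-18 while the bolts are still being loaded.
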
1116 [Thread-10] INFO  backtype.storm.daemon.worker  - Worker has topology config {"storm.id" "Getting-Started-Toplogie-1-1404363043", "topology.fall.back.on.java.serialization" true, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "nimbus.monitor.freq.secs" 10, "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "storm.local.dir" "/tmp/451a5c40-29a7-4c9a-b075-17fe26205a36", "supervisor.worker.start.timeout.secs" 240, "wordsFile" "src/main/resources/words.txt", "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "storm.zookeeper.port" 2181, "transactional.zookeeper.port" nil, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "supervisor.enable" true, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.name" "Getting-Started-Toplogie", "topology.worker.childopts" nil, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "task.heartbeat.frequency.secs" 3, "topology.max.spout.pending" 1, "storm.zookeeper.retry.interval" 1000, "supervisor.slots.ports" (4 5 6), "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.kryo.register" nil, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx1024m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "nimbus.task.timeout.secs" 30, "drpc.invocations.port" 3773, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "topology.ackers" 1, "storm.cluster.mode" "local", "topology.optimize" true, "topology.max.task.parallelism" nil}
1116 [Thread-10] INFO  backtype.storm.daemon.worker  - Worker 4a5e8f63-d4c0-4dcd-9a02-95edcdd9f4fb for storm Getting-Started-Toplogie-1-1404363043 on 0dc9916a-36a8-4796-92df-e965b53ae5b0:4 has finished loading
OK:storm
OK:test
OK:are
OK:great
OK:is
OK:an
OK:storm
OK:simple
OK:application
OK:but
OK:very
OK:powerfull
OK:really
OK:StOrm
OK:is
OK:great
OK:
1888 [TopologyMain.main()] INFO  backtype.storm.daemon.nimbus  - Shutting down master
1901 [TopologyMain.main()] INFO  backtype.storm.daemon.nimbus  - Shut down master
1907 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181] WARN  org.apache.zookeeper.server.NIOServerCnxn  - EndOfStreamException: Unable to read additional data from client sessionid 0x146fa8f71280003, likely client has closed socket
1925 [TopologyMain.main()] INFO  backtype.storm.daemon.supervisor  - Shutting down supervisor 0d0f0caf-511a-4458-b1ef-ee4f11399a8a
1926 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181] WARN  org.apache.zookeeper.server.NIOServerCnxn  - EndOfStreamException: Unable to read additional data from client sessionid 0x146fa8f71280005, likely client has closed socket
1927 [Thread-6] INFO  backtype.storm.util  - Async loop interrupted!
1928 [Thread-7] INFO  backtype.storm.util  - Async loop interrupted!
1930 [Thread-8] INFO  backtype.storm.util  - Async loop interrupted!
1931 [Thread-4] INFO  backtype.storm.event  - Event manager interrupted
1937 [Thread-5] INFO  backtype.storm.event  - Event manager interrupted
1940 [TopologyMain.main()] INFO  backtype.storm.daemon.supervisor  - Shutting down 0dc9916a-36a8-4796-92df-e965b53ae5b0:4a5e8f63-d4c0-4dcd-9a02-95edcdd9f4fb
1941 [TopologyMain.main()] INFO  backtype.storm.process-simulator  - Killing process 608ddd21-748c-4de7-afca-e072a705e582
1941 [TopologyMain.main()] INFO  backtype.storm.daemon.worker  - Shutting down worker Getting-Started-Toplogie-1-1404363043 0dc9916a-36a8-4796-92df-e965b53ae5b0 4
1943 [TopologyMain.main()] INFO  backtype.storm.daemon.task  - Shutting down task Getting-Started-Toplogie-1-1404363043:4
1944 [Thread-16] INFO  backtype.storm.util  - Async loop interrupted!
1963 [TopologyMain.main()] INFO  backtype.storm.daemon.task  - Shut down task Getting-Started-Toplogie-1-1404363043:4
1963 [TopologyMain.main()] INFO  backtype.storm.daemon.task  - Shutting down task Getting-Started-Toplogie-1-1404363043:3
1964 [Thread-20] INFO  backtype.storm.util  - Async loop interrupted!
1973 [TopologyMain.main()] INFO  backtype.storm.daemon.task  - Shut down task Getting-Started-Toplogie-1-1404363043:3
1973 [TopologyMain.main()] INFO  backtype.storm.daemon.task  - Shutting down task Getting-Started-Toplogie-1-1404363043:2
1973 [Thread-22] INFO  backtype.storm.util  - Async loop interrupted!
-- Word Counter [word-counter-2] --
really: 1
but: 1
application: 1
is: 2
great: 2
are: 1
test: 1
simple: 1
an: 1
powerfull: 1
storm: 3
very: 1
1976 [TopologyMain.main()] INFO  backtype.storm.daemon.task  - Shut down task Getting-Started-Toplogie-1-1404363043:2
1976 [TopologyMain.main()] INFO  backtype.storm.daemon.task  - Shutting down task Getting-Started-Toplogie-1-1404363043:1
1976 [Thread-24] INFO  backtype.storm.util  - Async loop interrupted!
1981 [TopologyMain.main()] INFO  backtype.storm.daemon.task  - Shut down task Getting-Started-Toplogie-1-1404363043:1
1981 [TopologyMain.main()] INFO  backtype.storm.daemon.worker  - Terminating zmq context
1981 [TopologyMain.main()] INFO  backtype.storm.daemon.worker  - Disconnecting from storm cluster state context
1981 [TopologyMain.main()] INFO  backtype.storm.daemon.worker  - Waiting for heartbeat thread to die
1981 [Thread-27] INFO  backtype.storm.util  - Async loop interrupted!
1981 [Thread-28] INFO  backtype.storm.util  - Async loop interrupted!
1981 [Thread-15] INFO  backtype.storm.util  - Async loop interrupted!
1982 [Thread-14] INFO  backtype.storm.event  - Event manager interrupted
1983 [TopologyMain.main()] INFO  backtype.storm.daemon.worker  - Shut down worker Getting-Started-Toplogie-1-1404363043 0dc9916a-36a8-4796-92df-e965b53ae5b0 4
1984 [TopologyMain.main()] INFO  backtype.storm.daemon.supervisor  - Shut down 0dc9916a-36a8-4796-92df-e965b53ae5b0:4a5e8f63-d4c0-4dcd-9a02-95edcdd9f4fb
1985 [TopologyMain.main()] INFO  backtype.storm.daemon.supervisor  - Shutting down supervisor 0dc9916a-36a8-4796-92df-e965b53ae5b0
1985 [Thread-11] INFO  backtype.storm.util  - Async loop interrupted!
1985 [Thread-12] INFO  backtype.storm.util  - Async loop interrupted!
1985 [Thread-13] INFO  backtype.storm.util  - Async loop interrupted!
1986 [Thread-9] INFO  backtype.storm.event  - Event manager interrupted
1986 [Thread-10] INFO  backtype.storm.event  - Event manager interrupted
1989 [TopologyMain.main()] INFO  backtype.storm.testing  - Shutting down in process zookeeper
1990 [TopologyMain.main()] INFO  backtype.storm.testing  - Done shutting down in process zookeeper
1990 [TopologyMain.main()] INFO  backtype.storm.testing  - Deleting temporary path /tmp/02c0aa1b-8f89-40db-9631-9e905cea9115
1990 [TopologyMain.main()] INFO  backtype.storm.testing  - Deleting temporary path /tmp/dbe36a62-a375-40dc-a2a1-32cba2a06fe3
1991 [TopologyMain.main()] INFO  backtype.storm.testing  - Deleting temporary path /tmp/71c1d996-8a85-44a5-a52a-0538f6318a7f
1991 [TopologyMain.main()] INFO  backtype.storm.testing  - Deleting temporary path /tmp/451a5c40-29a7-4c9a-b075-17fe26205a36
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.104 s
[INFO] Finished at: 2014-07-03T12:50:46+08:00
[INFO] Final Memory: 16M/38M
[INFO] ------------------------------------------------------------------------

 

The output above is from running the topology with a single word-counter task. Now change the following line in TopologyMain.java:

builder.setBolt("word-counter", new WordCounter(),1).fieldsGrouping("word-normalizer", new Fields("word"));

to:

builder.setBolt("word-counter", new WordCounter(), 2).shuffleGrouping("word-normalizer");

Recompile and run it again. The output now contains one more task:

893  [Thread-5] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl  - Starting
958  [Thread-5] INFO  backtype.storm.daemon.task  - Loading task word-reader:5
988  [Thread-5] INFO  backtype.storm.daemon.task  - Opening spout word-reader:5
988  [Thread-5] INFO  backtype.storm.daemon.task  - Opened spout word-reader:5
1002 [Thread-5] INFO  backtype.storm.daemon.task  - Finished loading task word-reader:5
1002 [Thread-18] INFO  backtype.storm.daemon.task  - Activating spout word-reader:5
1038 [Thread-5] INFO  backtype.storm.daemon.task  - Loading task word-normalizer:4
1047 [Thread-5] INFO  backtype.storm.daemon.task  - Preparing bolt word-normalizer:4
1047 [Thread-5] INFO  backtype.storm.daemon.task  - Prepared bolt word-normalizer:4
1048 [Thread-5] INFO  backtype.storm.daemon.task  - Finished loading task word-normalizer:4
1049 [Thread-5] INFO  backtype.storm.daemon.task  - Loading task word-counter:3
1064 [Thread-5] INFO  backtype.storm.daemon.task  - Preparing bolt word-counter:3
1064 [Thread-5] INFO  backtype.storm.daemon.task  - Prepared bolt word-counter:3
1071 [Thread-5] INFO  backtype.storm.daemon.task  - Finished loading task word-counter:3
1072 [Thread-5] INFO  backtype.storm.daemon.task  - Loading task word-counter:2
1089 [Thread-5] INFO  backtype.storm.daemon.task  - Preparing bolt word-counter:2
1089 [Thread-5] INFO  backtype.storm.daemon.task  - Prepared bolt word-counter:2
1091 [Thread-5] INFO  backtype.storm.daemon.task  - Finished loading task word-counter:2
1093 [Thread-5] INFO  backtype.storm.daemon.task  - Loading task __acker:1
1105 [Thread-5] INFO  backtype.storm.daemon.task  - Preparing bolt __acker:1
1109 [Thread-5] INFO  backtype.storm.daemon.task  - Prepared bolt __acker:1
1115 [Thread-5] INFO  backtype.storm.daemon.task  - Finished loading task __acker:1  

There is now a second word-counter task (word-counter:3 alongside word-counter:2), and the final counts are split differently:

-- Word Counter [word-counter-3] --
really: 1
test: 1
simple: 1
an: 1
powerfull: 1
storm: 2
very: 1
1948 [TopologyMain.main()] INFO  backtype.storm.daemon.task  - Shut down task Getting-Started-Toplogie-1-1404363178:3
1949 [TopologyMain.main()] INFO  backtype.storm.daemon.task  - Shutting down task Getting-Started-Toplogie-1-1404363178:2
1949 [Thread-26] INFO  backtype.storm.util  - Async loop interrupted!
-- Word Counter [word-counter-2] --
application: 1
but: 1
is: 2
great: 2
are: 1
storm: 1

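As you can see, the final counting is now shared by two word-counter tasks. Because the bolt uses shuffleGrouping, each word is emitted as its own tuple and handed to an arbitrary word-counter task, so the three occurrences of "storm" were split between the two tasks (2 in word-counter-3 and 1 in word-counter-2).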
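To make the shuffle grouping behaviour concrete, here is a small plain-Java simulation with no Storm dependency. It is only a sketch under our own assumptions, not Storm's implementation: Storm's shuffle grouping aims for an even spread, which we simplify to an independent random choice per tuple. The class and method names (ShuffleGroupingSketch, route) are hypothetical, and the word list is illustrative rather than the original words file.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical simulation of shuffle grouping: each word tuple goes to a
// randomly chosen task, so the same word can end up counted by several tasks.
class ShuffleGroupingSketch {

    // Returns one word -> count map per simulated word-counter task.
    static List<Map<String, Integer>> route(List<String> words, int numTasks, Random random) {
        List<Map<String, Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            tasks.add(new HashMap<>());
        }
        for (String word : words) {
            // The task is picked at random; the word's value plays no part.
            Map<String, Integer> counts = tasks.get(random.nextInt(numTasks));
            counts.merge(word, 1, Integer::sum);
        }
        return tasks;
    }

    public static void main(String[] args) {
        // Illustrative input only, not the contents of the original words file.
        List<String> words = Arrays.asList("storm", "is", "great", "storm", "is", "great", "storm");
        List<Map<String, Integer>> tasks = route(words, 2, new Random());
        for (int i = 0; i < tasks.size(); i++) {
            System.out.println("-- task " + i + " -- " + tasks.get(i));
        }
    }
}
```

Because the choice ignores the word, a single task's count for "storm" is only part of the total; the true count appears only when the per-task maps are summed, which is the storm: 2 / storm: 1 split seen in the log.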

 

If you want every occurrence of the same word to be counted by the same word-counter task, use fieldsGrouping instead:

builder.setBolt("word-counter", new WordCounter(),2).fieldsGrouping("word-normalizer",new Fields("word"));

The output after this change:

-- Word Counter [word-counter-3] --
application: 1
is: 2
are: 1
test: 1
simple: 1
powerfull: 1
very: 1
2025 [TopologyMain.main()] INFO  backtype.storm.daemon.task  - Shut down task Getting-Started-Toplogie-1-1404368225:3
2025 [TopologyMain.main()] INFO  backtype.storm.daemon.task  - Shutting down task Getting-Started-Toplogie-1-1404368225:2
2026 [Thread-26] INFO  backtype.storm.util  - Async loop interrupted!
-- Word Counter [word-counter-2] --
really: 1
but: 1
great: 2
an: 1
storm: 3

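The output shows that every occurrence of a given word is now counted by the same word-counter task; for example, all 3 occurrences of "storm" end up in word-counter-2.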
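Fields grouping can be pictured as routing each tuple by a hash of its grouping field. The sketch below assumes a simple Math.floorMod(word.hashCode(), numTasks) rule; Storm computes the target task internally from the values of the grouping fields, and its exact hash is not reproduced here. FieldsGroupingSketch, taskFor and route are hypothetical names used for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of fields grouping: the task index is derived from
// the word itself, so equal words always land on the same task.
class FieldsGroupingSketch {

    // Assumed routing rule: same word -> same hash -> same task index.
    static int taskFor(String word, int numTasks) {
        return Math.floorMod(word.hashCode(), numTasks);
    }

    // Returns one word -> count map per simulated word-counter task.
    static List<Map<String, Integer>> route(List<String> words, int numTasks) {
        List<Map<String, Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            tasks.add(new HashMap<>());
        }
        for (String word : words) {
            tasks.get(taskFor(word, numTasks)).merge(word, 1, Integer::sum);
        }
        return tasks;
    }

    public static void main(String[] args) {
        // Illustrative input only.
        List<String> words = Arrays.asList("storm", "is", "great", "storm", "is", "great", "storm");
        List<Map<String, Integer>> tasks = route(words, 2);
        for (int i = 0; i < tasks.size(); i++) {
            System.out.println("-- task " + i + " -- " + tasks.get(i));
        }
    }
}
```

Which task a word lands on depends on the hash, but the routing is deterministic, so a word's full count always sits in exactly one map.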

 

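    import spouts.WordReader;
    import bolts.WordCounter;
    import bolts.WordNormalizer;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;

    public class TopologyMain {
        public static void main(String[] args) throws InterruptedException {
            // Topology definition: word-reader -> word-normalizer -> word-counter
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("word-reader", new WordReader());
            // Lines from word-reader are spread randomly over the normalizer tasks
            builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
            // Tuples with the same "word" value always reach the same counter task
            builder.setBolt("word-counter", new WordCounter(), 1).fieldsGrouping("word-normalizer", new Fields("word"));

            // Configuration: args[0] is the path of the words file read by WordReader
            Config conf = new Config();
            conf.put("wordsFile", args[0]);
            conf.setDebug(false);
            // At most one pending (not yet acked) tuple per spout task
            conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

            // Topology run: start an in-process local cluster, run for about 1 s, then stop
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
            Thread.sleep(1000);
            cluster.shutdown();
        }
    }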

Next, let's walk through how the Storm topology is built and run:

First, TopologyBuilder builder = new TopologyBuilder(); creates a builder object that is used to assemble the topology graph.

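Next, builder.setSpout("word-reader", new WordReader()); adds a spout, the node that acts as the data source, to the topology under the id "word-reader". It reads the contents of the file and emits each line as a tuple to the downstream bolt.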
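The spout's job (read the file, emit one tuple per line) can be sketched in plain Java as follows. This is an illustration under our own assumptions: LineSourceSketch and readLines are hypothetical names, the lines are collected into a list instead of being emitted through Storm's SpoutOutputCollector, and the input comes from a Reader rather than a file path so the sketch is easy to test.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the spout: each returned element represents one
// single-field tuple holding a line of text.
class LineSourceSketch {

    static List<String> readLines(Reader source) throws IOException {
        List<String> tuples = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                tuples.add(line); // one line -> one tuple
            }
        }
        return tuples;
    }

    public static void main(String[] args) throws IOException {
        // Illustrative input; the real topology reads the file passed as args[0].
        List<String> tuples = readLines(new StringReader("storm is great\nstorm is simple"));
        System.out.println(tuples); // prints [storm is great, storm is simple]
    }
}
```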

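Then builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader"); adds a bolt to the topology. The shuffleGrouping("word-reader") call subscribes this bolt to the tuples emitted by the component with id "word-reader", which become its input. With shuffle grouping, each tuple (one line of text) is sent to a randomly chosen task of this bolt. The bolt splits each line into individual words and emits each word as a new tuple to the next bolt, which does the counting.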
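The splitting step can be sketched as a plain function. The WordNormalizer source is not shown in this part of the post, so the behaviour here is an assumption: split on spaces, trim each piece, drop empty pieces and lower-case the rest. NormalizerSketch and normalize are hypothetical names; in the real bolt each word would be emitted as its own tuple rather than returned in a list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the normalizer bolt's per-line logic.
class NormalizerSketch {

    // Splits one line into lower-cased words, skipping empty fragments
    // (for example those produced by consecutive spaces).
    static List<String> normalize(String line) {
        List<String> words = new ArrayList<>();
        for (String word : line.split(" ")) {
            word = word.trim();
            if (!word.isEmpty()) {
                words.add(word.toLowerCase());
            }
        }
        return words;
    }

    public static void main(String[] args) {
        System.out.println(normalize("Storm is  GREAT")); // prints [storm, is, great]
    }
}
```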

 

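Finally, builder.setBolt("word-counter", new WordCounter(), 1).fieldsGrouping("word-normalizer", new Fields("word")); adds one more bolt, which receives tuples from the "word-normalizer" component and counts the occurrences of each word. fieldsGrouping("word-normalizer", new Fields("word")) means that tuples with the same value in the "word" field are always sent to the same task of this bolt, so each word is counted in exactly one place.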
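The counting side can be pictured as one small object per task. The sketch below only mirrors the shape suggested by the log: each task owns a private map, increments it per incoming word, and prints a "-- Word Counter [name] --" block when it shuts down. CounterTaskSketch and its methods are hypothetical and are not the WordCounter class from this project.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-task counter: each instance owns its own map, so counts
// are never shared between tasks. This is why grouping matters.
class CounterTaskSketch {
    private final String name;
    private final Map<String, Integer> counters = new HashMap<>();

    CounterTaskSketch(String name) {
        this.name = name;
    }

    // Called once per incoming word tuple.
    void execute(String word) {
        counters.merge(word, 1, Integer::sum);
    }

    Map<String, Integer> counts() {
        return counters;
    }

    // Called when the task shuts down; prints a block like those in the log.
    void cleanup() {
        System.out.println("-- Word Counter [" + name + "] --");
        for (Map.Entry<String, Integer> e : counters.entrySet()) {
            System.out.println(e.getKey() + ": " + e.getValue());
        }
    }

    public static void main(String[] args) {
        CounterTaskSketch task = new CounterTaskSketch("word-counter-2");
        for (String w : new String[] {"storm", "is", "storm"}) {
            task.execute(w);
        }
        task.cleanup();
    }
}
```

Since each task's map is private, only fieldsGrouping guarantees that a task's printed count for a word is that word's complete count.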

   

This article is from the blog “涛哥的回忆” (Tao Ge's Memories). Please contact the author before reposting.