首页 > 代码库 > Storm/JStorm之TopologyBuilder源码阅读

Storm/JStorm之TopologyBuilder源码阅读

在Strom/JStorm中有一个类是特别重要的,主要用来构建Topology的,这个类就是TopologyBuilder. 
咱先看一下简单的例子:

public static void main(String[] args) throws AlreadyAliveException,
            InvalidTopologyException {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("input", new RandomSentenceSpout(), 2);
        builder.setBolt("bolt_sentence", new SplitSentenceBolt(), 2)
                .shuffleGrouping("input");

        // 本地模式:最主要用来调试用
        LocalCluster cluster = new LocalCluster();
        System.out.println("start wordcount");
        cluster.submitTopology("word count", conf,    builder.createTopology());
    }

在上面的main方法里先创建TopologyBuilder对象,然后设置好已创建的Spout节点和Bolt节点,并用随机分组(shuffleGrouping)将Spout和Bolt节点连接起来形成Topology。

那TopologyBuilder是如何做的呢?请看下面TopologyBuilder源代码:

/**
 * TopologyBuilder是一个用于构建Topology的工具类
 *
 */
public class TopologyBuilder {
    /**
     * 定义了类成员变量_bolts,用来存放IRichBolt类型的所有Bolt对象
     */
    private Map<String, IRichBolt> _bolts = new HashMap<String, IRichBolt>();
    /**
     * 定义了类成员变量_spouts,用来存放IRichSpout类型的所有Spout对象
     */
    private Map<String, IRichSpout> _spouts = new HashMap<String, IRichSpout>();
    /**
     * 定义了类成员变量_commons,存放了所有的Bolt和Spout对象
     */
    private Map<String, ComponentCommon> _commons = new HashMap<String, ComponentCommon>();

    // private Map<String, Map<GlobalStreamId, Grouping>> _inputs = new HashMap<String, Map<GlobalStreamId, Grouping>>();

    private Map<String, StateSpoutSpec> _stateSpouts = new HashMap<String, StateSpoutSpec>();
    /**
     * 根据传入的Bolt和Spout对象构建StormTopology对象
     * @return
     */
public StormTopology createTopology() {
        Map<String, Bolt> boltSpecs = new HashMap<String, Bolt>();
        Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>();
        for (String boltId : _bolts.keySet()) {
            //根据boltId从_bolts中获取到对应的bolt对象
            IRichBolt bolt = _bolts.get(boltId);
            //设置对应ComponentCommon对象的streams(输出的字段列表以及是否是直接流)属性值
            ComponentCommon common = getComponentCommon(boltId, bolt);
            /**
             * 先将Bolts对象序列化得到数组,再创建Bolt对象,所以所有在StormTopology中Bolts是对象序列化过后得到的字节数组.
             */
            boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common));
        }
        for (String spoutId : _spouts.keySet()) {
            //根据spoutId从_spouts中获取到对应的spout对象
            IRichSpout spout = _spouts.get(spoutId);
            //设置对应ComponentCommon对象的streams(输出的字段列表以及是否是直接流)
            ComponentCommon common = getComponentCommon(spoutId, spout);
            /**
             * 先将Spout对象序列化得到数组,再创建SpoutSpec对象,所以所有在StormTopology中Spouts是对象序列化过后得到的字节数组.
             */
            spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common));

        }
        //将上述所设置的所有组件都封装到StormTopology对象中,最后提交到集群中运行
        return new StormTopology(spoutSpecs, boltSpecs, new HashMap<String, StateSpoutSpec>());
    }
    /**
     * 下面几个方法定义了setBolt方法以及它的重载方法
     */
    /**
     * 在这个topology中定义一个只有单线程并行度的新的bolt
     * 其它想要消耗这个bolt的输出的组件会引用这个id
     */
   public BoltDeclarer setBolt(String id, IRichBolt bolt) {
        return setBolt(id, bolt, null);
    }

    /**
     *  为这个topology定义一个指定数量的并行度的bolt
     */
    public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) {
        //检测传入的组件id是否唯一
        validateUnusedId(id);
        //生成common对象
        initCommon(id, bolt, parallelism_hint);
        _bolts.put(id, bolt);
        return new BoltGetter(id);
    }


    public BoltDeclarer setBolt(String id, IBasicBolt bolt) {
        return setBolt(id, bolt, null);
    }


    public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism_hint) {
        /**
         * 该方法利用BasicBoltExecutor包装(封装)传入的IBasicBolt对象
         * 在BasicBoltExecutor中实现了对消息的追踪
         */
        return setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint);
    }
  /**
     * 下面几个方法定义了setSpout方法以及它的重载方法
     */
    public SpoutDeclarer setSpout(String id, IRichSpout spout) {
        return setSpout(id, spout, null);
    }

    public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint) {
        //检测输入的id是否唯一,若已经存在将抛出异常
        validateUnusedId(id);
        /**
         * 构建ComponentCommon对象并进行相对应的初始化,最后放入到_commons(在上述中已经定义)
         */
        initCommon(id, spout, parallelism_hint);
        _spouts.put(id, spout);
        return new SpoutGetter(id);
    }


    public SpoutDeclarer setSpout(String id, IControlSpout spout) {
        return setSpout(id, spout, null);
    }

    public SpoutDeclarer setSpout(String id, IControlSpout spout, Number parallelism_hint) {
        return setSpout(id, new ControlSpoutExecutor(spout), parallelism_hint);
    }

    public BoltDeclarer setBolt(String id, IControlBolt bolt, Number parallelism_hint) {
        return setBolt(id, new ControlBoltExecutor(bolt), parallelism_hint);
    }
    public BoltDeclarer setBolt(String id, IControlBolt bolt) {
        return setBolt(id, bolt, null);
    }

    public void setStateSpout(String id, IRichStateSpout stateSpout) {
        setStateSpout(id, stateSpout, null);
    }
    public void setStateSpout(String id, IRichStateSpout stateSpout, Number parallelism_hint) {
        validateUnusedId(id);
        // TODO: finish
    }
    /**
     * 检测输入的id是否唯一
     * @param id
     */
    private void validateUnusedId(String id) {
        if (_bolts.containsKey(id)) {
            throw new IllegalArgumentException("Bolt has already been declared for id " + id);
        }
        if (_spouts.containsKey(id)) {
            throw new IllegalArgumentException("Spout has already been declared for id " + id);
        }
        if (_stateSpouts.containsKey(id)) {
            throw new IllegalArgumentException("State spout has already been declared for id " + id);
        }
    }

    private ComponentCommon getComponentCommon(String id, IComponent component) {
        ComponentCommon ret = new ComponentCommon(_commons.get(id));

        OutputFieldsGetter getter = new OutputFieldsGetter();
        component.declareOutputFields(getter);
        ret.set_streams(getter.getFieldsDeclaration());
        return ret;
    }
    /**
     * 定义了initCommon方法,用来初始化变量CommonentCommon对象,并给类成员变量_commons赋值
     * 初始化所做的工作:设置并行度还有一些其它配置
     * @param id
     * @param component
     * @param parallelism
     */
private void initCommon(String id, IComponent component, Number parallelism) {
        ComponentCommon common = new ComponentCommon();
        //设置消息流的来源及分组方式
        common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
        if (parallelism != null) {
            //设置并行度
            common.set_parallelism_hint(parallelism.intValue());
        } else {
            //如果并行度没有手动设置则默认为1
            common.set_parallelism_hint(1);
        }
        Map conf = component.getComponentConfiguration();
        if (conf != null)
            //设置组件的配置参数
            common.set_json_conf(JSONValue.toJSONString(conf));
        _commons.put(id, common);
    }
}

从上面TopologyBuilder的类中可以看到这个类提供了创建StormTopology的方法以及一些数据源节点和处理节点的相关设置的方法,

还有就是存储Bolt对象和Spout对象的方法,当然这里关于分组的代码没有写出来。事实上这个类就是用来设置Spout节点和Bolt节点,

并通过分组方式将Spout和Bolt节点连接起来形成拓扑结构的。

Storm/JStorm之TopologyBuilder源码阅读