首页 > 代码库 > storm starter学习(二) - 流聚合

storm starter学习(二) - 流聚合

    SingleJoinExample示例说明了storm中流聚合的应用,将具有相同tuple属性的数据流整合成一个新的数据流。来看一下Topology。先定义两个数据源genderSpout和ageSpout,Fields分别为("id", "gender")、("id", "age"),最终聚合后的数据流按id进行分组,输出为("gender", "age")。具体Topology如下:


// 定义数据源
FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("gender", genderSpout);
builder.setSpout("age", ageSpout);
builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age"))).fieldsGrouping("gender", new Fields("id"))
        .fieldsGrouping("age", new Fields("id")); //数据流聚合
    拓扑中流聚合的主要功能在SingleJoinBolt下,先来看一下prepare方法。



public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    _fieldLocations = new HashMap<String, GlobalStreamId>();
    _collector = collector;
    int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
    _pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());
    _numSources = context.getThisSources().size();
    Set<String> idFields = null;
    for (GlobalStreamId source : context.getThisSources().keySet()) {
      Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());
      Set<String> setFields = new HashSet<String>(fields.toList());
      if (idFields == null)
        idFields = setFields;
      else
        idFields.retainAll(setFields);

      for (String outfield : _outFields) {
        for (String sourcefield : fields) {
          if (outfield.equals(sourcefield)) {
            _fieldLocations.put(outfield, source);
          }
        }
      }
    }
    _idFields = new Fields(new ArrayList<String>(idFields));

    if (_fieldLocations.size() != _outFields.size()) {
      throw new RuntimeException("Cannot find all outfields among sources");
    }
  }
    首先在处理开始的地方,使用了TimeCacheMap。使用它的目的是,由于bolt在接收两个数据源的流数据时,同一id的两个数据流很可能不会在一个时间点内同时发出,因此需要对数据流先进行缓存,直到所有id相同的数据源都收到后再进行聚合处理,做完聚合处理后再将对应的tuple信息从缓存中删除。在处理过程中,有可能会出现某些id的tuple丢失,导致缓存中对应该id的其他tuples一直缓存在内存中,解决此问题的方法是设置timeout时间,定期清理过期tuples发送fail信息给spout。超时时间的大小设置需要结合具体应用的进行判断,尽量保证相同id的tuples会在较短的时间间隔内发送给bolt,避免重复timeout事件的放生。


    TimeCacheMap中ExpireCallback方法如下:


private class ExpireCallback implements TimeCacheMap.ExpiredCallback<List<Object>, Map<GlobalStreamId, Tuple>> {
    @Override
    public void expire(List<Object> id, Map<GlobalStreamId, Tuple> tuples) {
      for (Tuple tuple : tuples.values()) {
        _collector.fail(tuple);
      }
    }
}
    接下来在prepare中遍历TopologyContext中不同数据源,得到所有数据源(genderSpout和ageSpout),使用retainAll方法提取Set中公共的Filed信息,保存到变量_idFields中(id),将_outFileds中字段所在数据源记录下来,保存到_fieldLocations,以便在聚合时获取对应的字段值。


    excute方法是执行最后的流聚合功能,代码如下:


public void execute(Tuple tuple) {
    List<Object> id = tuple.select(_idFields);
    GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId());
    if (!_pending.containsKey(id)) {
      _pending.put(id, new HashMap<GlobalStreamId, Tuple>());
    }
    Map<GlobalStreamId, Tuple> parts = _pending.get(id);
    if (parts.containsKey(streamId))
      throw new RuntimeException("Received same side of single join twice");
    parts.put(streamId, tuple);
    if (parts.size() == _numSources) {
      _pending.remove(id);
      List<Object> joinResult = new ArrayList<Object>();
      for (String outField : _outFields) {
        GlobalStreamId loc = _fieldLocations.get(outField);
        joinResult.add(parts.get(loc).getValueByField(outField));
      }
      _collector.emit(new ArrayList<Tuple>(parts.values()), joinResult);

      for (Tuple part : parts.values()) {
        _collector.ack(part);
      }
    }
}
    从tuple中取出id Fields信息和GlobalStreamId,判断当前id是否在_pending中存在,如不存在将当前数据放入到 _pending中。然后根据id来获取parts中对应的信息,如存在相同流id信息时,抛出异常:接收到来自同一Spout中id一致的tuple信息。不存在则放入到parts里。


    如果parts已经包含了聚合数据源的个数_numSources时,本例中为2,表示相同id从genderSpout和ageSpout中发出的tuple都已经收到,可以进行聚合处理。从_pending队列中移除这条记录,然后开始构造聚合后的结果字段:依次遍历_outFields中各个字段,从_fieldLocations中取到这些outFiled字段对应的GlobalStreamId,紧接着从parts中取出GlobalStreamId对应的outFiled,放入聚合后的结果中。ack所有tuple。否则,继续等待两spout的流数据,直到缓存的数据源个数达到聚合要求。

    最后,声明聚合后的输出字段,见declareOutputFields:


public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(_outFields);
}
    总结:


    在此示例中使用一种叫做TimeCacheMap的数据结构,用于在内存中保存近期活跃的对象,它的实现非常地高效,而且可以自动删除过期不再活跃的对象。