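Storm RollingTopWords: window design of a real-time top-N computation job
When reposting, please credit the original address: http://www.cnblogs.com/dongxiao-yang/p/6381037.html
In stream processing we often need to aggregate data in batches over time windows. A window is generally defined by two parameters: (1) Window length: the size of the window, expressed either as a duration or as a number of tuples; (2) Sliding interval: how often the window slides. Depending on these two parameters, windows are usually divided into tumbling windows and sliding windows.
Sliding Window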
Tuples are grouped in windows and window slides every sliding interval. A tuple can belong to more than one window.
For example a time duration based sliding window with length 10 secs and sliding interval of 5 seconds.
| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
0       5             10         15   -> time
|<------- w1 -------->|
        |------------ w2 ------->|
Tumbling Window
Tuples are grouped in a single window based on time or count. Any tuple belongs to only one of the windows.
For example a time duration based tumbling window with length 5 secs.
| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
0       5             10         15   -> time
   w1         w2           w3
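The difference between the two window types can be made concrete with a little arithmetic. The following is only a minimal sketch, not Storm code: the class and method names are made up for illustration, and it assumes non-negative timestamps, half-open windows [start, start + length), and windows aligned at time 0 as in the diagrams above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Storm: maps a timestamp to its window(s).
class WindowAssignment {

    // Tumbling: zero-based index of the single window that contains t.
    static long tumblingIndex(long t, long length) {
        return t / length;
    }

    // Sliding: start times of every window [s, s + length) that contains t,
    // where windows start at multiples of the sliding interval (0, slide, ...).
    static List<Long> slidingStarts(long t, long length, long slide) {
        List<Long> starts = new ArrayList<>();
        long latest = (t / slide) * slide; // latest window start that is <= t
        for (long s = latest; s >= 0 && s + length > t; s -= slide) {
            starts.add(0, s); // prepend so the result is in ascending order
        }
        return starts;
    }

    public static void main(String[] args) {
        // A tuple at t = 7 with length 10 and sliding interval 5 is in w1 and w2.
        System.out.println(slidingStarts(7, 10, 5)); // prints [0, 5]
        // With a tumbling window of length 5 the same tuple is only in index 1 (w2).
        System.out.println(tumblingIndex(7, 5));     // prints 1
    }
}
```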
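Storm only officially added the IWindowedBolt interface for window computation in version 1.0.0. Before that, storm-starter contained RollingTopWords, a slightly more involved demo that computes a top N over a sliding window. The main components of the topology are wired together as follows:
(1) TestWordSpout generates the source word data and sends it to the downstream bolt through fieldsGrouping.
(2) RollingCountBolt counts all words within the Window length and emits a summary downstream once every Sliding interval.
(3) IntermediateRankingsBolt is an intermediate bolt whose main purpose is to pre-compute top-N rankings over a subset of the words, reducing the sorting work at the final node.
(4) TotalRankingsBolt performs the final top-N sort and outputs the result.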
String spoutId = "wordGenerator";
String counterId = "counter";
String intermediateRankerId = "intermediateRanker";
String totalRankerId = "finalRanker";
builder.setSpout(spoutId, new TestWordSpout(), 5);
builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word"));
builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, new Fields("obj"));
builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
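Why the intermediate ranking stage does not change the final result can be shown without Storm. Because fieldsGrouping routes each word to exactly one intermediate ranker, every word in the global top N must also be in the local top N of its partition. The sketch below is plain Java with hypothetical names (TwoStageTopN, twoStage, samplePartitions) and made-up sample counts; it is not the storm-starter implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of local top-N followed by a global merge.
class TwoStageTopN {

    // Returns the n largest entries, highest count first (ties broken by key).
    static Map<String, Long> topN(Map<String, Long> counts, int n) {
        Map<String, Long> result = new LinkedHashMap<>();
        counts.entrySet().stream()
            .sorted(Map.Entry.<String, Long>comparingByValue().reversed()
                .thenComparing(Map.Entry.<String, Long>comparingByKey()))
            .limit(n)
            .forEach(e -> result.put(e.getKey(), e.getValue()));
        return result;
    }

    // Stage 1: each partition keeps only its own top n.
    // Stage 2: the partial rankings are merged and ranked once more.
    static Map<String, Long> twoStage(List<Map<String, Long>> partitions, int n) {
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> partition : partitions) {
            merged.putAll(topN(partition, n));
        }
        return topN(merged, n);
    }

    // Made-up sample data: every word appears in exactly one partition.
    static List<Map<String, Long>> samplePartitions() {
        Map<String, Long> p1 = new HashMap<>();
        p1.put("storm", 5L); p1.put("bolt", 3L); p1.put("spout", 1L);
        Map<String, Long> p2 = new HashMap<>();
        p2.put("tuple", 4L); p2.put("window", 2L); p2.put("slot", 6L);
        List<Map<String, Long>> partitions = new ArrayList<>();
        partitions.add(p1);
        partitions.add(p2);
        return partitions;
    }

    public static void main(String[] args) {
        System.out.println(twoStage(samplePartitions(), 2)); // prints {slot=6, storm=5}
    }
}
```

The final node only has to sort at most n entries per intermediate ranker instead of every distinct word.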
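The constructor arguments of RollingCountBolt are exactly the two parameters described above, here named windowLengthInSeconds and emitFrequencyInSeconds: new RollingCountBolt(300, 60) emits, once a minute, the statistics for the most recent five minutes. The topology above uses new RollingCountBolt(9, 3), so every 3 seconds it emits the counts for the last 9 seconds.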
counter = new SlidingWindowCounter<Object>(deriveNumWindowChunksFrom(this.windowLengthInSeconds,this.emitFrequencyInSeconds));
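The body of deriveNumWindowChunksFrom is not shown here. A plausible reading, given as an assumption rather than a quote from the source, is that the number of slots is the window length divided by the emit frequency, so that each slot covers one emit interval:

```java
// Sketch under an assumption: slot count = window length / emit frequency.
class WindowChunks {

    static int deriveNumWindowChunksFrom(int windowLengthInSeconds, int emitFrequencyInSeconds) {
        // Integer division: a window length that is not a multiple of the
        // emit frequency would be truncated in this sketch.
        return windowLengthInSeconds / emitFrequencyInSeconds;
    }

    public static void main(String[] args) {
        System.out.println(deriveNumWindowChunksFrom(300, 60)); // prints 5
        System.out.println(deriveNumWindowChunksFrom(9, 3));    // prints 3
    }
}
```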
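Internally, RollingCountBolt holds a SlidingWindowCounter, which in turn stores a SlotBasedCounter. SlotBasedCounter is what actually implements the window computation: the sliding window is continuously mapped onto a circular list of slots. When the window slides, SlidingWindowCounter takes the following actions: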
public Map<T, Long> getCountsThenAdvanceWindow() {
    Map<T, Long> counts = objCounter.getCounts();
    objCounter.wipeZeros();
    objCounter.wipeSlot(tailSlot);
    advanceHead();
    return counts;
}
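1. Call getCounts to obtain a map from each obj to the sum of its counts over all slots in the window.
2. Call wipeZeros to delete objs that are no longer in use, freeing space.
3. The most important step: wipe tailSlot, then call advanceHead, which is what makes the window slide.
The implementation of advanceHead shows how a circular sliding window is realized on top of an array: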
private void advanceHead() {
    headSlot = tailSlot;
    tailSlot = slotAfter(tailSlot);
}
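To see how the two pointers travel around the ring, here is a small stand-alone sketch. The class name RingPointers is hypothetical, and both the modulo-based slotAfter and the initial positions (head at 0, tail at the next slot) are assumptions, since neither is shown above.

```java
// Hypothetical stand-alone model of the head/tail pointers only.
class RingPointers {
    final int numSlots;
    int headSlot;
    int tailSlot;

    RingPointers(int numSlots) {
        this.numSlots = numSlots;
        this.headSlot = 0;                   // assumed starting position
        this.tailSlot = slotAfter(headSlot); // tail is the slot right after head
    }

    // Assumed implementation: wrap around at the end of the array.
    int slotAfter(int slot) {
        return (slot + 1) % numSlots;
    }

    // Same pointer update as advanceHead above.
    void advanceHead() {
        headSlot = tailSlot;
        tailSlot = slotAfter(tailSlot);
    }

    public static void main(String[] args) {
        RingPointers ring = new RingPointers(3);
        for (int i = 0; i < 4; i++) {
            System.out.println("head=" + ring.headSlot + " tail=" + ring.tailSlot);
            ring.advanceHead();
        }
        // prints head=0 tail=1, head=1 tail=2, head=2 tail=0, head=0 tail=1
    }
}
```

The slot that has just been wiped as the tail becomes the new head, so new counts are always written into an empty slot.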
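SlotBasedCounter mainly provides incrementCount, getCounts, and computeTotalCount, all working on the slots that the window maps to. Respectively, they record new data, extract the counts for the whole window, and sum one element's counts across all slots in the window.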
public void incrementCount(T obj, int slot) {
    long[] counts = objToCounts.get(obj);
    if (counts == null) {
        counts = new long[this.numSlots];
        objToCounts.put(obj, counts);
    }
    counts[slot]++;
}

public Map<T, Long> getCounts() {
    Map<T, Long> result = new HashMap<T, Long>();
    for (T obj : objToCounts.keySet()) {
        result.put(obj, computeTotalCount(obj));
    }
    return result;
}

private long computeTotalCount(T obj) {
    long[] curr = objToCounts.get(obj);
    long total = 0;
    for (long l : curr) {
        total += l;
    }
    return total;
}
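Putting the pieces together, the following condensed sketch merges the two classes into one so that the sliding behaviour can be observed end to end. It is a simplification for illustration, not the storm-starter code: the class name MiniSlidingCounter is made up, keys are plain strings, and the initial pointer positions are assumed.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical single-class condensation of the slot-based sliding counter.
class MiniSlidingCounter {
    private final int numSlots;
    private final Map<String, long[]> objToCounts = new HashMap<>();
    private int headSlot = 0;
    private int tailSlot;

    MiniSlidingCounter(int numSlots) {
        this.numSlots = numSlots;
        this.tailSlot = (headSlot + 1) % numSlots;
    }

    // New data always goes into the current head slot.
    void incrementCount(String obj) {
        objToCounts.computeIfAbsent(obj, k -> new long[numSlots])[headSlot]++;
    }

    Map<String, Long> getCountsThenAdvanceWindow() {
        // 1. Sum every slot per object: the counts for the whole window.
        Map<String, Long> counts = new HashMap<>();
        for (Map.Entry<String, long[]> e : objToCounts.entrySet()) {
            counts.put(e.getKey(), Arrays.stream(e.getValue()).sum());
        }
        // 2. wipeZeros: drop objects whose slots are all zero.
        objToCounts.values().removeIf(slots -> Arrays.stream(slots).allMatch(c -> c == 0));
        // 3. Wipe the tail slot, then advance both pointers around the ring.
        for (long[] slots : objToCounts.values()) {
            slots[tailSlot] = 0;
        }
        headSlot = tailSlot;
        tailSlot = (tailSlot + 1) % numSlots;
        return counts;
    }

    public static void main(String[] args) {
        MiniSlidingCounter counter = new MiniSlidingCounter(3);
        counter.incrementCount("a");
        counter.incrementCount("a");
        System.out.println(counter.getCountsThenAdvanceWindow()); // prints {a=2}
        counter.incrementCount("a");
        System.out.println(counter.getCountsThenAdvanceWindow()); // prints {a=3}
        System.out.println(counter.getCountsThenAdvanceWindow()); // prints {a=3}
        System.out.println(counter.getCountsThenAdvanceWindow()); // prints {a=1}
    }
}
```

In this run the first two increments stay visible for three emits and then drop out when their slot comes around as the tail and is wiped.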
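As described above, RollingCountBolt implements a sliding window purely through its code structure, without any windowing interface (in theory a tumbling window could be implemented the same way), which is an elegant design.
References: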
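As a rough illustration of the tumbling case mentioned in passing, a counter only needs to hand out its counts and start from scratch on every emit, which is what happens when the window length equals the emit interval. The class below is a hypothetical sketch and does not appear in storm-starter.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical tumbling-window counter: every emit closes the window.
class TumblingCounter {
    private Map<String, Long> counts = new HashMap<>();

    void incrementCount(String obj) {
        counts.merge(obj, 1L, Long::sum);
    }

    // Returns the counts of the window that just ended and starts a new one.
    Map<String, Long> getCountsThenReset() {
        Map<String, Long> finished = counts;
        counts = new HashMap<>();
        return finished;
    }

    public static void main(String[] args) {
        TumblingCounter counter = new TumblingCounter();
        counter.incrementCount("a");
        counter.incrementCount("a");
        System.out.println(counter.getCountsThenReset()); // prints {a=2}
        counter.incrementCount("a");
        System.out.println(counter.getCountsThenReset()); // prints {a=1}
    }
}
```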
1 Storm starter - RollingTopWords