首页 > 代码库 > Storm的一些通用的Topology的模式
Storm的一些通用的Topology的模式
原文地址:http://storm.apache.org/documentation/Common-patterns.html
本文列举了Storm Topology的一些通用的模式:
- 流式聚合
- 批处理
- BasicBolt
- 内存中的缓存 + fields grouping的组合
- 流式的top N计算
- 使用TimeCacheMap来高效保存最近更新的数据的缓存
- 分布式RPC:CoordinatedBolt和KeyedFairBolt
聚合(Joins)
流聚合基于一些共同的字段把两个或者多个数据流聚合在一起。然而一个普通的数据库聚合的输入是有限的,语意也很明确,但流聚合的输入是无限的,语意并不明确。
每种应用的聚合方式也不尽相同。一些应用始终都把两个流的tuple聚合在一起,一些应用只想根据特定的字段进行聚合,而其他的应用的聚合逻辑可能完全不一样。在所有的聚合类型中,有一种通用的模式,就是用相同的方式对多个输入流进行划分(partitioning)。在Storm中可以在一些字段上使用fields grouping,这样可以轻松的把多个输入流聚合到joiner bolt,例如:
builder.setBolt("join", new MyJoiner(), parallelism)
.fieldsGrouping("1", new Fields("joinfield1", "joinfield2"))
.fieldsGrouping("2", new Fields("joinfield1", "joinfield2"))
.fieldsGrouping("3", new Fields("joinfield1", "joinfield2"));
当然,不同的数据流中的“相同”字段可以有不一样的名字。
批处理(Batching)
有时候为了性能或者一些别的原因,你想把一组tuple来个批处理,而不是一个个单独处理。例如,你可能想批量更新数据库或者以某种方式做一个流的聚合(aggregation)。
如果你想可靠的进行数据处理,正确的方式是保存这些tuple对象的引用,直到bolt批处理完成。一旦批处理完成,再对这些tuple做ack操作。
如果bolt发射(emit)tuple,那么你可能想使用multi-anchoring来确保可靠性。这个需要具体情况具体分析。参考Guaranteeing message processing来了解如何可靠工作的更多细节。
BasicBolt
许多bolt遵从一种简单的模式:
- 读取一个输入的tuple
- 根据这个输入的tuple,发射0个或者多个tuple
- 在execute方法的最后立即ack这个输入的tuple
内存中的缓存 + fields grouping组合(In-memory caching + fields grouping combo)
在Storm的bolt中保存一些缓存是很常见的。当你使用fields grouping来进行合并(combine)时,缓存变得特别有用。例如,假如你有一个bolt来把短链接转换成长链接(比如bit.ly,t.co之类)。你可以使用一个LRU缓存来维护短链接到长链接的映射关系来提高性能,防止同一个HTTP请求过于频繁。假设“urls”组件发射短链接,“expand”组件把短链接转换成长链接,并在内部维护一个缓存。看一下下面两段代码的不同之处:
builder.setBolt("expand", new ExpandUrl(), parallelism)
.shuffleGrouping(1);
builder.setBolt("expand", new ExpandUrl(), parallelism)
.fieldsGrouping("urls", new Fields("url"));
第2种方式使用缓存的效率比第1种要高得多,因为同样的URL始终会被发送到同一个task。这样会避免一个缓存会存在于多个task中,同时也能提高缓存的命中率(hit rate)。
流式Top N计算(Streaming top N)
Storm有一种常见的模式称为持续计算,Storm中的流式Top N计算正属于这个模式。假如你有一个bolt发射["value","count"]这种形式的tuple,并且有一个bolt来根据计数来发射top N的tuple。最简单的方式是有一个bolt在数据流上做一个global grouping,并维护一个top N的列表。
这种方式很明显对于数据量大的流没有扩展性,因为所有的流数据都会被发到同一个task。一种更好的方法是在多台机器上并行计算这个流中的一部分的top N,并且还有一个bolt来合并每一部分的top N中间计算结果,最终算出最后的top N(MR思想),这种模式看起来像是这样的:
builder.setBolt("rank", new RankObjects(), parallellism)
.fieldsGrouping("objects", new Fields("value"));
builder.setBolt("merge", new MergeObjects())
.globalGrouping("rank");
这种模式可以工作,是由于第一个bolt做了fields grouping使得这种并行算法的语意正确。
你可以在storm-starter项目中找到一个示例程序here。
使用TimeCacheMap来高效保存最近更新的数据的缓存(TimeCacheMap for efficiently keeping a cache of things that have been recently updated)
你有时候想在内存中缓存最近活跃的对象,并想让那些一段时间内不活跃的对象自动过期。TimeCacheMap是一个高效的数据结构,适用于这样的需求。并提供了钩子(hook),所以你可以添加回调函数,当一个对象过期会被自动调用。(关于TimeCacheMap为什么高效,可以看看这篇分析文章)
分布式RPC:CoordinatedBolt和KeyedFairBolt(CoordinatedBolt and KeyedFairBolt for Distributed RPC)
在Storm之上构建分布式RPC应用时,通常会使用2种通用的模式。它们被封装为CoordinatedBolt和KeyedFairBolt中,同时也是Storm的代码库中的标准库的一部分。
CoordinatedBolt
包装你的bolt,其中包含了你的逻辑,并且确定何时你的bolt收到某个特定的请求对应的所有请求。主要在direct stream中使用。
KeyedFairBolt
也是用于包装你的bolt,其中包含了你的逻辑,并且保证你的topology同时处理多个DRPC调用,而不是串行的一个个处理。
更多分布式RPC请参考Distributed RPC。
Storm的一些通用的Topology的模式