首页 > 代码库 > Storm的一些通用的Topology的模式

Storm的一些通用的Topology的模式

原文地址:http://storm.apache.org/documentation/Common-patterns.html

此文档有许明明的翻译:http://xumingming.sinaapp.com/189/twitter-storm-storm%E7%9A%84%E4%B8%80%E4%BA%9B%E5%B8%B8%E8%A7%81%E6%A8%A1%E5%BC%8F/

但最新的文档略有更新,所以这里重新进行整理:

本文列举了Storm Topology的一些通用的模式:

  1. 流式聚合
  2. 批处理
  3. BasicBolt
  4. 内存中的缓存 + fields grouping的组合
  5. 流式的top N计算
  6. 使用TimeCacheMap来高效保存最近更新的数据的缓存
  7. 分布式RPC:CoordinatedBolt和KeyedFairBolt

聚合(Joins)

流聚合基于一些共同的字段把两个或者多个数据流聚合在一起。然而一个普通的数据库聚合的输入是有限的,语意也很明确,但流聚合的输入是无限的,语意并不明确。

每种应用的聚合方式也不尽相同。一些应用始终都把两个流的tuple聚合在一起,一些应用只想根据特定的字段进行聚合,而其他的应用的聚合逻辑可能完全不一样。在所有的聚合类型中,有一种通用的模式,就是用相同的方式对多个输入流进行划分(partitioning)。在Storm中可以在一些字段上使用fields grouping,这样可以轻松的把多个输入流聚合到joiner bolt,例如:

builder.setBolt("join", new MyJoiner(), parallelism)
  .fieldsGrouping("1", new Fields("joinfield1", "joinfield2"))
  .fieldsGrouping("2", new Fields("joinfield1", "joinfield2"))
  .fieldsGrouping("3", new Fields("joinfield1", "joinfield2"));

当然,不同的数据流中的“相同”字段可以有不一样的名字。

批处理(Batching)

有时候为了性能或者一些别的原因,你想把一组tuple来个批处理,而不是一个个单独处理。例如,你可能想批量更新数据库或者以某种方式做一个流的聚合(aggregation)。

如果你想可靠的进行数据处理,正确的方式是保存这些tuple对象的引用,直到bolt批处理完成。一旦批处理完成,再对这些tuple做ack操作。

如果bolt发射(emit)tuple,那么你可能想使用multi-anchoring来确保可靠性。这个需要具体情况具体分析。参考Guaranteeing message processing来了解如何可靠工作的更多细节。

BasicBolt

多bolt遵从一种简单的模式:

  • 读取一个输入的tuple
  • 根据这个输入的tuple,发射0个或者多个tuple
  • 在execute方法的最后立即ack这个输入的tuple
遵从这种模式的bolt是函数(function)和过滤器(filter)。Storm为这种模式单独封装了一个接口:IBasicBolt。阅读Guaranteeing message processing获得更多信息。

内存中的缓存 + fields grouping组合(In-memory caching + fields grouping combo)

在Storm的bolt中保存一些缓存是很常见的。当你使用fields grouping来进行合并(combine)时,缓存变得特别有用。例如,假如你有一个bolt来把短链接转换成长链接(比如bit.ly,t.co之类)。你可以使用一个LRU缓存来维护短链接到长链接的映射关系来提高性能,防止同一个HTTP请求过于频繁。假设“urls”组件发射短链接,“expand”组件把短链接转换成长链接,并在内部维护一个缓存。看一下下面两段代码的不同之处:

builder.setBolt("expand", new ExpandUrl(), parallelism)
  .shuffleGrouping(1);
builder.setBolt("expand", new ExpandUrl(), parallelism)
  .fieldsGrouping("urls", new Fields("url"));

第2种方式使用缓存的效率比第1种要高得多,因为同样的URL始终会被发送到同一个task。这样会避免一个缓存会存在于多个task中,同时也能提高缓存的命中率(hit rate)。

流式Top N计算(Streaming top N)

Storm有一种常见的模式称为持续计算,Storm中的流式Top N计算正属于这个模式。假如你有一个bolt发射["value","count"]这种形式的tuple,并且有一个bolt来根据计数来发射top N的tuple。最简单的方式是有一个bolt在数据流上做一个global grouping,并维护一个top N的列表。

这种方式很明显对于数据量大的流没有扩展性,因为所有的流数据都会被发到同一个task。一种更好的方法是在多台机器上并行计算这个流中的一部分的top N,并且还有一个bolt来合并每一部分的top N中间计算结果,最终算出最后的top N(MR思想),这种模式看起来像是这样的:

builder.setBolt("rank", new RankObjects(), parallellism)
  .fieldsGrouping("objects", new Fields("value"));
builder.setBolt("merge", new MergeObjects())
  .globalGrouping("rank");

这种模式可以工作,是由于第一个bolt做了fields grouping使得这种并行算法的语意正确。

你可以在storm-starter项目中找到一个示例程序here。

使用TimeCacheMap来高效保存最近更新的数据的缓存(TimeCacheMap for efficiently keeping a cache of things that have been recently updated)

你有时候想在内存中缓存最近活跃的对象,并想让那些一段时间内不活跃的对象自动过期。TimeCacheMap是一个高效的数据结构,适用于这样的需求。并提供了钩子(hook),所以你可以添加回调函数,当一个对象过期会被自动调用。(关于TimeCacheMap为什么高效,可以看看这篇分析文章)

分布式RPC:CoordinatedBolt和KeyedFairBolt(CoordinatedBolt and KeyedFairBolt for Distributed RPC)

在Storm之上构建分布式RPC应用时,通常会使用2种通用的模式。它们被封装为CoordinatedBolt和KeyedFairBolt中,同时也是Storm的代码库中的标准库的一部分。

CoordinatedBolt 包装你的bolt,其中包含了你的逻辑,并且确定何时你的bolt收到某个特定的请求对应的所有请求。主要在direct stream中使用。

KeyedFairBolt 也是用于包装你的bolt,其中包含了你的逻辑,并且保证你的topology同时处理多个DRPC调用,而不是串行的一个个处理。

更多分布式RPC请参考Distributed RPC。

Storm的一些通用的Topology的模式