首页 > 代码库 > storm文档(9)----消息处理保证机制

storm文档(9)----消息处理保证机制

转载请注明出处:http://blog.csdn.net/beitiandijun/article/details/41577125

源地址:http://storm.apache.org/documentation/Guaranteeing-message-processing.html

 

 

Storm保证:每条离开spout的消息都可以得到"fullyprocessed"。本文描述了storm如何实现这种保证以及你如何能够从Storm这种可靠性能力中受益。

 

 

"fully processed"对消息意味着什么?

 

离开spout的一个tuple可能触发创建成百上千个基于它的tuples。例如,考虑一下streaming word count topology:

 

TopologyBuilder builder = newTopologyBuilder();

builder.setSpout( "sentence", newKestrelSpout("kestrel.backtype.com",

                                                                                             22133,

                                                                                             "sentence_queue",

                                                                                             newStringScheme()

                                                                                             )

                             );

builder.setBolt("split", newSplitSentence(), 10 )

           .shufferGrouping("sententces");

builder.setBolt("count", newWordCount(), 20 )

           .fieldsGrouping("split", newFields("word"));

 

上面的topology从Kestrel queue读取句子,将这些句子划分成词组,然后按照前面划分词组时统计的每个词的次数发送每个词。离开spout的某个tuple可能会触发创建很多基于它的tuples:句子中每个单词都会对应一个tuple,同时每个单词的次数也会对应一个tuple。消息的树状结构如下所示:




当tuple树状图产生并且树状图中每条消息都被处理过,Storm就认为离开spout的tuple已经被“fully processed”(完全处理)。可以使用Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS配置

 

 

 

当消息完整处理或者处理失败的时候都发生了什么?

 

要理解这个问题,需要看一下tuple在离开spout之后的生命周期。作为参考,下面是spout实现的接口(更多接口信息需要查看Javadoc)。

 

public interface ISpout extendsSerializable {

         voidopen(Map conf, TopologyContext context,SpoutOutputCollector collector);

         voidclose();

         voidnextTuple();

         voidack(Object msgId);

         voidfail(Object msgId);

}

 

首先,Storm会通过Spout的nextTuple方法从Spout申请一个tuple。在open方法中,Spout使用此方法提供的SpoutOutputCollector去发射一个tuple到输出streams中去。当发射一个tuple时,Spout会提供一个“message id”,用来后面区分不同的tuple。例如, KestrelSpout从kestrel队列中读取消息,然后在发射时会将Kestrel为消息提供的id作为“message id”。发射一条消息到SpoutOutputCollector,如下所示:

_collector.emit(newValues("field1", "field2", 3), msgId);

 

然后,这个tuple会发送到消费bolts,同时Storm会跟踪已被创建的消息树状图。如果Storm检测到一个tuple已被“fully processed”, Storm将会原始的Spout task(即发射这个tuple的Spout)上调用ack方法,参数msgId就是这个Spout提供给Storm的“message id”。类似的,如果这个tuple超时了, Storm会在原始的Spout task上调用fail方法。注意, 一个tuple只能被创建它的Spouttask进行acked或者failed。因此,即使一个Spout在集群上正在执行很多tasks,一个tuple也只能被创建它的task进行acked或failed,而其他的task则不行。

 

再次使用KestrelSpout作为例子,看一下Spout是怎样保证消息处理的。当KestrleSpout从Kestrel 队列中拿出消息后,它将“opens”这个消息。这就意味着消息并不会真正被拿出队列,而是处于等待状态,即需要确认消息已被完整处理。当处于等待状态时,消息不会发送给队列的其他消费者。另外, 如果某个客户端与所有处于等待状态的消息(当然这些消息是提供给这个客户端的)断开连接,那这些消息会被放回到队列中。当消息一旦被打开, Kestrle会为客户端提供消息的数据内容以及一条唯一的id。 KestrelSpout在想SpoutOutputCollector发射tuple时,就需要使用上述id作为“message id”。稍后, 当KestrleSpout调用ack或者fail方法时,KestrelSpout会发送一条ack或者fail的消息给Kestrel,当然,消息内容需要使用上面提到的“message id”作为区别,等Kestrel接到这条消息后才能确定是将消息真正拿出队列还是将它放回去。

 

 

Storm 可靠性API是哪些?

 

作为用户,想要从Storm的可靠能力中受益,你需要做两件事情。首先, 你需要告诉Storm,在tuples的树状图中,何时创建的新连接。其次, 你需要告诉storm,何时完成的单个tuple的 处理。 做完这些事情, Storm就可以检测tuple的树状图是否已被“fullyprocessed”,然后才可以对相应的spout tuple调用ack或者fail方法。 Storm AP提供可以同时做这两件事的简洁方法。

 

指定tuple树状图中新连接的方法称为anchoring。 anchoring在你发射新tuple时可以完成。下面将以一个bolt作为例子,这个bolt会将包含一个句子的tuple划分成包含每个词的tuple。

public class SplitSentence extendsBaseRichBolt {

         OutputCollector_collector;

 

         publicvoid prepare(Map conf, TopologyContext context, OutputCollector collectors) {

                   _collector= collector;

         }

 

         publicvoid execute(Tuple tuple){

                   Stringsentence = tuple.getString(0);

                   for(Stringword:  sentence.split(" ")){

                            _collector.emit(tuple, new Values(word));

                   }

                   _collector.ack(tuple);

         }

         publicvoid declareOutputFields( OutputFieldDeclarer declarer){

                   declarer.declare(new Fields(“word”));

         }

}

 

每个单词tuple在调用emit方法时,通过指定输入tuple作为emit方法的第一个参数这种方式,这个单词tuple就被anchored。一旦单词tuple被anchord,如果此tuple在后面处理时failed,那么树状图中根部的spout tuple稍后就会重新进行一次完整的处理。作为对比,可以看一下当单词tuple如下发射时会发生什么:

 

_collector.emit(new Values(word));

 

通过这种方式发射单词tuple会使它被anchored。如果tuple在后面处理时failed,根部tuple就不会重新处理。这取决于在topology中所使用容错性保证机制,某些时候可能发射unanchored的tuple更加合适。

 

一个输出tuple可能会被anchored到多个输入tuple。这在处理流合并或者流聚合时非常有用。 multi-anchored的tuple如果处理失败,则会引起多个tuples在spouts重新处理。multi-anchoring可以通过指定一系列tuples实现,而不是仅仅一个单独的tuple。例如:

 

List<Tuple> anchors = newArrayList<Tuple>();

anchors.add(tuple1);

anchors.add(tuple2);

_collector.emit( anchors. new Values(1, 2,3));

 

multi-anchoring会将输出tuple加到多个tuple树状图中。注意,multi-anchoring也可能会打破树状图,并且创建tuple DAGs,就像下面

 

 

 

就像实现树状图一样(前面说的是只能实现树状图,即“tuple tress”主干),Storm的同样实现DAGs。

 

Anchoring就是如何制定tuple tree----下一个也是最后一个有关Storm可靠性API,就是指定合适完成tuple树中某个tuple的处理。 这通过调用OutputCollector类的ack和fail方法完成。 如果回看SplitSentence例子, 可以看到, 输入tuple是在所有wordtuple发射之后才能被acked。

 

可以使用OutputCollector类的fail方法直接fail掉tuple树中根部的spout tuple。例如, 可以应用于选择捕捉数据库客户端异常并且显式的fail 输入tuple。通过显式的fail的方式,spout tuple可以在tuple超时之前就重新处理。

 

每个你处理的tuple都必须acked或者failed。 Storm使用内存跟踪每个tuple,因此如果你没有ack/fail 每个tuple, task 最终会溢出。

 

大量的bolts采用通用方式读取输入tuple,并发射输入tuple,然后在execute方法末尾会ack这个tuple。这些bolts按类别可以分为过滤器和执行简单的功能。Storm有一个称为BasicBolt的接口,它用来封装这些方式。 SplitSentence的例子可以如下:

 

public class SplitSentence extends BaseBasicBolt{

         publicvoid execute (Tuple tuple, BasicOutputCollector collector){

                   Stringsentence = tuple.getString(0);

                   for( Stringword: sentence.split(" ")){

                            collector.emit( newValues(word));

                   }

         }

 

         publicvoid declareOutputFields( outputFieldsDeclarer declarer) {

                   declarer.declare(new Fields("word");

         }

}

 

这个实现要比前面的实现简单,并且语义上是相同的。发射给BasicOutputCollector 的tuple会自动anchored到输入tuple,同时当execute方法完成时,输入tuple将同时自动被acked。

 

相比之下,做聚合或者合并的bolts会延迟ack一个tuple,直到bolts已经从这个tuple所在tuples范围内获得聚合或者合并结果后,才能ack。聚合和合并将通常对它们输出的tuples使用multi-anchor的方式。上述操作均不属于IBasicBolt的简单方式。

 

 

怎样使这些应用(需要重新处理tuples)正常工作?

 

就像通常的软件设计,答案是“it depends”。 Storm 0.7.0 引入了“transactional topologies”特征,使得你在大多数计算下,可以获得一次具有容错性能的完整消息传递语义。 更多有关transactionaltopologies信息请阅读这里。

 

 

 

 

Storm以一种有效的方式实现可靠性?

 

Storm topology 有一系列特定的“acker”任务,它们可以跟踪每个spouttuple的DAG。当acker确认DAG完整处理了,它会给spout task发送消息,而这个spout task就是创建tuple并等待ack消息的task。你可以在topology配置选项中设置topology中acker tasks数目,具体配置选项为Config.TOPOLOGY_ACKERS.  Storm默认TOPOLOGY_ACKERS是1个task-----一旦你需要处理大量的数据,你需要提高这个值。

 

理解Storm可靠性具体实现的最好方式就是查看tuples以及tuples DAGs的生命周期。当topology中创建tuple时,无论是在spout还是bolt中,它都会获得一个随机的64 bit 的id。这些ids由ackers用来跟踪tuple DAG中每个spout tuple。

 

每个tuple都知道它所在tuple树中所有spout tuples的ids。当bolt发射新tuple时,来自tuple‘s anchors中的spout tuple ids就会拷贝到新tuple中去。当一个tuple被acked时,它会发送有关tuple变化信息给相应的acker tasks。特别是,它会告诉acker:在当前spout tuple树中,一个tuple已经处理完毕,同时输出树中新的tuple,当然这个新tuple 是anchored处理完毕的那个tuple。

 

例如, tuples “D”和“E”是以tuple “C”为基础创建的,这就是当“C”被acked的时候,树状图发生的变化。

 


 

当“C”从树中移除时,同一时刻,“D”和“E”会加到树中。树永远不能过早的结束。

 

还有一些细节需要说一下,即有关Storm如何追踪tuple树的。就像上面已经提到的,你在topology中可能有任意数量的acker tasks。这会引起下面的问题:当topology中一个tuple被acked时候,它是如何知道应该向那个acker task发送信息。

 

Storm使用模运算hash来映射spouttuple id和acker task。因为每个tuple都会携带它的spout tuple id(无论它在哪个树中,都是一样的),这样tuple就知道了应该和哪个acker tasks通信。

 

上面是讲了Storm 的acker tasks是如何跟踪每一个spout tuple的,即通过tuple id 和acker task之间的hash映射。

 

Storm的另外一个细节时: acker tasks是如何跟踪spouttasks的。当spout task发射一个新tuple时,它会发送一个简单的消息通知相应acker,并告诉它负责发送这个spout tuple的task id。然后,当一个acker发现某个树已经结束了,它就会知道是哪个task id发送的结束消息。

 

acker tasks不是显式的跟踪tuples的树。 对于大规模的tuple 树,例如包含成千上万个节点甚至更多的树, 追踪所有的tuple树会耗尽内存。ackers会采取不同策略: 每个spout tuple只需要一个固定数量的空间(大约20个字节)。这种追踪算法是Storm如何工作的关键并且也是其主要技术突破之一。

 

acker task中存放了一个spout tuple id和一对值的映射。 第一个值就是创建这个spouttuple的task id,它稍后会用来发送结束消息。第二值就是称为“ack val”的64bit数字。 ack val代表了整个tuple树的状态,无论多大还是多小。它是树中所有tuple ids的简单异或,包括已被才创建或者已被acked的tuple。

 

acker task当发现一个“ack val”已经变为0, 它就会知道tuple树已经结束了。 因为tuple ids都是64bit的随机数, “ack val”意外变成0的概率极其小。 如果使用数学计算一下就会发现, 每秒钟10k acks, 那么每5000万年才可能出现一次失误。 即使出现失误, 如果tuple在topology中遇到fail时也仅仅会引起数据丢失。

 

现在你已经知道了可靠性算法,下面浏览一下各种失败情况下Storm怎样避免数据丢失:

l  a tuple isn‘t acked because task died:即因task死掉而造成tuple不能acked; 这种情况下, 引起failed的tuple所在树的根节点的spouttuple ids会因超时而重新处理。

l  acker task dies:因acker task 死掉造成tuple不能acked;这种情况下, 这个acker task跟踪的所有spouttuples都会因超时而重新处理。

l  Spout task dies:因spout task死掉造成tuple不能acked;这种情况下,spout获取数据的数据源应该负责重新处理数据。例如, 向Kestrel和RabbiMQ的队列将在客户端失联的情况下重新把所有等待的数据放回队列中。

 

如你所看,Storm的可靠性机制是完全分布式的、可伸缩的、并且是容错的。

 

 

 

调优可靠性

 

Acker task是轻量级的,因此在topology中并不需要很多。 可以使用Storm UI(即组件id“__acker")跟踪他们的性能。如果吞吐量看起来不大对, 可能需要增加ackertasks。

 

如果可靠性对你而言不是那么重要----即,你不必担心失败情况下会损失tuples-------那么你可以放弃追踪spout tuples的树,从而改善性能。不追踪tuple树的话,可以减少一半的消息传递,因为通常没法送一个tuple就会发送一条ack消息;另外,不追踪tuple树的话,还可以减少每个下游tuple处理需要保存ids,并减少带宽消耗。

 

三种方式可以放弃使用可靠性:

 

 第一种方式是设置Config.TOPOLOGY_ACKER为0;这种情况下, Storm在spout发送一个tuple之后直接调用ack方法,这样一来,就不会最终tuple树了。

 

第二种方式是使用消息的基本设置。SpoutOutputCollector.emit方法会发射单个tuple,你可以通过这种方法的设置关闭对tuple的追踪。

 

第三种方式是,如果topology中tuples下游中某个特定子集处理失败,而你又不关心这个,那你可以将这些特定子集作为unanchored的tuples发送。因为他们没有anchored到任何spout tuples,如果这些tuples没有被acked,也就不会引起任何spout tuples作为失败处理的。

 

 

 

 

 

 

 

 

 

storm文档(9)----消息处理保证机制