首页 > 代码库 > Storm 入门教程

Storm 入门教程

在这个教程中,你将学会如何创建 Storm 的topology并将他们部署到 Storm 集群上, 主要的语言是 Java,但是少数几个例子用 Python 编写来说明 Storm 的多语言支持能力。

术语和名词

MapReduce jobs
topologies topology 由用户编写的Storm集群中的业务处理逻辑
deamon 守护进程
worker process 工作进程
stream 流 指Storm中的数据流
tuple 元组 指stream中的最小单元数据
primitive 基件 指storm topology 的组成部分,比如 bolt 和 spout
task 任务

Storm 集群里的各种组件

从表面上看一个 Storm 集群 与 一个 Hadoop 集群相似,然而在 Hadoop 上运行 “MapReduce jobs”, 在 Storm 上运行 “topologies”, 但是 “jobs” 和 “topologies” 是非常不同的– 一个关键的不同是 MapReduce job 最终会结束,而一个 topology 是永远在等待消息并处理(直到你杀掉它)。

一个 Storm 集群中有两种节点(node):主节点和工作节点,主节点运行一个叫 “Nimbus” 的守护进程(daemon)跟 Hadoop 的 “任务跟踪器”(Jobtracker)类似。Nimbus 负责向集群中分发代码, 向各机器分配任务,以及监测故障。

这里的节点是指Storm集群中不同角色的服务器节点

每一个工作节点运行一个名叫 “Supervisor” 的守护进程。Supervisor 监听 Nimbus 指派到这个这台机器的任务,根据 Numbus 的指派信息来停启工作进程(worker process) ,每一个 worker process 执行一个topology的子集,一个运行中的topology由跨越多个主机的多个 worker process 组成。

技术分享

在 Nimbus 和 Supervisors 之间的所有协调调度通过一个 Zookeeper 集群来完成。另外,Nimbus 守护进程和 Supervisor 守护进程都是快速失败 (fail-fast)和无状态的;所有的状态保存在 Zookeeper 或者本地磁盘中。这意味着你可以 kill -9 Nimbus 或者 Supervisors 他们会自动恢复,就像什么都没发生过一样。这种设计让 Storm 集群变的不可思议的稳定。

Topologies

在Strom上做实时计算, 你需要创建 “Topology”,一个 topology 是一个计算过程的描述,一个 topology 中的每一个节点包含处理逻辑,节点之间的连接表明了数据在节点之间是如何传递的。

这里的节点是指 topology 中计算过程的每一个步骤

运行一个 是很简单的。首先,你将你所有的代码和依赖都打包到一个单独的jar包中,然后运行像下面这样的命令:

storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2

这样会传递arg1和 arg2参数给backtype.storm.MyTopology类,这个类的 main 方法定义topology 并将它提交到 Nimbus。Strom jar 部分负责连接 Nimbus 并上传jar包.

由于 topology 的定义本来就是 Thrift 结构,并且 Nimbus 是一个 Thrift 服务, 所以你可以使用任何编程语言来创建和提交 topology。上面的方法是使用基于 JVM 的编程语言来完成的最简单的方法,参考Running topologies on a production cluster 来获取更多的关于开启和停止 topology 的方法。

Streams

Strom 的核心抽象概念是 “流” (stream),一个 stream 相当于一个无限的元组(tuple) 序列,Storm 提供了以可靠且分布式的方法来将一个 stream 转换成一个新 stream 的基件 (primitive) ,例如你可能想将一个微博的 stream 来转成一个热门话题的 stream。

Storm提供基本的用来做流转换的的基件是 “spout” 和 “bolts” ,spout 和 bolt 提供了接口,你可以实现这些接口来处理的你自己的应用程序相关的逻辑。

spout 是流的来源, 例如 spout 可以从一个 Kestrel 队列来读 tuple 并且发射(emit)他们形成一个流,或者 spout 可以连接到 Twitter api,来发射一个推文的流。

一个 bolt 消费任意数量的流, 做一些处理,然后可能会发射出新的流,复杂的流转换,例如从一个推文的流计算出一个热门话题的流,需要多个步骤,多个 bolt 。bolt可以通过运行函数(functions)来做任何事,例如过滤元组,做流聚合,做流连接,跟数据库交互等等。

所有的 spout 和 bolt 被打包到了一个 “topology” 中 ,topology 是你提交给 Storm 集群来执行的计算过程的最高抽象,一个 topology 类似一个流转换的图表,它现显示了哪些 bolt 是绑定(subscribe)哪些 stream 上的 。当一个 spout 或者 bolt 发射出一个 tuple 到 stream 中,它会发送 tuple 到所有绑定了这个 stream 的 bolt 中。

技术分享

topology 中节点之间的连接表明了 tuple 是如何在他们之间传递的。例如如果在 spout A 和 bolt B 之间有一个连接,从 spout A 到 bolt C 之间有一个连接,从 boltB 到 boltC 有一个连接,tuple 会发到 bolt B 和 bolt C 中, 所有 bolt B 的输出 tuple 也会流到 bolt C 中

这里的节点是指 topology 中的 spout 或者 bolt 
topology中的每一个节点都是并行执行的。在你的topology中,你可以指定每个节点的并行数量n,然后 Storm会启动 n 个线程在集群中运行

一个 topology 是永远运行的,直到你杀掉它,Storm 会自动重新分配失败的任务。另外,Storm 保证没有数据丢失, 即使主机挂掉消息丢失。

数据模型

Storm 使用 tuple 做数据模型,一个 tuple 是被命名过的值列表,一个 tuple 中的字段可以是任何类型的对象。它是开箱即用的,Storm 支持所有的简单数据类型,如字符串,字节数组作为 tuple 的字段值。如果要使用另一种类型的对象,你只需要为这个类型实现一个 serializer

topology 中的每一个节点都应该为它要发射的元组声明输出字段, 例如, 下面这个bolt声明了它发射字段为 “double” 和 “triple” 字段的元组:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class DoubleAndTripleBolt extends BaseRichBolt {
private OutputCollectorBase _collector;

@Override
public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
_collector = collector;
}

@Override
public void execute(Tuple input) {
int val = input.getInteger(0);
_collector.emit(input, new Values(val*2, val*3));
_collector.ack(input);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("double", "triple"));
}
}

declareOutputFields 方法声明了输出字段为["double", "triple"],
这个 bolt 类的其他部分将在下面的章节中讲解。

一个简单的topology

让我们来看一个简单的 topology 来探索更多的概念,看代码是如何构造起来的。我们从 storm-starter 项目里看看 ExclamationTopology 是如何定义的

1
2
3
4
5
6
TopologyBuilder builder = new TopologyBuilder();        
builder.setSpout("words", new TestWordSpout(), 10);
builder.setBolt("exclaim1", new ExclamationBolt(), 3)
.shuffleGrouping("words");
builder.setBolt("exclaim2", new ExclamationBolt(), 2)
.shuffleGrouping("exclaim1");

这个 topology 包含一个 spout 和两个 bolt,spout 发送单词,每一个 bolt 附加 “!!!” 到它的输入数据中。这些节点排练成一条线:spout 先发射 tuple 到第一个 bolt,然后第一个 bolt 发送到第二个 bolt。如果 spout 发送 [“bob”] 和 [“john”] 元组,然后第二个bolt会发送 [“bob!!!!!!”] 和 [“john!!!!!!”] 元组

代码中使用 setSpout 和 setBolt 方法来定义节点.这些方法需要传入一个用户指定的id,一个包含处理逻辑的对象,以及你希望这个节点运行的并行数量。在这个例子中,spout 被指定了id “words”, bolt 被指定了id “exclaim1” 和 “exclaim2”

传入的 Spout 对象实现了 IRichSpout 接口并包含业务逻辑

传入的 Bolt 对象实现了 IRichBolt 接口并包含业务逻辑

最后一个参数,你想要这个节点的并行数量是几,这个参数是可选的,它表明有多少线程应该在集群中运行该 组件 ,如果你忽略了它,Storm 会给这个节点只分配一个线程

这里的组件是指被实例化后的节点,即 spout 或者 bolt

setBolt 返回一个 InputDeclarer 对象用来给 bolt 定义输入。这 “exclaim1”组件 声明了它要想读入所有 “words” 组件的发射的打乱分组过的所有 tuple.

“exclaim2” 组件声明了它要读入所有 “exclaim1” 发射的打乱分组过的 tuple,”打乱分组”(shuffile group)意味着 tuple 必须从输入中随机分发到 bolt 的任务中。有许多在组件之间将数据分组的方法,打乱只是其中一种。接下来的一些小节会解释到它。

如果你希望 “exclaim2” 组件,既读取 “words” 又读取 “exclaim1” 发射的 tuple , 你可以像如下这样实现 “excliam2” :

1
2
3
builder.setBolt("exclaim2", new ExclamationBolt(), 5)
.shuffleGrouping("words")
.shuffleGrouping("exclaim1");

正如你所见,可以给 bolt 链式的指定多个数据源。

让我们深入到这个 topology 中 spouts 和 bolts 的具体实现上。Spouts 负责发射新的消息到 topology中, 在这个 topology 中 TestWordSpouts方法 从 [“nathan”, “mike”, “jackson”, “golda”, “bertels”] 中每 100毫秒 发射一个随机的字符, TestWordSpout 中 nextTuple()方法 的实现是这样的:

1
2
3
4
5
6
7
public void nextTuple() {
Utils.sleep(100);
final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
_collector.emit(new Values(word));
}

如你所见,这种实现非常的简单。

ExclamationBolt 附加 “!!!” 到输入中, 让我们看看 ExclamationBolt 的完整实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static class ExclamationBolt implements IRichBolt {
OutputCollector _collector;

public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}

public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}

public void cleanup() {
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}

public Map getComponentConfiguration() {
return null;
}
}

prepare 方法给 bolt 提供了一个 OutputCollector 对象用来从这个 bolt 中发射 tuple 。 在这个 bolt 中的任何位置都可以发射 tuples – prepareexecutecleanup 方法, 甚至在异步的其他线程中。prepare 方法仅仅保持一个 OutputCollector 对象实例以便在后面 execute 方法中调用。

execute 方法从输入中接收一个 tuple。ExclamationBolt 从元组中取到第一个字段,然后在后面附加 “!!!” 。 如果你实现的 bolt 订阅了多个输入源, 你可以使用 Tuple#getSourceComponent 方法查到当前的 tuple 是来自哪个组件.

execute 方法里还可以做一些其他操作,即将输入的 tuple 作为 emit 的第一个参数传入,这样这个 tuple 会被确认。这是 Storm 可靠api一部分它能保证,不会丢失数据,这些在本教程后面的章节中还会阐述。

cleanup 方法会在 Bolt 停止时被调用,用来关闭清理所有打开的资源。不能保证这个方法一定会在集群中被调用,如果正在运行的机器发生了爆炸(作者在搞笑),这样就没办法调用这个方法了。cleanup方法其实是专门为你在本地模式(将Storm集群在一个进程中模拟出来)下运行 topology ,你希望运行和杀掉 topology 而不必担心资源泄露。

declareOutputFields 方法声明 ExclamationBolt 发射包含一个 word 字段的 tuple

getComponentConfiguration 方法允许你配置影响这个 bolt 如何运行的各种参数,有一个更高级的话题专门讨论关于配置的更多内容 Configuration.

cleanup 和 getComponentConfiguration 方法通常并不是必须的, 你可以通过继承一个提供了默认实现的基类来更简洁的定义 bolt。 通过继承 BaseRichBolt类 ,ExclamationBolt可以被实现的更简洁,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static class ExclamationBolt extends BaseRichBolt {
OutputCollector _collector;

public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}

public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}

在本地模式下运行 ExclamationTopology

我们来看下如何在本地模式下运行 ExclamationTopology

Storm 有两种运行模式:本地模式和分布式模式。在本地模式中,Storm 完全在一个进程中运行,用线程来模拟各个工作节点。本地模式对与开发和测试topology是非常有用的,当你运行 storm-starter 中的 topology时,它会运行在本地模式下,你可以看到每一个组件发射的消息,你可以阅读更多关于本地模式的内容

在分布式模式下,Storm 运行在一组机器上,当你提交一个 topology 到 master上,就会同时提交所有必要的代码来运行这个 topology,master会负责分发你的代码,并分配工作进程来运行你的 topology,如果工作进程挂掉了,master会在某处重新分配他们。你可以阅读更多关于在一个集群上来运行topology的内容,

下面是在本地模式运行 ExclamationTopology 的代码

1
2
3
4
5
6
7
8
9
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();

首先,这段代码通过创建 LocalCluster 对象定义了一个进程内的集群。提交 topology 到虚拟集群和提交到分布式集群是一样的,通过调用 submitTopology 来向 LocalCluster 中提交 topology,它接受三个参数,topology的名字,topology的配置,topology本身。

名字是用来识别这个 topology,以便日后杀掉它。。topology会一直运行直到你杀掉它。

配置是用来调优运行 topology 的各个方面,下面是两个常见的配置:

  1. TOPOLOGY_WORKERS (用 setNumWorkers来设置) 指定你将在集群分配几个进程来运行这个这个topology,topology中的每一个组件会被当做多个线程来运行。一个组件被分配线程的数量通过 setBolt 和 setSpout 方法来配置,这些线程存在于工作进程中。每个工作进程包含一些组件中的一些线程,例如,你分配了 300 个线程给所有的组件,在配置中设置了50个工作进程,那么每个工作进程会运行6个线程,每一个线程可能属于不同的组件。通过调整每个元件的并行度和运行这些线程的工作进程的数量来对 storm 的并行性能调优。
  2. TOPOLOGY_DEBUG (通过 setDebug 设置),当被设为 true 时,storm 将记录元件发射的每个消息,在本地模式测试topology时这是很有用的,但是在线上模式运行时,你更愿意将它关闭

流分组 Stream groupings

流分组让 topology 知道在组件之间如何发送 tuple,记住 spouts 和 bolts 是被当成很多 tasks 并行运行在整个集群中的,如果你想看看 topology 是如何在 task 层级运行的,就像下图这样

这里的task 就是setBolt和 setSpout 中产生的工作线程,如果设置了数量,就是线程组或者任务组即 set of tasks

当一个运行 Bolt A 的 task 发射了一个 tuple 到 Bolt B,那么它应该发射到哪个 task(当然是运行Bolt B 的task) 呢?

流分组 (Stream grouping)答了这个问题,它告诉 Storm 如何在 set of task(任务组)之间发送 tuple,在我们深入不同种类的流分组以前,让我们看看 storm-start 里的另一个 topology ,WordCountTopology从一个 spout 中读取句子并且从 WordCountBolt 中获取某个单词出现的次数:

1
2
3
4
5
6
7
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("sentences", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8)
.shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 12)
.fieldsGrouping("split", new Fields("word"));

SplitSentence 把它接收到的每一个句子中的每一个单词当做 tuple 发送出去,WordCount在内存中维护了一个单词和数量的映射关系,每次 WordCount 接收到一个单词,它就更新单词的数量,然后发送新的单词数量。

当然还有一些不同种类的流分组。

基本的分组类型叫做 “乱序分组(shuffle grouping)” ,它将使 tuple 被随机发个一个 task,WordCountTopology中 使用了乱序分组来从 RandomSentenceSpout 向 SplitSentence 发送 tuple, 这样所有的处理任务就能够被平均的分配到所有运行 SplitSentence Bolt的 task 上。

一个更有趣的分组类型是 字段分组(fields grouping) ,SplitSentence 和 WordCount之间使用了一个字段分组,WordCount能够运作的一个极为重要的要求是相同的单词必须被发到同一个 task中,否则会有一个以上的 task 会接收到相同的单词,然后他们会发射错误的计数。字段分组使我们可以用字段将一个流分组,这使得相同字段的内容总是被分到同一个task中。由于 WordCount 在 word 字段上使用字段分组订阅了 SplitSentence‘s 的输出流,这样相同的单词总是会进入到相同的task.

字段分组是流连接和流聚合以及许多其他用力的基本实现,究其原理,字段分组是通过 mod hashing(哈希的一种) 来实现的

还有一些其他类型的分组,你可以在概念里查看更多。

使用其他编程语言编写 Bolt

Bolt 可以使用其他编程语言编写,使用其他语言编写的 Bolt 会被当做子进程来执行,Storm 通过 stdin/stdout 用json格式的信息来与这些子进程通信,只需要引入一个100行左右代码的适配器类库即可完成通信协议, Storm 提供了Ruby , Python 等等语言的类库
下面是从 WordCountTopology 到 SplitSentence 的实现

1
2
3
4
5
6
7
8
9
public static class SplitSentence extends ShellBolt implements IRichBolt {
public SplitSentence() {
super("python", "splitsentence.py");
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}

SplitSentence 重写了父类的ShellBolt方法, 声明它用 splitsentence.py 作为参数来运行python, 下面是splitsentence.py 的实现

1
2
3
4
5
6
7
8
9
import storm

class SplitSentenceBolt(storm.BasicBolt):
def process(self, tup):
words = tup.values[0].split(" ")
for word in words:
storm.emit([word])

SplitSentenceBolt().run()

关于如何用其他语言编写 spouts 和 bolts 以及如何用其他语言编写 topology 的内容,请查阅
Using non-JVM languages with Storm.

保证消息处理的可靠性

教程的前面我们略过了一些 tuple 发射方面的内容,这些方面的内容就是 Storm 的可靠性 API, 即 Storm 是如何保证从 spout 中出来的信息都能够被完全的处理,阅读 Guaranteeing message processing f 来了解它是如何运作的,以及作为一个用户如何利用 Storm 的可靠性能力。

Transactional Topologies

Storm guarantees that every message will be played through the topology at least once. A common question asked is “how do you do things like counting on top of Storm? Won’t you overcount?” Storm has a feature called transactional topologies that let you achieve exactly-once messaging semantics for most computations. Read more about transactional topologies here.

分布式 RPC

这篇教程解释了如何在 Storm 上做基本的流处理。当然你还可以用 Storm 的基础组件做更多的事。其中一个非常有趣的应用是分布式 RPC,在这些 RPC机器上做繁忙的并行计算,阅读更多关于分布式RPC

总结

这个教程给出了 如何开发、测试以及部署 Storm topology 的概览,其余的文档将深入到使用 Storm 使用的方方面面。

Storm 入门教程