首页 > 代码库 > Spring XD简介:大数据应用的运行时环境

Spring XD简介:大数据应用的运行时环境

简介

Spring XD(eXtreme Data,极限数据)是Pivotal的大数据产品。它结合了Spring BootGrails,组成Spring IO平台的执行部分。尽管Spring XD利用了大量现存的Spring项目,但它是一种运行时环境,而不是一个类库或者框架,它包含带有服务器的bin目录,你可以通过命令行启动并与之交互。运行时可以运行在开发机上、客户端自己的服务器上、AWS EC2上或者Cloud Foundry上。

Spring XD中的关键组件是管理和容器服务器(Admin and Container Servers)。使用一种DSL,你可以把所需处理任务的描述通过HTTP提交给管理服务器。然后管理服务器会把处理的任务映射到处理模块(每个模块都是一个执行单元,作为Spring应用程序上下文实现)中。

该产品具有两种操作模式:-single和multi-node。第一种是由单独的进程负责所有处理和管理的工作。这对于入门很有用,同样适合于应用程序的快速开发和测试。本文中的所有实例都被设计为在单一节点模式下工作。第二种是一种分布式模式。分布式集成运行时(Distributed Integration Runtime,DIRT)会在多个节点之间分发处理的任务。除了可以拥有VM或者物理服务器作为这些节点之外,Spring XD还让你可以在Hadoop YARN集群上运行。



XD管理服务器会把处理的任务切分成彼此独立的模块定义,并把每个模块分配给使用Apache ZooKeeper的容器实例。每个容器都会监听分配给它的模块定义,然后部署模块,创建Spring应用程序上下文来运行它。需要注意的是,在我撰写这篇文章的时候,Spring XD中还不会自带Zookeeper。兼容的版本是3.4.6,你可以从这里下载。

模块通过使用配置好的消息中间件传递消息来共享数据。传输层是可插拔的,并且支持其他两种Pivotal项目——RedisRabbit MQ——以及现成可用的内存数据库。

用例

下图让你可以对Spring XD有个总体上的了解。


Spring XD团队认为,对于创建大数据解决方案来说,创建的主要用例有四种:数据吸纳、实时分析、工作流调度以及导出。

数据吸纳提供了一种能力,可以从各种输入源接收数据,并把它传输给大数据存储库,像HDFS(Hadoop文件系统)、Splunk或者MPP数据库。和文件一样,数据源可能包括来自于移动设备、支持MQ遥感传输协议(MQTT)的传感器以及像Twitter之类的社交流的事件。

吸纳过程会贯穿事件驱动数据的处理,以及针对其他类型数据的批处理(MR、PIG、Hive、Cascading、SQL等等)。流和作业的两个世界截然不同,但是Spring XD试图使用通道抽象(channel abstraction)来模糊二者之间的边界,从而让流可以触发批处理作业,而批处理作业也可以发送事件从而触发其他流。

对于流来说,会通过叫做“Taps”的抽象来支持某些实时分析,像获取指标和计数值。从概念上,Taps让你可以介入到流中,执行实时分析,并有选择地为外部系统生成数据,像GemFire、Redis或者其他内存数据网格。

一旦你在大数据仓库中拥有数据,那么就需要某种工作流工具来对处理进行调度。调度非常必要,因为你编写的脚本或者map-reduce作业通常会长时间运行,并采用带有多个步骤的事件链的方式。理想状况下,你需要在事件失败的时候,能够从特定的步骤重新启动,而不是完全从头来过。

最后还需要导出步骤,从而把数据放到更适合展现的系统中,可能还会做进一步的分析。例如从HDFS到RDBMS(关系型数据库管理系统),在那里你可以使用更为传统的商业智能工具。

Spring XD想要提供一种统一、分布式和可扩展的服务来满足这些用例。它没有从头开始,而是利用了大量已经存在的Spring技术。例如,它使用了Spring Batch来支持工作流调度和导出用例,使用Spring Integration来支持流处理,此外还使用了各种各样的企业应用程序集成模式。其他关键的Spring产品包括:使用Spring Data处理NoSQL/Hadoop工作,使用Reactor为编写异步程序提供简化的API,特别是在使用LMAX Disruptor的时候。

安装Spring XD

在接下来的部分,我们会详细看一下每个用例。你可能想要自己来试验一下这些例子。起步非常简单。

为了开始,你要确保系统至少安装了Java JDK 6或者更新的版本。我推荐使用Java JDK 7。

对于OSX用户,如果还没有Homebrew的话,请安装,然后运行:

brew tap pivotal/tapbrew install springxd

这会安装到 /usr/local/Cellar/springxd/1.0.0.M7/libexec (依赖于Spring XD的库)。

注意:如果你随后想要安装更新的版本,那么使用brew upgrade springXD就可以。

红帽或者CentOS的用户可以使用Yum来安装。

Windows用户可以下载最新的.zip文件,解压,安装到文件夹,然后把XD_HOME这个环境变量设置成安装文件夹。

你可以通过键入以下命令,从而在单一节点上启动Spring XD:

xd-singlenode

键入以下命令来打开另一个终端窗口并启动shell程序:

xd-shell

你会看到下面这样的情况:

为了检查它是否正常工作,让我们创建一个快速的流:

stream create --definition "time | log" --name ticktock --deploy

在你启动Spring XD的控制台中,你会看到下面这样的显示:

你可以从shell中使用stream destroy命令删除流。

stream destroy --name ticktock数据吸纳流

在Spring XD中,基本的流会定义事件驱动数据的吸纳,从源到目的地,经过任意多个处理器。

Spring XD外壳程序支持针对流定义的一种DSL,其中带有管道和过滤器语法 - source | processor | sink。

例如,像这样的命令 stream create --name filetest --definition "file | log" --deploy会记录文件内容的日志。
除了能够处理文件之外,Spring XD还支持很多其他源,包括:

HTTP

命令 HTTP POST /streams/myStream "http | file --deploy" -表示“从HTTP消费我的流,并转到文件”。这会默认到9000端口。你可以使用--port选项覆盖默认的端口设置。这是针对HTTP的唯一参数。

例如(从XD的外壳程序):

xd:> stream create --name infoqhttptest9010 --definition "http --port=9010 | file" --deploy

你可以向这个新端口提交一些数据来测试:

xd:> http post --target http://localhost:9010 --data "hello world"

你会在控制台窗口看到以下文本:

> POST (text/plain;Charset=UTF-8) http://localhost:9010 hello world > 200 OK

打开另一个终端窗口并键入:

$ cd /tmp/xd/output $ tail -f infoqhttptest9010.out

你会在输出中看到“hello world”。

想要发送二进制数据,你需要把Content-Type头部说明设置为application/octet-string:

$ curl --data-binary @foo.zip -H‘Content-Type: application-octet-string‘ http://localhost:9000

键入 stream destroy infoqhttptest9010 来完成清理工作。

Mail

Mail是用来接收email的源模块。根据所使用的协议,它可以以池的形式工作,或者在可用的时候就接收email。

例如:

xd:> stream create --name infoqmailstream --definition "mail --host=imap.gmail.com --username=charles@c4media.com --password=secret --delete=false | file" --deploy

注意:这里的delete选项很重要,因为对于Spring XD来说一旦被消费,默认情况就会删除电子邮件。Spring XD也拥有markAsRead选项,但默认值是false。Spring集成文档中对此做出了详细的说明,但主要问题是,POP3协议只知道在单独一个会话中读取了什么。作为POP3邮件适配器运行的结果,当邮件在每个池中变成可用状态时,就会被成功发送,且没有任何一个邮件消息会被多次发送。然而,当你重启适配器并开始新的会话时,所有位于上一个会话中已经获取过的邮件消息就可能会被再次获取。

如果你在控制台日志中看到这样的错误信息:

WARN task-scheduler-1 org.springframework.integration.mail.ImapIdleChannelAdapter:230 - error occurred in idle task javax.mail.AuthenticationFailedException: failed to connect, no password specified?

试着在你的URL把@符号替换为URL编码的样子: %40:

stream create --name infoqmailstream --definition "mail --host=imap.gmail.com --username=charles%40c4media.com --password=secret --delete=false | file" --deploy

打开另一个终端窗口并键入:

$ cd /tmp/xd/output $ tail -f infoqmailstream.out

给你自己发送一封邮件,以看到它在日志文件中显示的内容。

Twitter搜索

Spring XD就可以使用Twitter搜索API(twittersearch),也可以使用来自于Twitter‘s Streaming API的数据。

例如:

xd:> stream create --name twittersearchinfoq --definition "twittersearch --outputType=application/json --fixedDelay=1000 --consumerKey=afes2uqo6JAuFljdJFhqA --consumerSecret=0top8crpmd1MXGEbbgzAwVJSAODMcbeAbhwHXLnsg --query=‘infoq‘ | file" --deploy

它使用twittersearch的JSON输出格式,每1000毫秒使用令牌“infoq”在Twitter中进行查询。为了运行上面的内容,你需要一个消费者密钥(由Twitter发放的应用程序消费者密钥)以及它相关的密钥。


它的结果会通过管道以同步的方式传输给一个文件,默认是/tmp/xd/output/[streamName].out。

打开另一个终端窗口并键入:

$ cd /tmp/xd/output $ tail -f twittersearchjava.out

稍等一会儿,你会发现超出了Twitter APE搜索的限制,并且会在控制台窗口中(你在其中在单一节点上启动了XD)看到这样的消息:

11:27:01,468 WARN task-scheduler-1 client.RestTemplate:581 - GET request for "https://api.twitter.com/1.1/search/tweets.json?q=infoq&count=20&since_id=478845525597237248" resulted in 429 (Client Error (429)); invoking error handler11:27:01,471 ERROR task-scheduler-1 handler.LoggingHandler:145 - org.springframework.social.RateLimitExceededException: The rate limit has been exceeded.

键入 stream destroy twittersearchinfoq 来完成清理工作。

其他输入流

GemFire:在XD容器进程中配置一个缓存(cache)和副本区域,它和Spring Integration GemFire同时存在于通道适配器中,它们由CacheListener支持,而后者会输出区域中外部输入事件所触发的输出消息。它还支持连续的查询,那让客户端应用程序可以使用对象查询语言(OQL)来创建GemFire查询,并注册一个CQ监听器,它会订阅查询,每次查询的结果集发生变化的时候都会得到通知。

Reactor IP:它会作为服务器,让远程的组织能够连接到XD,并通过原生的TCP或者UDP socket提交数据。reactor-ip源和标准的tcp源的区别在于,它基于Reactor项目,可以被配置为使用LMAX Disruptor RingBuffer库,它能够允许极高的吸纳率,大概每秒1M。

Syslog:有三种syslog源:reactor-syslog、syslog-udp和syslog-tcp。reactor-syslog适配器使用tcp,会构建Reactor项目中可用的功能,并提供超过syslog-tcp适配器中更好的吞吐量。

TCP:它会作为服务器,让远程的组织能够连接到XD,并通过原生的TCP socket提交数据。

MQTT:连接到MQTT服务器并接收遥测消息。

Taps

在流的任意位置,你都可以插入tap——这个词来自于Gregor Hohpe等人著的《应用程序集成模式(Application Integration Patterns)》一书中的“wire tap”模式。

从概念上说,你会在通道中插入一个简单的接收列表,它会把每个进入的消息发布到主通道和次通道中。流并不知道它的管道中任何tap的存在。删除流并不会自动删除tap——它们需要单独删除。然而,如果加入了tap的流被重新创建,那么已经存在的tap会继续起作用。

tap可以在流的任意位置(或者多个位置)插入。

处理器

流中的数据可以以多种方式处理:

过滤器:它可以用于决定消息是否应该发送给输出通道。最简单的情况是,过滤器只是一个SpEL布尔表达式,它会返回真或假。例如:

xd:> stream create --name filtertest --definition "http | filter --expression=payload==‘good‘ | log" --deploy

会记录带有“good”关键字的所有内容的日志。然而,过滤器也可以相当复杂。Spring XD支持JSONPath计算式以及自定义的Groovy脚本。

转换:用来转换消息的内容或结构。它支持简单的SpEL,对于更复杂的转换,可以使用Groovy脚本。

分割器:和Spring集成中的分割器概念类似,这里的分割器会使用SpEL表达式,它会计算一个数组或者集合的值,从而把单独一条消息切分成多个独立的消息。你可以使用JSON oath表达式,但无法使用自定义的Groovy脚本。

聚合器(Aggregator):和分割器相反,它会把多条消息组合成一条。

最后是脚本,可以用于调用特定的Groovy脚本作为处理步骤。

槽(Sinks)

最简单的槽是日志和文件。其他可以支持的槽包括Hadoop(HDFS)、JDBC、TCP、Mail、RabbitMQ、GemFire服务器、Splunk服务器和MQQT。还有一个动态路由选项,允许基于SpEL表达式或Groovy脚本的值,把Spring XD消息路由到命名通道中。让我有一点奇怪的是,在这里缺少一般目的的JMS槽,尽管我们可以像[url=http://www.infoq.com/cn/articles/(https://github.com/spring-projects/spring-xd/wiki/Extending-XD]这里[/url]描述的一样构建自定义的槽模块。

实时分析

Spring XD为各种机器学习评分算法的实时计算提供了支持,还为使用各种类型的计数器和计量器进行实时数据分析提供了支持。分析功能是通过可以添加到流中的模块实现的。在那种情况下,实时分析是通过和数据吸纳一样的模块完成的。

尽管流的主要角色可以是执行实时分析,但更为常见的是添加一个tap来初始化另一个流,其中分析——例如:一个字段值的计数器——会应用给通过主要流吸纳的同样数据之上。

Spring XD中自带提供了一些简单的分析工具,它们都实现为抽象API,针对内存数据库和Redis而实现,如下:

  • 简单计数器
  • 字段值计数器:计算特定字段出现的次数。
  • 聚合计数器: 在Mongo和Redis之类的工具中比较常见,让你可以对数据根据时间——例如分钟、小时、月、年等——进行分片。
  • 计量器(Gauge):最新的值
  • 富计量器:最新的值,运行的平均值,最大、最小值

对于预测性的分析,Spring XD包含了一个可扩展的类库,基于它可以构建其他实现。例如在GitHub上提供的PMML模块,它和JPMML-Evaluator库集成,为更广范围内的模型类型提供了支持,并且可以与从RRattleKNIMERapidMiner导出的模块进行互操作。

产品还包含了一些抽象,可以在流处理应用程序中事件分析模型。在撰写这篇文章的时候,只支持预测性模块标记语言(Predictive Model Markup Language,PMML),但Pivotal告诉InfoQ:

我们正在进行一个内部项目,以提供广泛的分析解决方案,它的目标是围绕“欺诈检测”和“网络安全”之类的情况。我们还在与OSS库——像“stream-lib”和“graphlab”——的整合做了一些设计。

Pivotal还说明,他们期望,随着时间的推移能够在这个领域看到发展,并且对预测性建模提供额外的支持。

批处理作业、工作流调度和导出

除了流之外,Spring XD还包含了基于Spring Batch启动和监控批处理作业的功能,而Spring Batch也被用于支持工作流调度和导出用例。

工作流的概念会被转换成批处理作业,那可以被认为是各个步骤的有向图,每个图都是一个处理步骤:

根据配置的情况,步骤可以顺序或者并行执行。它们可以复制或者处理来自于文件、数据库、MapReduce、Pig、Hive或Cascading作业的数据,并且和允许重启的检查点一起持久化。和流一样,作业支持单节点,或者可以和数据分区一起分布。

Spring XD自身带有少量预定义的作业,可以用来向Hadoop文件系统HDFS导出数据,或者从中导入数据。这些作业覆盖了FTP到HDFS、HDFS到JDBC、HDFS到MongoDB和JDBC到HDFS。还有一个作业用于向JDBC导出文件。你可以在/libexec/xd/modules/job文件夹中找到。

Spring XD提供了相当基础的、基于浏览器的图形化界面,当前让你可以执行和任务相关的批处理作业。对于启动Spring XD,管理员界面在这里提供:

(点击图像可以放大)

正如在上面的截屏中可以看到的,管理员界面当前包括四个标签页:

  • 模块:列举了可用的批处理作业和更多细节(像作业模块选项以及模块的XML配置文件)。
  • 定义:列举了XD批处理作业定义,并提供了部署或者卸载那些作业的动作。
  • 部署:列举了所有部署了的作业,并提供了一种选项来启动部署好的作业。一旦作业已经部署,它就可以通过管理员界面启动。
  • 执行:列举了批处理作业的执行状况,并提供了一种选项,如果批处理作业可以重启,并且处于停止或者失败状态,那么就重启。
结论

Spring XD当前还处于开发中。第一个里程碑版本已经在2013年六月发布,而GA版本期望在今年(2014年)七月发布。它基于Apache第二版许可。在GitHub上提供了源代码示例。你还可以找到在线的Sonar代码度量

产品可能还很新,但正如我们看到的,它构建在成熟的基础之上——Spring Batch、Spring Integration和Sping Data,以及Reactor项目、LMAX Disruptor和Apache Hadoop——并提供了一种轻量级的运行时环境,可以通过DSL来配置和集成,只需要很少代码,甚至不需要。Spring XD为开发者提供了一种便利的方式,可以开始构建大数据应用程序,为构建和部署这样的应用程序提供了“一站式服务”。

对于想要探索这个产品的读者,有大量资源可用,包括主要的wiki,还有覆盖了实时分析的视频

关于作者

Charles Humble从2014年三月开始担任InfoQ.com编辑团队的主编,引领我们的内容创建工作,包括新闻、文章、书籍、视频和采访。在全职加入InfoQ之前,Charles领导过我们的Java部分工作,是PRPi顾问公司的CTO,该公司是一家简历研究公司,在2012年七月被PwC收购。他作为开发者、架构师和开发经理在软件企业中工作了近20年。在空闲时间,他会写一些音乐,并且是伦敦周边的技术小组Twofish的成员。

Spring XD简介:大数据应用的运行时环境