首页 > 代码库 > kafka+storm初探
kafka+storm初探
由于项目需要,最近对storm进行了预研,安装与使用方式网上有很多示例,在此记录一下,备忘。
一、storm简介
Storm的术语包括Stream、Spout、Bolt、Task、Worker、Stream Grouping和Topology。Stream是被处理的数据。Sprout是数据源。Bolt处理数据。Task是运行于Spout或Bolt中的 线程。Worker是运行这些线程的进程。Stream Grouping规定了Bolt接收什么东西作为输入数据。数据可以随机分配(术语为Shuffle),或者根据字段值分配(术语为Fields),或者 广播(术语为All),或者总是发给一个Task(术语为Global),也可以不关心该数据(术语为None),或者由自定义逻辑来决定(术语为 Direct)。Topology是由Stream Grouping连接起来的Spout和Bolt节点网络。在Storm Concepts页面里对这些术语有更详细的描述。
运行Storm集群,你需要Apache Zookeeper、ØMQ、JZMQ、Java 6和Python 2.6.6。ZooKeeper用于管理集群中的不同组件,ØMQ是内部消息系统,JZMQ是ØMQ的Java。
安装详细:http://blog.csdn.net/qiyating0808/article/details/36041299
启动storm集群:
storm nimbus >/dev/null 2>&1 & storm supervisor >/dev/null 2>&1 & storm ui >/dev/null 2>&1 &topology任务调度:
在storm(0.9.2)目录下,有测试jar包(apache-storm-0.9.2-incubating/examples/storm-starter)可以进行集群环境验证。
任务调度方式:
#LocalCluster方式 storm jar storm-starter-topologies-0.9.2-incubating.jar storm.starter.WordCountTopology #集群方式 storm jar storm-starter-topologies-0.9.2-incubating.jar storm.starter.WordCountTopology args
LocalCluster属于单机方式,白话就是可以不依赖集群进行结果测试验证,开发阶段该方式很有用,只要将storm依赖的jar引入project,使用单机方式在本地进行测试,通过后再投放到集群中。
示例代码片段(截取自WordCountTopology):
public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 5); builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word")); Config conf = new Config(); conf.setDebug(true); if (args != null && args.length > 0) { conf.setNumWorkers(3); StormSubmitter.submitTopologyWithProgressBar("wordCount", conf, builder.createTopology()); } else { conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("word-count", conf, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); } }Spout源数据预研使用LinkedIn Kafka,将Spout根据topic来获取对应的生产信息,在storm集群中消费掉。
二、Kafka简介
安装过程几乎没有,解压后即可直接使用。
使用方式:
启动kafka ./kafka-server-start.sh ../config/server.properties 创建topic ./kafka-topics.sh --topic kafkaToptic --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partition 1 查看consumer ./kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic kafkaToptic --from-beginning 查看topic ./kafka-topics.sh --list --zookeeper localhost:2181 生产消息 ./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic kafkaToptickafka 与storm整合开源项目也有很多,不需要二次开发,例如storm-kafka-0.8-plus。
Kafka生产者,在大数据框架中也有使用flume进行数据生产。