首页 > 代码库 > kafka+storm初探

kafka+storm初探

    由于项目需要,最近对storm进行了预研,安装与使用方式网上有很多示例,在此记录一下,备忘。

一、storm简介

    Storm的术语包括Stream、Spout、Bolt、Task、Worker、Stream Grouping和Topology。Stream是被处理的数据。Sprout是数据源。Bolt处理数据。Task是运行于Spout或Bolt中的 线程。Worker是运行这些线程的进程。Stream Grouping规定了Bolt接收什么东西作为输入数据。数据可以随机分配(术语为Shuffle),或者根据字段值分配(术语为Fields),或者 广播(术语为All),或者总是发给一个Task(术语为Global),也可以不关心该数据(术语为None),或者由自定义逻辑来决定(术语为 Direct)。Topology是由Stream Grouping连接起来的Spout和Bolt节点网络。在Storm Concepts页面里对这些术语有更详细的描述。

    运行Storm集群,你需要Apache Zookeeper、ØMQ、JZMQ、Java 6和Python 2.6.6。ZooKeeper用于管理集群中的不同组件,ØMQ是内部消息系统,JZMQ是ØMQ的Java。

    安装详细:http://blog.csdn.net/qiyating0808/article/details/36041299

    启动storm集群:

storm nimbus >/dev/null 2>&1 &
storm supervisor >/dev/null 2>&1 &
storm ui >/dev/null 2>&1 &
    topology任务调度:

    在storm(0.9.2)目录下,有测试jar包(apache-storm-0.9.2-incubating/examples/storm-starter)可以进行集群环境验证。

    任务调度方式:

#LocalCluster方式
storm jar storm-starter-topologies-0.9.2-incubating.jar storm.starter.WordCountTopology

#集群方式
storm jar storm-starter-topologies-0.9.2-incubating.jar storm.starter.WordCountTopology args

    LocalCluster属于单机方式,白话就是可以不依赖集群进行结果测试验证,开发阶段该方式很有用,只要将storm依赖的jar引入project,使用单机方式在本地进行测试,通过后再投放到集群中。

示例代码片段(截取自WordCountTopology):


public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new RandomSentenceSpout(), 5);

    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
      conf.setNumWorkers(3);
      StormSubmitter.submitTopologyWithProgressBar("wordCount", conf, builder.createTopology());
    }
    else {
      conf.setMaxTaskParallelism(3);
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("word-count", conf, builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
    }
}
    Spout源数据预研使用LinkedIn Kafka,将Spout根据topic来获取对应的生产信息,在storm集群中消费掉。


二、Kafka简介

    安装过程几乎没有,解压后即可直接使用。

    使用方式:


启动kafka
./kafka-server-start.sh ../config/server.properties
创建topic
./kafka-topics.sh --topic kafkaToptic --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partition 1 
查看consumer
./kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic kafkaToptic --from-beginning
查看topic
./kafka-topics.sh --list --zookeeper localhost:2181
生产消息
./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic kafkaToptic
    kafka 与storm整合开源项目也有很多,不需要二次开发,例如storm-kafka-0.8-plus。


    Kafka生产者,在大数据框架中也有使用flume进行数据生产。