Flume + Kafka + Storm: Using Them Together in a Distributed Environment
Contents:
1. What are Flume, Kafka, and Storm, and how are they installed?
2. How are Flume, Kafka, and Storm used together?
1) What is the underlying principle?
2) Integrating Flume with Kafka
3) Integrating Kafka with Storm
4) Integrating Flume, Kafka, and Storm
1. What are Flume, Kafka, and Storm, and how are they installed?
For an introduction to Flume, see the article "Installing, Deploying, and Simple Applications of Flume 1.5.0".
For an introduction to Kafka, see the article "Distributed Cluster Installation and Demo (Java API) Test of Kafka 2.9.2".
For an introduction to Storm, see the article "Building a Distributed Cluster with Ubuntu 12.04 + Storm 0.9.2".
The examples that follow use the configurations described in those three articles.
2. How are Flume, Kafka, and Storm used together?
1) What is the underlying principle?
If you have read the introductions to Flume, Kafka, and Storm carefully, you already know how each of them exchanges messages with the outside world.
In the examples below, we mainly rewrite Flume's sink so that it calls a Kafka producer to publish messages, and we implement Storm's IRichSpout interface in a spout that calls a Kafka consumer to receive them; the messages then pass through a few custom bolts, which print the processed content.
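Before diving into the real components, the hand-off described above can be pictured with a tiny, framework-free sketch (the class and method names here are my own, not from this article): the sink side only publishes, the spout side only consumes, and the buffer between them plays the role that a Kafka topic plays in the real setup.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Toy illustration of the pipeline's decoupling. In the real deployment the
// buffer in the middle is a Kafka topic, not an in-process queue.
public class PipelineSketch {

    // Flume-sink role: publish an event; Storm-spout role: consume it.
    public static String publishAndConsume(String event) throws InterruptedException {
        BlockingQueue<String> topic = new ArrayBlockingQueue<String>(16);
        topic.put(event);    // like KafkaSink.process(): producer.send(...)
        return topic.take(); // like the spout's consumer: it.next().message()
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("spout received: " + publishAndConsume("hello idoall.org syslog"));
    }
}
```

The point of the indirection is that neither side knows about the other; Flume can be restarted or Storm redeployed without touching the other end.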
2) Integrating Flume with Kafka
# Copy the Kafka-related jars that Flume needs into Flume's lib directory:
```shell
root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/kafka_2.9.2-0.8.1.1.jar /home/hadoop/flume-1.5.0-bin/lib
root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/scala-library-2.9.2.jar /home/hadoop/flume-1.5.0-bin/lib
root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/metrics-core-2.2.0.jar /home/hadoop/flume-1.5.0-bin/lib
```
# Write KafkaSink.java (below), export it from Eclipse as a jar, and put the jar into the flume-1.5.0-bin/lib directory. The project needs to reference flume-ng-configuration-1.5.0.jar, flume-ng-sdk-1.5.0.jar, flume-ng-core-1.5.0.jar, zkclient-0.3.jar, and commons-logging-1.1.1.jar; all of them can be found under the Flume directory (use the find command if you cannot locate one).
```java
package idoall.cloud.flume.sink;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class KafkaSink extends AbstractSink implements Configurable {

    private static final Log logger = LogFactory.getLog(KafkaSink.class);
    private String topic;
    private Producer<String, String> producer;

    public void configure(Context context) {
        topic = "idoall_testTopic";
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", "m1:9092,m2:9092,s1:9092,s2:9092");
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "idoall.cloud.kafka.Partitionertest");
        props.put("zookeeper.connect", "m1:2181,m2:2181,s1:2181,s2:2181/kafka");
        props.setProperty("num.partitions", "4");
        // props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<String, String>(config);
        logger.info("KafkaSink initialized.");
    }

    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction tx = channel.getTransaction();
        try {
            tx.begin();
            Event e = channel.take();
            if (e == null) {
                tx.rollback();
                return Status.BACKOFF;
            }
            KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, new String(e.getBody()));
            producer.send(data);
            logger.info("Flume sent a message to Kafka: " + new String(e.getBody()));
            tx.commit();
            return Status.READY;
        } catch (Exception e) {
            logger.error("Flume KafkaSinkException:", e);
            tx.rollback();
            return Status.BACKOFF;
        } finally {
            tx.close();
        }
    }
}
```
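Note that the sink config references a custom partitioner class, idoall.cloud.kafka.Partitionertest, whose source the article never shows. Its exact contents are unknown; a typical 0.8-era partitioner simply hashes the message key modulo the partition count. The class below is a hypothetical plain-Java stand-in for that logic (the real class would implement kafka.producer.Partitioner and take a VerifiableProperties constructor argument):

```java
// Hypothetical sketch of what idoall.cloud.kafka.Partitionertest likely does;
// shown without the Kafka dependency so the core logic is visible.
public class PartitionertestSketch {

    // Map a message key to one of numPartitions partitions.
    public int partition(Object key, int numPartitions) {
        if (key == null) {
            return 0; // keyless messages fall back to partition 0
        }
        // |hashCode % n| is always in [0, n): the remainder's magnitude is < n.
        return Math.abs(key.hashCode() % numPartitions);
    }
}
```

Whatever the real implementation is, it must be deterministic per key, or messages with the same key would scatter across partitions and lose their ordering guarantee.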
# On m1, configure the Flume agent that feeds Kafka:
```shell
root@m1:/home/hadoop/flume-1.5.0-bin# vi /home/hadoop/flume-1.5.0-bin/conf/kafka.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = idoall.cloud.flume.sink.KafkaSink

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
# Start Kafka on each of m1, m2, s1, and s2 (if you need installation, configuration, and startup instructions, see "Distributed Cluster Installation and Demo (Java API) Test of Kafka 2.9.2"), then start a message consumer on s1:
```shell
root@m1:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-server-start.sh /home/hadoop/kafka_2.9.2-0.8.1.1/config/server.properties &
```
# Start Flume on m1:
```shell
root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/kafka.conf -n a1 -Dflume.root.logger=INFO,console

# Only part of the log output is shown below:
14/08/19 11:36:34 INFO sink.KafkaSink: KafkaSink initialized.
14/08/19 11:36:34 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
14/08/19 11:36:34 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.SyslogTcpSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2a9e3ba7 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
14/08/19 11:36:34 INFO node.Application: Starting Channel c1
14/08/19 11:36:34 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
14/08/19 11:36:34 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
14/08/19 11:36:34 INFO node.Application: Starting Sink k1
14/08/19 11:36:34 INFO node.Application: Starting Source r1
14/08/19 11:36:34 INFO source.SyslogTcpSource: Syslog TCP Source starting...
```
# Open another window on m1 and send a test syslog message to Flume:
```shell
root@m1:/home/hadoop# echo "hello idoall.org syslog" | nc localhost 5140
```
# The last lines in the Flume window on m1 show that Flume has sent the message to Kafka:
```shell
... (startup log lines as above) ...
14/08/19 11:38:05 WARN source.SyslogUtils: Event created from Invalid Syslog data.
14/08/19 11:38:05 INFO client.ClientUtils$: Fetching metadata from broker id:3,host:s2,port:9092 with correlation id 0 for 1 topic(s) Set(idoall_testTopic)
14/08/19 11:38:05 INFO producer.SyncProducer: Connected to s2:9092 for producing
14/08/19 11:38:05 INFO producer.SyncProducer: Disconnecting from s2:9092
14/08/19 11:38:05 INFO producer.SyncProducer: Connected to m1:9092 for producing
14/08/19 11:38:05 INFO sink.KafkaSink: Flume sent a message to Kafka: hello idoall.org syslog
```
# The Kafka consumer opened earlier on s1 also receives the message sent through Flume, which means Flume and Kafka are now working together correctly:
```shell
root@s1:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-console-consumer.sh --zookeeper m1:2181 --topic flume-kafka-storm-001 --from-beginning
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[2014-08-11 14:22:12,165] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [flume-kafka-storm-001,1] (kafka.server.ReplicaFetcherManager)
[2014-08-11 14:22:12,218] WARN [KafkaApi-3] Produce request with correlation id 2 from client on partition [flume-kafka-storm-001,1] failed due to Topic flume-kafka-storm-001 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis)
[2014-08-11 14:22:12,223] INFO Completed load of log flume-kafka-storm-001-1 with log end offset 0 (kafka.log.Log)
[2014-08-11 14:22:12,250] INFO Created log for partition [flume-kafka-storm-001,1] in /home/hadoop/kafka_2.9.2-0.8.1.1/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)
[2014-08-11 14:22:12,267] WARN Partition [flume-kafka-storm-001,1] on broker 3: No checkpointed highwatermark is found for partition [flume-kafka-storm-001,1] (kafka.cluster.Partition)
[2014-08-11 14:22:12,375] INFO Closing socket connection to /192.168.1.50. (kafka.network.Processor)
hello idoall.org syslog
```
3) Integrating Kafka with Storm
# First we write the code in Eclipse. Before writing any code, configure Maven; the pom.xml is as follows:
```xml
<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>idoall.cloud</groupId>
    <artifactId>idoall.cloud</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>idoall.cloud</name>
    <url>http://maven.apache.org</url>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <repositories>
        <repository>
            <id>github-releases</id>
            <url>http://oss.sonatype.org/content/repositories/github-releases/</url>
        </repository>
        <repository>
            <id>clojars.org</id>
            <url>http://clojars.org/repo</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.sksamuel.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.0-beta1</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.14</version>
        </dependency>
        <dependency>
            <groupId>storm</groupId>
            <artifactId>storm</artifactId>
            <version>0.9.0.1</version>
            <!-- keep storm out of the jar-with-dependencies -->
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>commons-collections</groupId>
            <artifactId>commons-collections</artifactId>
            <version>3.2.1</version>
        </dependency>
    </dependencies>
</project>
```
# Write KafkaSpouttest.java:
```java
package idoall.cloud.storm;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class KafkaSpouttest implements IRichSpout {

    private SpoutOutputCollector collector;
    private ConsumerConnector consumer;
    private String topic;

    public KafkaSpouttest() {
    }

    public KafkaSpouttest(String topic) {
        this.topic = topic;
    }

    public void nextTuple() {
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    public void ack(Object msgId) {
    }

    public void activate() {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        Map<String, Integer> topickMap = new HashMap<String, Integer>();
        topickMap.put(topic, 1);
        System.out.println("*********Results********topic:" + topic);
        Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topickMap);
        KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            String value = new String(it.next().message());
            SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS");
            Date curDate = new Date(System.currentTimeMillis()); // current time
            String str = formatter.format(curDate);
            System.out.println("Storm received a message from Kafka ------->" + value);
            collector.emit(new Values(value, 1, str), value);
        }
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        // ZooKeeper connection string
        props.put("zookeeper.connect", "m1:2181,m2:2181,s1:2181,s2:2181");
        // Consumer group id
        props.put("group.id", "1");
        // Kafka group consumption offsets are stored in ZooKeeper, but the
        // information is not updated in real time; it is committed at this interval.
        props.put("auto.commit.interval.ms", "1000");
        props.put("zookeeper.session.timeout.ms", "10000");
        return new ConsumerConfig(props);
    }

    public void close() {
    }

    public void deactivate() {
    }

    public void fail(Object msgId) {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "id", "time"));
    }

    public Map<String, Object> getComponentConfiguration() {
        System.out.println("getComponentConfiguration called");
        topic = "idoall_testTopic";
        return null;
    }
}
```
# Write KafkaTopologytest.java:
```java
package idoall.cloud.storm;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class KafkaTopologytest {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpouttest(""), 1);
        builder.setBolt("bolt1", new Bolt1(), 2).shuffleGrouping("spout");
        builder.setBolt("bolt2", new Bolt2(), 2).fieldsGrouping("bolt1", new Fields("word"));

        Map conf = new HashMap();
        conf.put(Config.TOPOLOGY_WORKERS, 1);
        conf.put(Config.TOPOLOGY_DEBUG, true);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("my-flume-kafka-storm-topology-integration", conf, builder.createTopology());
        Utils.sleep(1000 * 60 * 5); // run the local cluster test for 5 minutes
        cluster.shutdown();
    }

    public static class Bolt1 extends BaseBasicBolt {

        public void execute(Tuple input, BasicOutputCollector collector) {
            try {
                String msg = input.getString(0);
                int id = input.getInteger(1);
                String time = input.getString(2);
                msg = msg + "bolt1";
                System.out.println("Processed message, pass 1 -------[arg0]:" + msg + "---[arg1]:" + id + "---[arg2]:" + time + "------->" + msg);
                if (msg != null) {
                    collector.emit(new Values(msg));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    public static class Bolt2 extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();

        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String msg = tuple.getString(0);
            msg = msg + "bolt2";
            System.out.println("Processed message, pass 2 ---------->" + msg);
            collector.emit(new Values(msg, 1));
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }
}
```
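Stripped of the Storm machinery, the transformation the two bolts apply is just string concatenation: each stage appends its own tag to the message. The self-contained sketch below (my own class, not part of the article's project) isolates that per-stage logic, and its output matches the "...bolt1bolt2" strings seen in the topology logs:

```java
// The per-message work of Bolt1 and Bolt2 from KafkaTopologytest, isolated
// from the Storm API so it can be exercised directly.
public class BoltLogic {

    // Bolt1 appends "bolt1" to the incoming word.
    public static String applyBolt1(String msg) {
        return msg + "bolt1";
    }

    // Bolt2 appends "bolt2" to whatever Bolt1 emitted.
    public static String applyBolt2(String msg) {
        return msg + "bolt2";
    }

    public static void main(String[] args) {
        String out = applyBolt2(applyBolt1("hello welcome idoall.org"));
        System.out.println(out); // hello welcome idoall.orgbolt1bolt2
    }
}
```

Keeping bolt logic this trivial is deliberate in a smoke test: if the tagged string comes out the other end, the Kafka-to-Storm plumbing, not the processing, has been verified.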
# Test the Kafka/Storm combination
Open two windows (they can also be on two machines; in the example below I use m2 and s1). Run a Kafka producer on m2 and a Kafka consumer on s1 (if the one opened earlier is still running, reuse it), first to verify that Kafka itself works.
As shown below, I run the producer on m2 and type "hello welcome idoall.org"; the consumer on s1 receives the same message, so Kafka is running normally and messaging is fine.
Output on m2:
```shell
root@m2:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-console-producer.sh --broker-list m1:9092 --sync --topic idoall_testTopic
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
hello welcome idoall.org
```
Message received on s1:
```shell
root@s1:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-console-consumer.sh --zookeeper m1:2181 --topic idoall_testTopic --from-beginning
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
hello welcome idoall.org
```
# Now run KafkaTopologytest.java in Eclipse. The console shows the same message just sent through Kafka on m2, so Kafka and Storm are connected as well.
```shell
# There is a lot of output; only the important part is shown:
*********Results********topic:idoall_testTopic
Storm received a message from Kafka ------->hello welcome idoall.org
5268 [Thread-24-spout] INFO backtype.storm.daemon.task - Emitting: spout default [hello welcome idoall.org, 1, 2014-08-19 11:21:15 051]
Processed message, pass 1 -------[arg0]:hello welcome idoall.orgbolt1---[arg1]:1---[arg2]:2014-08-19 11:21:15 051------->hello welcome idoall.orgbolt1
5269 [Thread-18-bolt1] INFO backtype.storm.daemon.executor - Processing received message source: spout:6, stream: default, id: {-2000523200413433507=6673316475127546409}, [hello welcome idoall.org, 1, 2014-08-19 11:21:15 051]
5269 [Thread-18-bolt1] INFO backtype.storm.daemon.task - Emitting: bolt1 default [hello welcome idoall.orgbolt1]
5269 [Thread-18-bolt1] INFO backtype.storm.daemon.task - Emitting: bolt1 __ack_ack [-2000523200413433507 4983764025617316501]
5269 [Thread-20-bolt2] INFO backtype.storm.daemon.executor - Processing received message source: bolt1:3, stream: default, id: {-2000523200413433507=1852530874180384956}, [hello welcome idoall.orgbolt1]
Processed message, pass 2 ---------->hello welcome idoall.orgbolt1bolt2
5270 [Thread-20-bolt2] INFO backtype.storm.daemon.task - Emitting: bolt2 default [hello welcome idoall.orgbolt1bolt2, 1]
```
4) Integrating Flume, Kafka, and Storm
The two examples above show that Flume and Kafka already communicate and are deployed, and that Kafka and Storm communicate normally; all that remains is to package the Storm-side code into a jar and deploy it to Storm, which connects all three.
If you are not familiar with installing, configuring, and deploying Storm, see the article "Building a Distributed Cluster with Ubuntu 12.04 + Storm 0.9.2".
# Copy the Kafka-related jars into Storm's lib directory. (As explained above, integrating Kafka with Storm mainly means rewriting Storm's spout to call a Kafka consumer that receives and prints messages, so these jars are required.)
```shell
root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/kafka_2.9.2-0.8.1.1.jar /home/hadoop/storm-0.9.2-incubating/lib
root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/scala-library-2.9.2.jar /home/hadoop/storm-0.9.2-incubating/lib
root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/metrics-core-2.2.0.jar /home/hadoop/storm-0.9.2-incubating/lib
root@m1:/home/hadoop# cp /home/hadoop/zookeeper-3.4.5/dist-maven/zookeeper-3.4.5.jar /home/hadoop/storm-0.9.2-incubating/lib
root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/zkclient-0.3.jar /home/hadoop/storm-0.9.2-incubating/lib
```
# Start the Storm nimbus on m1:
```shell
root@m1:/home/hadoop# /home/hadoop/storm-0.9.2-incubating/bin/storm nimbus &
```
# Start the Storm supervisor on s1 and s2:
```shell
root@s1:/home/hadoop# /home/hadoop/storm-0.9.2-incubating/bin/storm supervisor &
```
# Start the Storm UI on m1:
```shell
root@m1:/home/hadoop# /home/hadoop/storm-0.9.2-incubating/bin/storm ui &
```
# Package the Eclipse project into a jar, copy it to any directory, and run it with Storm:
```shell
root@m1:/home/hadoop/storm-0.9.2-incubating# ll
total 25768
drwxr-xr-x 11 root   root       4096 Aug 19 11:53 ./
drwxr-xr-x 46 hadoop hadoop     4096 Aug 17 15:06 ../
drwxr-xr-x  2 root   root       4096 Aug  1 14:38 bin/
-rw-r--r--  1 502    staff     34239 Jun 13 08:46 CHANGELOG.md
drwxr-xr-x  2 root   root       4096 Aug  2 12:31 conf/
-rw-r--r--  1 502    staff       538 Mar 13 11:17 DISCLAIMER
drwxr-xr-x  3 502    staff      4096 May  6 03:13 examples/
drwxr-xr-x  3 root   root       4096 Aug  1 14:38 external/
-rw-r--r--  1 root   root   26252342 Aug 19 11:36 idoall.cloud.jar
drwxr-xr-x  3 root   root       4096 Aug  2 12:51 ldir/
drwxr-xr-x  2 root   root       4096 Aug 19 11:53 lib/
-rw-r--r--  1 502    staff     22822 Jun 12 04:07 LICENSE
drwxr-xr-x  2 root   root       4096 Aug  1 14:38 logback/
drwxr-xr-x  2 root   root       4096 Aug  1 15:07 logs/
-rw-r--r--  1 502    staff       981 Jun 11 01:10 NOTICE
drwxr-xr-x  5 root   root       4096 Aug  1 14:38 public/
-rw-r--r--  1 502    staff      7445 Jun 10 02:24 README.markdown
-rw-r--r--  1 502    staff        17 Jun 17 00:22 RELEASE
-rw-r--r--  1 502    staff      3581 May 30 00:20 SECURITY.md
root@m1:/home/hadoop/storm-0.9.2-incubating# /home/hadoop/storm-0.9.2-incubating/bin/storm jar idoall.cloud.jar idoall.cloud.storm.KafkaTopologytest
```
# Send a message through Flume and check whether Storm receives it
Message sent via Flume:
```shell
root@m1:/home/hadoop# echo "flume->kafka->storm message" | nc localhost 5140
root@m1:/home/hadoop#
```
Output shown in Storm:
```shell
# There is too much output; only the important part is shown
Storm received a message from Kafka ------->flume->kafka->storm message
174218 [Thread-16-spout] INFO backtype.storm.daemon.task - Emitting: spout default [flume->kafka->storm message, 1, 2014-08-19 12:06:39 360]
174220 [Thread-10-bolt1] INFO backtype.storm.daemon.executor - Processing received message source: spout:6, stream: default, id: {-2345821945306343027=-7738131487327750388}, [flume->kafka->storm message, 1, 2014-08-19 12:06:39 360]
Processed message, pass 1 -------[arg0]:flume->kafka->storm messagebolt1---[arg1]:1---[arg2]:2014-08-19 12:06:39 360------->flume->kafka->storm messagebolt1
174221 [Thread-10-bolt1] INFO backtype.storm.daemon.task - Emitting: bolt1 default [flume->kafka->storm messagebolt1]
174221 [Thread-10-bolt1] INFO backtype.storm.daemon.task - Emitting: bolt1 __ack_ack [-2345821945306343027 -2191137958679040397]
174222 [Thread-20-__acker] INFO backtype.storm.daemon.executor - Processing received message source: bolt1:3, stream: __ack_ack, id: {}, [-2345821945306343027 -2191137958679040397]
174222 [Thread-12-bolt2] INFO backtype.storm.daemon.executor - Processing received message source: bolt1:3, stream: default, id: {-2345821945306343027=8433871885621516671}, [flume->kafka->storm messagebolt1]
Processed message, pass 2 ---------->flume->kafka->storm messagebolt1bolt2
174223 [Thread-12-bolt2] INFO backtype.storm.daemon.task - Emitting: bolt2 default [flume->kafka->storm messagebolt1bolt2, 1]
174223 [Thread-12-bolt2] INFO backtype.storm.daemon.task - Emitting: bolt2 __ack_ack [-2345821945306343027 8433871885621516671]
174224 [Thread-20-__acker] INFO backtype.storm.daemon.executor - Processing received message source: bolt2:4, stream: __ack_ack, id: {}, [-2345821945306343027 8433871885621516671]
174228 [Thread-16-spout] INFO backtype.storm.daemon.task - Emitting: spout __ack_init [-2345821945306343027 -7738131487327750388 6]
174228 [Thread-20-__acker] INFO backtype.storm.daemon.executor - Processing received message source: spout:6, stream: __ack_init, id: {}, [-2345821945306343027 -7738131487327750388 6]
174228 [Thread-20-__acker] INFO backtype.storm.daemon.task - Emitting direct: 6; __acker __ack_ack [-2345821945306343027]
```
With the examples above we have connected Flume, Kafka, and Storm end to end. Combined with the previously introduced articles "Installing, Deploying, and Simple Applications of Flume 1.5.0 (including distributed setups and cases with hadoop2.2.0 and hbase0.96)" and "Cross-Language Calls among Golang, Php, Python, and Java Based on Thrift 0.9.1", this should solve most of the problems your projects will meet in real-time big-data computation and cross-language invocation. I hope this recent series of articles has been helpful.
---------------------------------------
Author: 迦壹
Original post: Flume+Kafka+Storm基于分布式环境的结合使用
Reprint notice: reprinting is allowed, but the original source, author information, and this copyright notice must be indicated with a hyperlink. Thank you!
---------------------------------------