
kafka+storm+hbase

WordCount implemented with Kafka + Storm + HBase. The result is stored in HBase as follows:

(1) Table name: wc

(2) Column family: result

(3) RowKey: word

(4) Field (column qualifier): count
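
The topology assumes this table already exists. A minimal one-off creation sketch (the class name CreateWcTable and the ZooKeeper quorum setting are assumptions; the hostnames come from the topology code later in this post) could look like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateWcTable {
    public static void main(String[] args) throws Exception {
        // ZooKeeper quorum taken from the topology configuration below (assumed)
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "zeb,yjd,ylh");

        HBaseAdmin admin = new HBaseAdmin(conf);
        // Table 'wc' with a single column family 'result'
        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("wc"));
        desc.addFamily(new HColumnDescriptor("result"));
        if (!admin.tableExists("wc")) {
            admin.createTable(desc);
        }
        admin.close();
    }
}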

 

1. Solution:

(1) Step 1: prepare the jar packages related to Kafka, Storm, and HBase. The Maven dependencies are as follows:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com</groupId>
  <artifactId>kafkaSpout</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  
	<dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>0.9.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>0.99.2</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
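        <!-- Assumed addition: the topology below uses HBaseBolt and SimpleHBaseMapper
             from the storm-hbase external module, which the original pom did not declare.
             The version is assumed to match storm-core. -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hbase</artifactId>
            <version>0.9.3</version>
        </dependency>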
        
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>2.5.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.5.0</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.7</version>
            <scope>system</scope>
            <systemPath>C:\Program Files\Java\jdk1.7.0_51\lib\tools.jar</systemPath>
        </dependency>
        
    </dependencies>


    <repositories>
        <repository>
            <id>central</id>
            <url>http://repo1.maven.org/maven2/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
        <repository>
            <id>clojars</id>
            <url>https://clojars.org/repo/</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
        <repository>
            <id>scala-tools</id>
            <url>http://scala-tools.org/repo-releases</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
        <repository>
            <id>conjars</id>
            <url>http://conjars.org/repo/</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                    <encoding>UTF-8</encoding>
                    <showDeprecation>true</showDeprecation>
                    <showWarnings>true</showWarnings>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

 

 

(2) The data arriving from Kafka is split by the LevelSplit bolt and then emitted to the next bolt. The code is as follows:

 

package com.kafka.spout;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;


public class LevelSplit extends BaseBasicBolt {

    // Split each incoming line (e.g. "the cow jumped over the moon") on spaces
    // and emit one tuple per word.
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String words = tuple.getString(0);
        String[] va = words.split(" ");
        for (String word : va) {
            collector.emit(new Values(word));
        }
    }

  
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

}

  

(3) The data emitted by the LevelSplit bolt is counted in the LevelCount bolt and then sent on to the HBase bolt. The code is as follows:

 

package com.kafka.spout;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;


public class LevelCount extends BaseBasicBolt {

    // Running word counts, kept in memory per bolt task.
    Map<String, Integer> counts = new HashMap<String, Integer>();

    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // Increment the count for the incoming word.
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null)
            count = 0;
        count++;
        counts.put(word, count);

        // Print the current counts for debugging.
        for (Entry<String, Integer> e : counts.entrySet()) {
            System.out.println(e.getKey() + "----------->" + e.getValue());
        }

        // Emit the updated (word, count) pair to the downstream HBase bolt.
        collector.emit(new Values(word, count));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

  

(4) Prepare the Kafka and HBase connection settings, wire up the whole topology, and submit it. The code is as follows:

 

package com.kafka.spout;


import java.util.Map;

import com.google.common.collect.Maps;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import org.apache.storm.hbase.bolt.HBaseBolt;
import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;


public class StormKafkaTopo {
    public static void main(String[] args) {
    	
    	
        // Kafka spout: read topic "yjd", tracking offsets under /storm in ZooKeeper
        BrokerHosts brokerHosts = new ZkHosts("zeb,yjd,ylh");
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "yjd", "/storm", "kafkaspout");
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());

        Config conf = new Config();

        // Map the ("word", "count") tuple onto the HBase table: RowKey = word,
        // column family "result", column qualifier "count"
        SimpleHBaseMapper mapper = new SimpleHBaseMapper();
        mapper.withColumnFamily("result");
        mapper.withColumnFields(new Fields("count"));
        mapper.withRowKeyField("word");

        // HBase connection settings, handed to the bolt through the "hbase.conf" key
        Map<String, Object> map = Maps.newTreeMap();
        map.put("hbase.rootdir", "hdfs://zeb:9000/hbase");
        map.put("hbase.zookeeper.quorum", "zeb:2181,yjd:2181,ylh:2181");

        HBaseBolt hBaseBolt = new HBaseBolt("wc", mapper).withConfigKey("hbase.conf");

        conf.setDebug(true);
        conf.put("hbase.conf", map);

        // Wire the topology: KafkaSpout -> LevelSplit -> LevelCount -> HBaseBolt
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpout(spoutConfig));
        builder.setBolt("split", new LevelSplit(), 1).shuffleGrouping("spout");
        builder.setBolt("count", new LevelCount(), 1).fieldsGrouping("split", new Fields("word"));
        builder.setBolt("hbase", hBaseBolt, 1).shuffleGrouping("count");
        
        if(args != null && args.length > 0) {
            // submit and run on the cluster
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            // run in local mode
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Topotest1121", conf, builder.createTopology());
            Utils.sleep(1000000);
            cluster.killTopology("Topotest1121");
            cluster.shutdown();
        }           
    }
}
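
The topology above references a MessageScheme that this post does not show. A minimal sketch, assuming each Kafka message is a UTF-8 text line emitted as a single-field tuple (the field name "msg" is an assumption; LevelSplit only reads the tuple by position):

package com.kafka.spout;

import java.io.UnsupportedEncodingException;
import java.util.List;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class MessageScheme implements Scheme {

    // Decode each raw Kafka message as a UTF-8 string and emit it as one tuple.
    public List<Object> deserialize(byte[] ser) {
        try {
            return new Values(new String(ser, "UTF-8"));
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

    // Field name is an assumption; downstream bolts read the value by position.
    public Fields getOutputFields() {
        return new Fields("msg");
    }
}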

  

(5) Produce data from the console on the Kafka side, as follows:

 

[Screenshot: Kafka console producer sending messages]
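
The same test data can also be produced programmatically with the old Kafka 0.8 producer API. A small sketch, assuming the brokers listen on the default port 9092 on the hosts used above, with the topic "yjd" matching the SpoutConfig:

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class WordProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker list is an assumption: hosts from this post, default Kafka port 9092
        props.put("metadata.broker.list", "zeb:9092,yjd:9092,ylh:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        // Send one test sentence to the topic the spout reads from
        producer.send(new KeyedMessage<String, String>("yjd", "the cow jumped over the moon"));
        producer.close();
    }
}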

 

2. Screenshot of the run results:

[Screenshot: word counts stored in the HBase table wc]
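
The counts can also be verified by reading them back from HBase directly. A minimal sketch, assuming the storm-hbase mapper stored the Integer count as a 4-byte value (if it was written as text, decode with Bytes.toString instead):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadWordCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "zeb,yjd,ylh");

        HTable table = new HTable(conf, "wc");
        // RowKey = word, column family "result", qualifier "count"
        Get get = new Get(Bytes.toBytes("moon"));
        Result result = table.get(get);
        byte[] value = result.getValue(Bytes.toBytes("result"), Bytes.toBytes("count"));
        System.out.println("moon -> " + Bytes.toInt(value));
        table.close();
    }
}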

 

3. Problems encountered:

(1) After all the preparation was done, I submitted the topology and ran the code. Error 1 occurred, as follows:

[Screenshot: error 1 stack trace]

 

Solution: the cause was mismatched dependency versions; after making the versions consistent, the error went away.

(2) Error 2 occurred, as follows:

[Screenshot: error 2 stack trace]

 

Solution: I had forgotten to start HBase's HMaster and HRegionServer. After starting them, the problem was resolved.
