首页 > 代码库 > flume + kafka + sparkStreaming + HDFS 构建实时日志分析系统

flume + kafka + sparkStreaming + HDFS 构建实时日志分析系统

一、Hadoop配置安装

    

    

注意:apache提供的hadoop-2.6.0的安装包是在32位操作系统编译的,因为hadoop依赖一些C++的本地库,

所以如果在64位的操作上安装hadoop-2.4.1就需要重新在64操作系统上重新编译


1.修改Linux主机名

2.修改IP

3.修改主机名和IP的映射关系

######注意######如果你们公司是租用的服务器或是使用的云主机(如华为用主机、阿里云主机等)

/etc/hosts里面要配置的是内网IP地址和主机名的映射关系

4.关闭防火墙

5.ssh免登陆

6.安装JDK,配置环境变量等



集群规划:

主机名IP安装的软件              运行的进程

hadoop01192.168.1.201jdk、hadoopNameNode、DFSZKFailoverController


(zkfc)

hadoop02192.168.1.202jdk、hadoopNameNode、DFSZKFailoverController


(zkfc)

hadoop03192.168.1.203jdk、hadoopResourceManager

hadoop04192.168.1.204jdk、hadoopResourceManager

hadoop05192.168.1.205jdk、hadoop、zookeeperDataNode、NodeManager、JournalNode、


QuorumPeerMain

hadoop06192.168.1.206jdk、hadoop、zookeeperDataNode、NodeManager、JournalNode、


QuorumPeerMain

hadoop07192.168.1.207jdk、hadoop、zookeeperDataNode、NodeManager、JournalNode、


QuorumPeerMain

说明:

1.在hadoop2.0中通常由两个NameNode组成,一个处于active状态,另一个处于standby状态。Active NameNode对外提供服务,而Standby 


NameNode则不对外提供服务,仅同步active namenode的状态,以便能够在它失败时快速进行切换。

hadoop2.0官方提供了两种HDFS HA的解决方案,一种是NFS,另一种是QJM。

这里我们使用简单的QJM。在该方案中,主备NameNode之间通过一组JournalNode同步元数据信息,一条数据只要成功写入多数JournalNode即认为写入成


功。通常配置奇数个JournalNode

这里还配置了一个zookeeper集群,用于ZKFC(DFSZKFailoverController)故障转移,当Active NameNode挂掉了,会自动切换Standby 


NameNode为standby状态

2.hadoop-2.2.0中依然存在一个问题,就是ResourceManager只有一个,存在单点故障,hadoop-2.4.1解决了这个问题,有两个


ResourceManager,一个是Active,一个是Standby,状态由zookeeper进行协调

安装步骤:

1.安装配置zooekeeper集群(在hadoop05上)

1.1解压

tar -zxvf zookeeper-3.4.5.tar.gz -C /hadoop/

1.2修改配置

cd /hadoop/zookeeper-3.4.5/conf/

cp zoo_sample.cfg zoo.cfg

vim zoo.cfg

修改:dataDir=/hadoop/zookeeper-3.4.5/tmp

在最后添加:

server.1=hadoop05:2888:3888

server.2=hadoop06:2888:3888

server.3=hadoop07:2888:3888

保存退出

然后创建一个tmp文件夹

mkdir /hadoop/zookeeper-3.4.5/tmp

再创建一个空文件

touch /hadoop/zookeeper-3.4.5/tmp/myid

最后向该文件写入ID

echo 1 > /hadoop/zookeeper-3.4.5/tmp/myid

1.3将配置好的zookeeper拷贝到其他节点(首先分别在hadoop06、hadoop07根目录下创建一个hadoop目录:mkdir /hadoop)

scp -r /hadoop/zookeeper-3.4.5/ hadoop06:/hadoop/

scp -r /hadoop/zookeeper-3.4.5/ hadoop07:/hadoop/

注意:修改hadoop06、hadoop07对应/hadoop/zookeeper-3.4.5/tmp/myid内容

hadoop06:

echo 2 > /hadoop/zookeeper-3.4.5/tmp/myid

hadoop07:

echo 3 > /hadoop/zookeeper-3.4.5/tmp/myid

2.安装配置hadoop集群(在hadoop01上操作)

2.1解压

tar -zxvf hadoop-2.4.1.tar.gz -C /hadoop/

2.2配置HDFS(hadoop2.0所有的配置文件都在$HADOOP_HOME/etc/hadoop目录下)

#将hadoop添加到环境变量中

vim /etc/profile

export JAVA_HOME=/usr/java/jdk1.7.0_55

export HADOOP_HOME=/hadoop/hadoop-2.4.1

export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin

#hadoop2.0的配置文件全部在$HADOOP_HOME/etc/hadoop下

cd /hadoop/hadoop-2.6.0/etc/hadoop

2.2.1修改hadoop-env.sh

export JAVA_HOME=/usr/java/jdk1.7.0_55

2.2.2修改core-site.xml

<configuration>
<!-- 指定hdfs的nameservice为wxm -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://wxm</value>
</property>
<!-- 指定hadoop临时目录 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/root/wxm/software/hadoop/hadoop-2.6.5/tmp</value>
</property>
<!-- 指定zookeeper地址 -->
<property>
<name>ha.zookeeper.quorum</name>
<value>hadoop01:2181,hadoop02:2181,hadoop03:2181</value>
</property>
</configuration>

2.2.3修改hdfs-site.xml

<configuration>
<!--指定hdfs的nameservice为wxm,需要和core-site.xml中的保持一致 -->
<property>
<name>dfs.nameservices</name>
<value>wxm</value>
</property>
<!-- ns1下面有两个NameNode,分别是nn1,nn2 -->
<property>
<name>dfs.ha.namenodes.wxm</name>
<value>hadoop05,hadoop06</value>
</property>
<!-- nn1的RPC通信地址 -->
<property>
<name>dfs.namenode.rpc-address.wxm.hadoop05</name>
<value>hadoop05:9000</value>
</property>
<!-- nn1的http通信地址 -->
<property>
<name>dfs.namenode.http-address.wxm.hadoop051</name>
<value>hadoop05:50070</value>
</property>
<!-- nn2的RPC通信地址 -->
<property>
<name>dfs.namenode.rpc-address.wxm.hadoop06</name>
<value>hadoop06:9000</value>
</property>
<!-- nn2的http通信地址 -->
<property>
<name>dfs.namenode.http-address.wxm.hadoop06</name>
<value>hadoop06:50070</value>
</property>
<!-- 指定NameNode的元数据在JournalNode上的存放位置 -->
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://hadoop05:8485;hadoop06:8485/wxm</value>
</property>
<!-- 指定JournalNode在本地磁盘存放数据的位置 -->
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/root/wxm/software/hadoop/hadoop-2.6.5/journal</value>
</property>
<!-- 开启NameNode失败自动切换 -->
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>
<!-- 配置失败自动切换实现方式 -->
<property>
<name>dfs.client.failover.proxy.provider.wxm</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<!-- 配置隔离机制方法,多个机制用换行分割,即每个机制暂用一行-->
<property>
<name>dfs.ha.fencing.methods</name>
<value>
sshfence
shell(/bin/true)
</value>
</property>
<!-- 使用sshfence隔离机制时需要ssh免登陆 -->
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/root/.ssh/id_rsa</value>
</property>
<!-- 配置sshfence隔离机制超时时间 -->
<property>
<name>dfs.ha.fencing.ssh.connect-timeout</name>
<value>300000</value>
</property>
</configuration>


2.2.4修改mapred-site.xml

<configuration>
<!-- 指定mr框架为yarn方式 -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>


2.2.5修改yarn-site.xml

<configuration>
<!-- 开启RM高可靠 -->
<property>
   <name>yarn.resourcemanager.ha.enabled</name>
   <value>true</value>
</property>
<!-- 指定RM的cluster id -->
<property>
   <name>yarn.resourcemanager.cluster-id</name>
   <value>yrc</value>
</property>
<!-- 指定RM的名字 -->
<property>
   <name>yarn.resourcemanager.ha.rm-ids</name>
   <value>resourcemanagerAtWorker1,resourcemanagerAtWorker1</value>
</property>
<!-- 分别指定RM的地址 -->
<property>
   <name>yarn.resourcemanager.hostname.resourcemanagerAtWorker1</name>
   <value>worker1</value>
</property>
<property>
   <name>yarn.resourcemanager.hostname.resourcemanagerAtWorker2</name>
   <value>worker2</value>
</property>
<!-- 指定zk集群地址 -->
<property>
   <name>yarn.resourcemanager.zk-address</name>
   <value>hadoop01:2181,hadoop02:2181,hadoop03:2181</value>
</property>
<property>
   <name>yarn.nodemanager.aux-services</name>
   <value>mapreduce_shuffle</value>
</property>
</configuration>


2.2.6修改slaves(slaves是指定子节点的位置,因为要在hadoop01上启动HDFS、在hadoop03启动yarn,所以hadoop01上的


slaves文件指定的是datanode的位置,hadoop03上的slaves文件指定的是nodemanager的位置)

hadoop05

hadoop06

hadoop07


2.2.7配置免密码登陆

#首先要配置hadoop01到hadoop02、hadoop03、hadoop04、hadoop05、hadoop06、hadoop07的免密码登


#在hadoop01上生产一对钥匙

ssh-keygen -t rsa

#将公钥拷贝到其他节点,包括自己

ssh-coyp-id hadoop01

ssh-coyp-id hadoop02

ssh-coyp-id hadoop03

ssh-coyp-id hadoop04

ssh-coyp-id hadoop05

ssh-coyp-id hadoop06

ssh-coyp-id hadoop07

#配置hadoop03到hadoop04、hadoop05、hadoop06、hadoop07的免密码登陆

#在hadoop03上生产一对钥匙

ssh-keygen -t rsa

#将公钥拷贝到其他节点

ssh-coyp-id hadoop04

ssh-coyp-id hadoop05

ssh-coyp-id hadoop06

ssh-coyp-id hadoop07

#注意:两个namenode之间要配置ssh免密码登陆,别忘了配置hadoop02到hadoop01的免登陆

在hadoop02上生产一对钥匙

ssh-keygen -t rsa

ssh-coyp-id -i hadoop01

2.4将配置好的hadoop拷贝到其他节点

scp -r /hadoop/ hadoop02:/

scp -r /hadoop/ hadoop03:/

scp -r /hadoop/hadoop-2.4.1/ hadoop@hadoop04:/hadoop/

scp -r /hadoop/hadoop-2.4.1/ hadoop@hadoop05:/hadoop/

scp -r /hadoop/hadoop-2.4.1/ hadoop@hadoop06:/hadoop/

scp -r /hadoop/hadoop-2.4.1/ hadoop@hadoop07:/hadoop/

###注意:严格按照下面的步骤

2.5启动zookeeper集群(分别在hadoop05、hadoop06、hadoop07上启动zk)

cd /hadoop/zookeeper-3.4.5/bin/

./zkServer.sh start

#查看状态:一个leader,两个follower

./zkServer.sh status

2.6启动journalnode(分别在在hadoop05、hadoop06、tcast07上执行)

cd /hadoop/hadoop-2.4.1

sbin/hadoop-daemon.sh start journalnode

#运行jps命令检验,hadoop05、hadoop06、hadoop07上多了JournalNode进程

2.7格式化HDFS

#在hadoop01上执行命令:

hdfs namenode -format

#格式化后会在根据core-site.xml中的hadoop.tmp.dir配置生成个文件,这里我配置的是/hadoop/hadoop-2.4.1/tmp,然后


将/hadoop/hadoop-2.4.1/tmp拷贝到hadoop02的/hadoop/hadoop-2.4.1/下。

scp -r tmp/ hadoop02:/hadoop/hadoop-2.4.1/

2.8格式化ZK(在hadoop01上执行即可)

hdfs zkfc -formatZK

2.9启动HDFS(在hadoop01上执行)

sbin/start-dfs.sh


                2.11启动zkfc

hadoop-daemon.sh start zkfc


2.10启动YARN(#####注意#####:是在hadoop03上执行start-yarn.sh,把namenode和resourcemanager分开是因为性能问题,因为


他们都要占用大量资源,所以把他们分开了,

                      他们分开了就要分别在不同的机器上启动)

sbin/start-yarn.sh


到此,hadoop-2.4.1配置完毕,可以统计浏览器访问:

http://192.168.1.201:50070

NameNode ‘hadoop01:9000‘ (active)

http://192.168.1.202:50070

NameNode ‘hadoop02:9000‘ (standby)

验证HDFS HA

首先向hdfs上传一个文件

hadoop fs -put /etc/profile /profile

hadoop fs -ls /

然后再kill掉active的NameNode

kill -9 <pid of NN>

通过浏览器访问:http://192.168.1.202:50070

NameNode ‘hadoop02:9000‘ (active)

这个时候hadoop02上的NameNode变成了active

在执行命令:

hadoop fs -ls /

-rw-r--r--   3 root supergroup       1926 2014-02-06 15:36 /profile

刚才上传的文件依然存在!!!

手动启动那个挂掉的NameNode

sbin/hadoop-daemon.sh start namenode

通过浏览器访问:http://192.168.1.201:50070

NameNode ‘hadoop01:9000‘ (standby)

验证YARN:

运行一下hadoop提供的demo中的WordCount程序:

hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.4.1.jar wordcount /profile /out

OK,大功告成!!!


二、spark安装

    standalone模式搭建比较简单  参考官网即可

三、kafka集群搭建

    1.  kafka2.11下载并解压


    2.  修改配置文件


    · config/server.properties
        broker.id=4(集群里的id不能重复,我是取每台机器IP最后一位)
        listeners=PLAINTEXT://192.168.248.134:9092(格式不变,绑定本机IP)
        log.dirs=/home/hadoop/kafka/logs4kafka(日志路径)
        zookeeper.connect=h2:2181,h3:2181,h4:2181,h8:2181,h9:2181,h10:2181
最后一项时延可以增大一些,曾经因为超过时延而报错,修改成10倍就OK了
     ·producer.properties
        bootstrap.servers=h2:9092,h3:9092,h4:9092,h8:9092,h9:9092,h10:9092
     ·consumer.properties
        zookeeper.connect=h2:2181,h3:2181,h4:2181,h8:2181,h9:2181,h10:2181
        group不配的话就是默认,看需求
    3.拷贝到其他节点,注意修改listeners绑定的IP和broker.id

    

    4 基本命令

    #start
    bin/kafka-server-start.sh config/server.properties


    #create a topic
    bin/kafka-topics.sh --create --zookeeper h2:2181,h3:2181,h4:2181,h8:2181,h9:2181,h10:2181 --replication-factor 5 --partition 5 --topic T20161021

    #list all topics
    bin/kafka-topics.sh --zookeeper h2:2181,h3:2181,h4:2181,h8:2181,h9:2181,h10:2181 --list

    #describe the detail of this topic
    bin/kafka-topics.sh --describe --zookeeper h2:2181,h3:2181,h4:2181,h8:2181,h9:2181,h10:2181  --topic test101

    #write some Messages to the topic

    bin/kafka-console-producer.sh --broker-list h2:9092 --topic T20161021

    #recerive the message producer writed
    bin/kafka-console-consumer.sh --zookeeper h2:2181,h3:2181,h4:2181,h8:2181,h9:2181,h10:2181 --topic T20161021 --from-beginning


四、flume-ng 1.6安装配置


    1.下载安装并配置好flume的运行环境

    2.编写配置文件


    # ------------------- 定义数据流----------------------
    agent.sources = kafkaSource
    agent.channels = memoryChannel
    agent.sinks = hdfsSink
    agent.sources.kafkaSource.channels = memoryChannel
    agent.sinks.hdfsSink.channel = memoryChannel

    #-------- kafkaSource相关配置-----------------
    agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
    agent.sources.kafkaSource.zookeeperConnect = h2:2181,h3:2181,h4:2181
    agent.sources.kafkaSource.topic = T20161031
    #agent.sources.kafkaSource.groupId = flume
    agent.sources.kafkaSource.kafka.consumer.timeout.ms = 1000

    #------- memoryChannel相关配置-------------------------
    agent.channels.memoryChannel.type = memory
    agent.channels.memoryChannel.capacity=10000
    agent.channels.memoryChannel.transactionCapacity=1000

    #---------hdfsSink 相关配置------------------
    agent.sinks.hdfsSink.type = hdfs
    agent.sinks.hdfsSink.hdfs.path = hdfs://h4:9000/user/test/kafka2HdfsByFlume
    agent.sinks.hdfsSink.hdfs.writeFormat = Text
    agent.sinks.hdfsSink.hdfs.fileType = DataStream

    3.启动Flume
    bin/flume-ng agent --conf conf --conf-file conf/kafka2HdfsByFlume.conf --name agent -Dflume.root.logger=INFO,console


五、sparkStreaming1.6.2

 

   
import org.apache.spark.SparkConf;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.api.java.*;import scala.Tuple2;import java.util.Arrays;public class StreamingOnHDFS {public static void main(String[] args){final SparkConf conf = newSparkConf().setMaster("spark://h4:7077").setAppName("SparkStreamingOnHDFS") ;Durations.seconds(5);//Durations.seconds(5) 设置每隔 5 秒final String checkpointDirectory = "hdfs://h4:9000/library/SparkStreaming/Checkpoint_Data";JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {@Overridepublic JavaStreamingContext create() {return createContext(checkpointDirectory, conf) ;}} ;JavaStreamingContext jsc = JavaStreamingContext. getOrCreate(checkpointDirectory, factory) ;JavaDStream lines = jsc.textFileStream("hdfs://h4:9000/user/test/kafka2HdfsByFlume") ;
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {public Iterable<String> call(String line) throws Exception {return Arrays. asList(line.split(" ")) ;}}) ;JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String,Integer>() {public Tuple2<String, Integer> call(String word) throws Exception {return new Tuple2<String, Integer>(word, 1) ;}}) ;JavaPairDStream<String, Integer> wordscount = pairs.reduceByKey(new Function2<Integer, Integer,Integer>() {public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2;}}) ;wordscount.print();jsc.start() ;jsc.awaitTermination() ;jsc.close() ;}
private static JavaStreamingContext createContext(String checkpointDirectory, SparkConf conf){System. out.println("==========Creating new context==============") ;JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations. seconds(5)) ;ssc.checkpoint(checkpointDirectory) ;return ssc;}}

六、测试

    1.将程序打成runnable jar并上传

    2.启动zookeeper、hadoop、spark、kafka、flume

    3.进入spark安装目录下执行

    bin/spark-submit --class com.unisk.spark.sparkStreaming.StreamingOnHDFS --master spark://h4:7077 /home/hadoop/spark/Apps4Spark/StreamingOnHDFS.jar


七、扩展

   数据导入HDFS之后即可进行基于MR/spark的批处理或准实时的流式处理,可作海量数据的清洗工作或预处理,结果可存入Hbase、redis或RDBMS以做进一步的展示或挖掘

本文出自 “10891776” 博客,请务必保留此出处http://10901776.blog.51cto.com/10891776/1874924

flume + kafka + sparkStreaming + HDFS 构建实时日志分析系统