首页 > 代码库 > KafKa

KafKa

一、Kafaka 介绍

Apache Kafka 是分布式发布-订阅消息系统。 它最初由 LinkedIn 公司开发, 之后成为 Apache 项目的一部分。 Kafka 是一种快速、 可扩展的、 设计内在就是 分布式的, 分区的和可复制的提交日志服务 Kafka 是一个消息系统, 原本开发自 LinkedIn, 用作 LinkedIn 的活动流
( activity stream) 和运营数据处理管道( pipeline) 的基础。 后贡献给 apache 基金会, 成为 apache 的一个顶级项目。 
技术分享

技术分享

 

技术分享

补充:

1、Kafka是一个分布式发布-订阅消息系统。分布式,意味着Kafka
可以在集群中运行。
2、消息的发布者,称为生产者(Producers),负责发布消息。发布消息之前,
需要发布确定一个主题(topic)。
3、broker,消息的缓存代理,相当于消息的服务器。生产者发布
的消息缓存在broker中。
4、消息的订阅者,称为消费者(Consumers),负责订阅消费消息。它向broker
中获取订阅的消息进行消费。
5、Kafka集群的运行需要Zookeeper来协调服务,所以在运行Kafka
集群之前,必须保证Zookeeper集群能正常运行。
6、在企业的大数据开发中,
经常用Flume+Kafka+Spark Streaming(Storm)或者
Kafka+Spark Streaming+Redis的方式进行开发。

NoSQL:not only SQL(不仅仅是SQL),
常用NoSQL数据库:HBase、ES(Elastic Search)、Redis、MongDB

 

二、安装Kafka步骤

        1、安转zookeeper

        2、安装kafaka

        3、下载解压安装包kafka_2.10-0.9.0.1.tgz

        4、vi config/server.properties

        5、broker.id=0
            host.name=liuiwei3
           zookeeper.connect=liuwei3:2181,liuwei1:2181,liuwei4:2181

        6、将kafka复制到其他节点 scp -r  kafka_2.10-0.9.0.1 hadoop@liuwei2:$PWD                          scp -r  kafka_2.10-0.9.0.1 hadoop@liuwei4:$PWD 

        7、在其他两个salve节点分别修改 

              broker.id=1
            host.name=liuiwei2
           zookeeper.connect=liuwei3:2181,liuwei1:2181,liuwei4:2181

         8、启动kafka

             在三台机器上分别启动              ./bin/kafka-server-start.sh -daemon config/server.properties

         9、检验kafka是否安装成功

             在master节点(liuwei3)操作:创建一个名为 test 的主题

                bin/kafka-topics.sh --create --zookeeper liuwei3:2181,liuwei2:2181,liuwei4:2181 --replication-factor 1 --partitions 1 --topic test

               在一个终端上启动一个生产者  ./bin/kafka-console-producer.sh --broker-list liuwei3:9092,liuwei2:9092,liuwei4:9092 --topic test

              然后再键盘输入信息:hello   

              在另一个终端中启动消费者(liuwei2,liuwei4),进入交互客户端,命令如下: ./bin/kafka-console-consumer.sh --zookeeper liuwei2:2181 --topic test --from-beginning

             会在屏幕上显示相同的信息 hello

三、Kafka+Spark Streaming

object KafkaDemo {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("KafkaDemo")
              .setMaster("local[2]")
    val ssc=new StreamingContext(conf,Seconds(5))
    //创建主题集合
    val topicSet=Map(("tgtest2"->1))

    /**
      * 第一个参数:StreamingContext对象
      * 第二个参数:Zookeeper集群,注意端口:2181
      * 第三个参数:消费者Consumer所在小组
      * 第四个参数:运行的主题
      */
    //从Kafka中获取数据
    val lines=KafkaUtils.createStream(ssc,
    "tgmaster:2181,tgslave:2181","DefaultConsumerGroup",topicSet)
    val result=lines.flatMap(x=>{
      x._2.split(" ")
    }).map(word=>(word,1))
      .reduceByKey(_+_)

    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

  

                        

KafKa