首页 > 代码库 > Hadoop日记Day20---ZooKeeper系列(五)

Hadoop日记Day20---ZooKeeper系列(五)

分布式服务框架 Zookeeper -- 管理分布式环境中的数据

引言

本节本来是要介绍ZooKeeper的实现原理,但是ZooKeeper的原理比较复杂,它涉及到了paxos算法、Zab协议、通信协议等相关知识,理解起来比较抽象所以还需要借助一些应用场景,来帮我们理解。由于内容比较多,一口气吃不成胖子,得慢慢来一步一个脚印,因此我对后期ZooKeeper的学习规划如下:

第一阶段:

|---理解ZooKeeper的应用

    |---ZooKeeper是什么

    |---ZooKeeper能干什么

    |---ZooKeeper 怎么使用

第二阶段:

|---理解ZooKeeper原理准备

    |---了解paxos

    |---理解 zab原理

    |---理解选举/同步流程

第三阶段:

    |---深入ZooKeeper原理

        |---分析源码

        |---尝试开发分布式应用

由于内容较多,而且理解较为复杂,所以每个阶段分开来学习和介绍,那么本文主要介绍的的是第一阶段,该阶段一般应该放在前面介绍,但感觉像一些ZooKeeper应用案例,如果没有一定的ZooKeeper基础,理解起来也比较抽象, 所以放在这介绍。大家可以对比一下前面的应用程序,来对比理解一下前面的那些应用到底用到ZooKeeper的那些功能,来进一步理解ZooKeeper的实现理念,由于网上关于这方面的介绍比较多,如果一些可爱的博友对该内容已经比较了解,那么您可以不用往下看了,继续下一步学习。

一、ZooKeeper产生背景

1.1 分布式的发展

分布式这个概念我想大家并不陌生,但真正实战开始还要从google说起,很早以前在实验室中分布式被人提出,可是说是计算机内入行较为复杂学习较为困难的技术,并且市场也并不成熟,因此大规模的商业应用一直未成出现,但从Google 发布了MapReduceDFS 以及Bigtable的论文之后,分布式在计算机界的格局就发生了变化,从架构上实现了分布式的难题,并且成熟的应用在了海量数据存储计算上,其集群的规模也是当前世界上最为庞大的。

以DFS 为基础的分布式计算框架keyvalue 数据高效的解决运算的瓶颈,而且开发人员不用再写复杂的分布式程序,只要底层框架完备开发人员只要用较少的代码就可以完成分布式程序的开发,这使得开发人员只需要关注业务逻辑的即可。Google 在业界技术上的领军地位,让业界望尘莫及的技术实力,IT 因此也是对Google 所退出的技术十分推崇。在最近几年中分布式则是成为了海量数据存储以及计算、高并发、高可靠性、高可用性的解决方案。

1.2 ZooKeeper的产生

众所周知通常分布式架构都是中心化的设计,就是一个主控机连接多个处理节点。问题可以从这里考虑,当主控机失效时,整个系统则就无法访问了,所以保证系统的高可用性是非常关键之处,也就是要保证主控机的高可用性。分布式锁就是一个解决该问题的较好方案,多主控机抢一把锁。在这里我们就涉及到了我们的重点Zookeeper。

ZooKeeper是什么,chubby 我想大家都不会陌生的,chubby 是实现Google 的一个分布式锁的实现,运用到了paxos 算法解决的一个分布式事务管理的系统。Zookeeper 就是雅虎模仿强大的Google chubby 实现的一套分布式锁管理系统。同时,Zookeeper 分布式服务框架是Apache Hadoop的一个子项目,它是一个针对大型分布式系统的可靠协调系统,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,可以高可靠的维护元数据。提供的功能包括:配置维护、名字服务、分布式同步、组服务等。ZooKeeper的设计目标就是封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。

1.3 ZooKeeper的使用

Zookeeper 作为一个分布式的服务框架,主要用来解决分布式集群中应用系统的一致性问题,它能提供基于类似于文件系统的目录节点树方式的数据存储但是 Zookeeper 并不是用来专门存储数据的,它的作用主要是用来维护监控你存储的数据的状态变化。通过监控这些数据状态的变化,从而可以达到基于数据的集群管理,后面将 会详细介绍 Zookeeper 能够解决的一些典型问题。

注意一下这里的"数据"是有限制的:

(1) 从数据大小来看:我们知道ZooKeeper的数据存储在一个叫ReplicatedDataBase 的数据库中,该数据是一个内存数据库,既然是在内存当中,我就应该知道该数据量就应该不会太大,这一点上就与hadoop的HDFS有了很大的区别,HDFS的数据主要存储在磁盘上,因此数据存储主要是HDFS的事,而ZooKeeper主要是协调功能,并不是用来存储数据的。

(2) 从数据类型来看:正如前面所说的,ZooKeeper的数据在内存中,由于内存空间的限制,那么我们就不能在上面随心所欲的存储数据,所以ZooKeeper存储的数据都是我们所关心的数据而且数据量还不能太大,而且还会根据我们要以实现的功能来选择相应的数据。简单来说,干什么事存什么数据,ZooKeeper所实现的一切功能,都是由ZK节点的性质该节点所关联的数据实现的,至于关联什么数据那就要看你干什么事了。

例如:

  ① 集群管理:利用临时节点特性,节点关联的是机器的主机名、IP地址等相关信息,集群单点故障也属于该范畴。

  ② 统一命名:主要利用节点的唯一性和目录节点树结构。

  ③ 配置管理:节点关联的是配置信息。

  ④ 分布式锁:节点关联的是要竞争的资源。

二、ZooKeeper应用场景

ZooKeeper是一个高可用的分布式数据管理与系统协调框架。基于对Paxos算法的实现,使该框架保证了分布式环境中数据的强一致性,也正是基于这样的特性,使得zookeeper能够应用于很多场景。需要注意的是,ZK并不是生来就为这些场景设计,都是后来众多开发者根据框架的特性,摸索出来的典型使用方法。因此,我们也可以根据自己的需要来设计相应的场景实现。正如前文所提到的,ZooKeeper 实现的任何功能都离不开ZooKeeper的数据结构,任何功能的实现都是利用"Znode结构特性+节点关联的数据"来实现的,好吧那么我们就看一下ZooKeeper数据结构有哪些特性。ZooKeeper数据结构如下图所示:

图2.1 ZooKeeper数据结构

Zookeeper 这种数据结构有如下这些特点:

每个子目录项如 NameService 都被称作为 znode,这个 znode 是被它所在的路径唯一标识,如 Server1 这个 znode 的标识为 /NameService/Server1

znode 可以有子节点目录,并且每个 znode 可以存储数据,注意 EPHEMERAL 类型的目录节点不能有子节点目录;

znode 是有版本的,每个 znode 中存储的数据可以有多个版本,也就是一个访问路径中可以存储多份数据

znode 可以是临时节点,一旦创建这个 znode 的客户端与服务器失去联系,这个 znode 也将自动删除,Zookeeper 的客户端和服务器通信采用长连接方式,每个客户端和服务器通过心跳来保持连接,这个连接状态称为 session,如果 znode 是临时节点,这个 session 失效,znode 也就删除了;

znode 的目录名可以自动编号,如 App1 已经存在,再创建的话,将会自动命名为 App2;

znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个是 Zookeeper 的核心特性,Zookeeper 的很多功能都是基于这个特性实现的。

2.1数据发布与订阅

(1) 典型场景描述

发布与订阅即所谓的配置管理,顾名思义就是将数据发布到ZK节点上,供订阅者动态获取数据,实现配置信息的集中式管理和动态更新。例如全局的配置信息地址列表等就非常适合使用。集中式的配置管理在应用集群中是非常常见的,一般商业公司内部都会实现一套集中的配置管理中心,应对不同的应用集群对于共享各自配置的需求,并且在配置变更时能够通知到集群中的每一个机器。

(2) 应用

索引信息和集群中机器节点状态存放在ZK的一些指定节点,供各个客户端订阅使用。

系统日志(经过处理后的)存储,这些日志通常2-3天后被清除。

应用中用到的一些配置信息集中管理,在应用启动的时候主动来获取一次,并且在节点上注册一个Watcher,以后每次配置有更新,实时通知到应用,获取最新配置信息。

业务逻辑中需要用到的一些全局变量,比如一些消息中间件的消息队列通常有个offset,这个offset存放在zk上,这样集群中每个发送者都能知道当前的发送进度

系统中有些信息需要动态获取,并且还会存在人工手动去修改这个信息。以前通常是暴露出接口,例如JMX接口,有了ZK后,只要将这些信息存放到ZK节点上即可。

(3) 应用举例

例如:同一个应用系统需要多台 PC Server 运行,但是它们运行的应用系统的某些配置项是相同的,如果要修改这些相同的配置项,那么就必须同时修改每台运行这个应用系统的 PC Server,这样非常麻烦而且容易出错。将配置信息保存在 Zookeeper 的某个目录节点中,然后将所有需要修改的应用机器监控配置信息的状态,一旦配置信息发生变化,每台应用机器就会收到 Zookeeper 的通知,然后从 Zookeeper 获取新的配置信息应用到系统中。ZooKeeper配置管理服务如下图所示:

图2.2 配置管理结构图

Zookeeper很容易实现这种集中式的配置管理,比如将所需要的配置信息放到/Configuration 节点上,集群中所有机器一启动就会通过Client/Configuration这个节点进行监控【zk.exist("/Configuration″,true)】,并且实现Watcher回调方法process(),那么在zookeeper上/Configuration节点下数据发生变化的时候,每个机器都会收到通知,Watcher回调方法将会被执行,那么应用再取下数据即可【zk.getData("/Configuration″,false,null)】。

2.2统一命名服务(Name Service)

(1) 场景描述

分布式应用中,通常需要有一套完整的命名规则,既能够产生唯一的名称又便于人识别和记住,通常情况下用树形的名称结构是一个理想的选择,树形的名称结构是一个有层次的目录结构,既对人友好又不会重复。说到这里你可能想到了 JNDI,没错 Zookeeper 的 Name Service 与 JNDI 能够完成的功能是差不多的,它们都是将有层次的目录结构关联到一定资源上,但是Zookeeper的Name Service 更加是广泛意义上的关联,也许你并不需要将名称关联到特定资源上,你可能只需要一个不会重复名称,就像数据库中产生一个唯一的数字主键一样。

(2) 应用

在分布式系统中,通过使用命名服务,客户端应用能够根据指定的名字来获取资源服务的地址提供者等信息。被命名的实体通常可以是集群中的机器,提供的服务地址进程对象等等,这些我们都可以统称他们为名字(Name)。其中较为常见的就是一些分布式服务框架中的服务地址列表。通过调用ZK提供的创建节点的API,能够很容易创建一个全局唯一的path,这个path就可以作为一个名称。Name Service 已经是Zookeeper 内置的功能,你只要调用 Zookeeper 的 API 就能实现。如调用 create 接口就可以很容易创建一个目录节点。

(3) 应用举例

阿里开源的分布式服务框架Dubbo中使用ZooKeeper来作为其命名服务,维护全局的服务地址列表。在Dubbo实现中: 服务提供者在启动的时候,向ZK上的指定节点/dubbo/${serviceName}/providers目录下写入自己的URL地址,这个操作就完成了服务的发布。 服务消费者启动的时候,订阅/dubbo/${serviceName}/providers目录下的提供者URL地址, 并向/dubbo/${serviceName} /consumers目录下写入自己的URL地址。 注意,所有向ZK上注册的地址都是临时节点,这样就能够保证服务提供者和消费者能够自动感应资源的变化。 另外,Dubbo还有针对服务粒度的监控,方法是订阅/dubbo/${serviceName}目录下所有提供者和消费者的信息。

2.3分布通知/协调(Distribution of notification/coordination)

(1) 典型场景描述

ZooKeeper中特有watcher注册与异步通知机制,能够很好的实现分布式环境下不同系统之间的通知与协调,实现对数据变更的实时处理。使用方法通常是不同系统都对ZK上同一个znode进行注册,监听znode的变化(包括znode本身内容及子节点的),其中一个系统update了znode,那么另一个系统能够收到通知,并作出相应处理。

(2) 应用

另一种心跳检测机制检测系统被检测系统之间并不直接关联起来,而是通过ZK上某个节点关联,大大减少系统耦合。

另一种系统调度模式:某系统由控制台推送系统两部分组成,控制台的职责是控制推送系统进行相应的推送工作。管理人员在控制台作的一些操作,实际上是修改了ZK上某些节点的状态,而ZK就把这些变化通知给他们注册Watcher的客户端,即推送系统,于是,作出相应的推送任务。

另一种工作汇报模式:一些类似于任务分发系统子任务启动后,到ZK来注册一个临时节点,并且定时将自己的进度进行汇报(将进度写回这个临时节点),这样任务管理者就能够实时知道任务进度。

总之,使用zookeeper来进行分布式通知和协调能够大大降低系统之间的耦合。

2.4分布式锁(Distribute Lock)

(1) 场景描述

分布式锁,这个主要得益于ZooKeeper为我们保证了数据的强一致性,即用户只要完全相信每时每刻,zk集群中任意节点(一个zk server)上的相同znode的数据是一定是相同的。锁服务可以分为两类,一个是保持独占,另一个是控制时序。

保持独占,就是所有试图来获取这个锁的客户端,最终只有一个可以成功获得这把锁。通常的做法是把ZK上的一个znode看作是一把锁,通过create znode的方式来实现。所有客户端都去创建 /distribute_lock 节点,最终成功创建的那个客户端也即拥有了这把锁。

控制时序,就是所有试图来获取这个锁的客户端,最终都是会被安排执行,只是有个全局时序了。做法和上面基本类似,只是这里 /distribute_lock 已经预先存在,客户端在它下面创建临时有序节点。Zk的父节点(/distribute_lock)维持一份sequence,保证子节点创建的时序性,从而也形成了每个客户端的全局时序。

(2) 应用

共享锁在同一个进程中很容易实现,但是在跨进程或者在不同 Server 之间就不好实现了。Zookeeper 却很容易实现这个功能,实现方式也是需要获得锁的 Server 创建一个 EPHEMERAL_SEQUENTIAL 目录节点,然后调用 getChildren方法获取当前的目录节点列表中最小的目录节点是不是就是自己创建的目录节点,如果正是自己创建的,那么它就获得了这个锁,如果不是那么它就调用 exists(String path, boolean watch) 方法并监控 Zookeeper 上目录节点列表的变化,一直到自己创建的节点是列表中最小编号的目录节点,从而获得锁,释放锁很简单,只要删除前面它自己所创建的目录节点就行了。

图 2.3 ZooKeeper实现Locks的流程图

代码清单1 TestMainClient 代码

  1. package org.zk.leader.election;
  2.  
  3. import org.apache.log4j.xml.DOMConfigurator;
  4. import org.apache.zookeeper.WatchedEvent;
  5. import org.apache.zookeeper.Watcher;
  6. import org.apache.zookeeper.ZooKeeper;
  7.  
  8. import java.io.IOException;
  9.  
  10. /**
  11.  * TestMainClient
  12.  * <p/>
  13.  * Author By: sunddenly工作室
  14.  * Created Date: 2014-11-13
  15.  */
  16. public class TestMainClient implements Watcher {
  17.     protected static ZooKeeper zk = null;
  18.     protected static Integer mutex;
  19.     int sessionTimeout = 10000;
  20.     protected String root;
  21.     public TestMainClient(String connectString) {
  22.         if(zk == null){
  23.             try {
  24.  
  25.                 String configFile = this.getClass().getResource("/").getPath()+"org/zk/leader/election/log4j.xml";
  26.                 DOMConfigurator.configure(configFile);
  27.                 System.out.println("创建一个新的连接:");
  28.                 zk = new ZooKeeper(connectString, sessionTimeout, this);
  29.                 mutex = new Integer(-1);
  30.             } catch (IOException e) {
  31.                 zk = null;
  32.             }
  33.         }
  34.     }
  35.    synchronized public void process(WatchedEvent event) {
  36.         synchronized (mutex) {
  37.             mutex.notify();
  38.         }
  39.     }
  40. }

清单 2 Locks 代码

  1. package org.zk.locks;
  2.  
  3. import org.apache.log4j.Logger;
  4. import org.apache.zookeeper.CreateMode;
  5. import org.apache.zookeeper.KeeperException;
  6. import org.apache.zookeeper.WatchedEvent;
  7. import org.apache.zookeeper.ZooDefs;
  8. import org.apache.zookeeper.data.Stat;
  9. import org.zk.leader.election.TestMainClient;
  10.  
  11. import java.util.Arrays;
  12. import java.util.List;
  13.  
  14. /**
  15.  * locks
  16.  * <p/>
  17.  * Author By: sunddenly工作室
  18.  * Created Date: 2014-11-13 16:49:40
  19.  */
  20. public class Locks extends TestMainClient {
  21.     public static final Logger logger = Logger.getLogger(Locks.class);
  22.     String myZnode;
  23.  
  24.     public Locks(String connectString, String root) {
  25.         super(connectString);
  26.         this.root = root;
  27.         if (zk != null) {
  28.             try {
  29.                 Stat s = zk.exists(root, false);
  30.                 if (s == null) {
  31.                     zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  32.                 }
  33.             } catch (KeeperException e) {
  34.                 logger.error(e);
  35.             } catch (InterruptedException e) {
  36.                 logger.error(e);
  37.             }
  38.         }
  39.     }
  40.     void getLock() throws KeeperException, InterruptedException{
  41.         List<String> list = zk.getChildren(root, false);
  42.         String[] nodes = list.toArray(new String[list.size()]);
  43.         Arrays.sort(nodes);
  44.         if(myZnode.equals(root+"/"+nodes[0])){
  45.             doAction();
  46.         }
  47.         else{
  48.             waitForLock(nodes[0]);
  49.         }
  50.     }
  51.     void check() throws InterruptedException, KeeperException {
  52.         myZnode = zk.create(root + "/lock_" , new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
  53.         getLock();
  54.     }
  55.     void waitForLock(String lower) throws InterruptedException, KeeperException {
  56.         Stat stat = zk.exists(root + "/" + lower,true);
  57.         if(stat != null){
  58.             mutex.wait();
  59.         }
  60.         else{
  61.             getLock();
  62.         }
  63.     }
  64.     @Override
  65.     public void process(WatchedEvent event) {
  66.         if(event.getType() == Event.EventType.NodeDeleted){
  67.             System.out.println("得到通知");
  68.             super.process(event);
  69.             doAction();
  70.         }
  71.     }
  72.     /**
  73.      * 执行其他任务
  74.      */
  75.     private void doAction(){
  76.         System.out.println("同步队列已经得到同步,可以开始执行后面的任务了");
  77.     }
  78.  
  79.     public static void main(String[] args) {
  80.         String connectString = "localhost:2181";
  81.  
  82.         Locks lk = new Locks(connectString, "/locks");
  83.         try {
  84.             lk.check();
  85.         } catch (InterruptedException e) {
  86.             logger.error(e);
  87.         } catch (KeeperException e) {
  88.             logger.error(e);
  89.         }
  90.     }
  91. }

2.5 集群管理(Cluster Management)

(1) 典型场景描述

集群机器监控

这通常用于那种对集群中机器状态,机器在线率有较高要求的场景,能够快速对集群中机器变化作出响应。这样的场景中,往往有一个监控系统,实时检测集群机器是否存活。过去的做法通常是:监控系统通过某种手段(比如ping)定时检测每个机器,或者每个机器自己定时向监控系统汇报"我还活着"。 这种做法可行,但是存在两个比较明显的问题:

集群中机器有变动的时候,牵连修改的东西比较多。

有一定的延时。

利用ZooKeeper中两个特性,就可以实施另一种集群机器存活性监控系统:

客户端在节点 x 上注册一个Watcher,那么如果 x 的子节点变化了,会通知该客户端。

创建EPHEMERAL类型的节点,一旦客户端和服务器的会话结束或过期,那么该节点就会消失。

Master选举:

Master选举则是zookeeper中最为经典的使用场景了,在分布式环境中,相同的业务应用分布在不同的机器上,有些业务逻辑,例如一些耗时的计算,网络I/O处,往往只需要让整个集群中的某一台机器进行执行,其余机器可以共享这个结果,这样可以大大减少重复劳动,提高性能,于是这个master选举便是这种场景下的碰到的主要问题。

利用ZooKeeper中两个特性,就可以实施另一种集群中Master选举:

利用ZooKeeper的强一致性,能够保证在分布式高并发情况下节点创建的全局唯一性,即:同时有多个客户端请求创建 /Master 节点,最终一定只有一个客户端请求能够创建成功。利用这个特性,就能很轻易的在分布式环境中进行集群选举了。

另外,这种场景演化一下,就是动态Master选举。这就要用到 EPHEMERAL_SEQUENTIAL类型节点的特性了,这样每个节点会自动被编号。允许所有请求都能够创建成功,但是得有个创建顺序,每次选取序列号最小的那个机器作为Master 。

(2) 应用

在搜索系统中,如果集群中每个机器都生成一份全量索引,不仅耗时,而且不能保证彼此间索引数据一致。因此让集群中的Master来迚行全量索引的生成,然后同步到集群中其它机器。另外,Master选丼的容灾措施是,可以随时迚行手动挃定master,就是说应用在zk在无法获取master信息时,可以通过比如http方式,向一个地方获取master。 ? 在Hbase中,也是使用ZooKeeper来实现动态HMaster的选举。在Hbase实现中,会在ZK上存储一些ROOT表的地址和HMaster的地址,HRegionServer也会把自己以临时节点(Ephemeral)的方式注册到Zookeeper中,使得HMaster可以随时感知到各个HRegionServer的存活状态,同时,一旦HMaster出现问题,会重新选丼出一个HMaster来运行,从而避免了HMaster的单点问题的存活状态,同时,一旦HMaster出现问题,会重新选丼出一个HMaster来运行,从而避免了HMaster的单点问题。

(3) 应用举例

集群监控:

应用集群中,我们常常需要让每一个机器知道集群中或依赖的其他某一个集群中哪些机器是活着的,并且在集群机器因为宕机,网络断链等原因能够不在人工介入的情况下迅速通知到每一个机器,Zookeeper 能够很容易的实现集群管理的功能,如有多台 Server 组成一个服务集群,那么必须要一个"总管"知道当前集群中每台机器的服务状态,一旦有机器不能提供服务,集群中其它集群必须知道,从而做出调整重新分配服务策略。同样当增加集群的服务能力时,就会增加一台或多台 Server,同样也必须让"总管"知道,这就是ZooKeeper的集群监控功能。

图2.4 集群管理结构图

比如我在zookeeper服务器端有一个znode叫/Configuration,那么集群中每一个机器启动的时候都去这个节点下创建一个EPHEMERAL类型的节点,比如server1创建/Configuration /Server1,server2创建/Configuration /Server1,然后Server1和Server2都watch /Configuration 这个父节点,那么也就是这个父节点下数据或者子节点变化都会通知对该节点进行watch的客户端。因为EPHEMERAL类型节点有一个很重要的特性,就是客户端和服务器端连接断掉或者session过期就会使节点消失,那么在某一个机器挂掉或者断链的时候,其对应的节点就会消 失,然后集群中所有对/Configuration进行watch的客户端都会收到通知,然后取得最新列表即可。

Master选举:

Zookeeper 不仅能够维护当前的集群中机器的服务状态,而且能够选出一个"总管",让这个总管来管理集群,这就是 Zookeeper 的另一个功能 Leader Election。Zookeeper 如何实现 Leader Election,也就是选出一个 Master Server。和前面的一样每台 Server 创建一个 EPHEMERAL 目录节点,不同的是它还是一个 SEQUENTIAL 目录节点,所以它是个 EPHEMERAL_SEQUENTIAL 目录节点。之所以它是 EPHEMERAL_SEQUENTIAL 目录节点,是因为我们可以给每台 Server 编号,我们可以选择当前是最小编号的 Server 为 Master,假如这个最小编号的 Server 死去,由于是 EPHEMERAL 节点,死去的 Server 对应的节点也被删除,所以当前的节点列表中又出现一个最小编号的节点,我们就选择这个节点为当前 Master。这样就实现了动态选择 Master,避免了传统意义上单 Master 容易出现单点故障的问题

清单 3 Leader Election代码

  1. package org.zk.leader.election;
  2.  
  3. import org.apache.log4j.Logger;
  4. import org.apache.zookeeper.CreateMode;
  5. import org.apache.zookeeper.KeeperException;
  6. import org.apache.zookeeper.WatchedEvent;
  7. import org.apache.zookeeper.ZooDefs;
  8. import org.apache.zookeeper.data.Stat;
  9.  
  10. import java.net.InetAddress;
  11. import java.net.UnknownHostException;
  12.  
  13. /**
  14.  * LeaderElection
  15.  * <p/>
  16.  * Author By: sunddenly工作室
  17.  * Created Date: 2014-11-13
  18.  */
  19. public class LeaderElection extends TestMainClient {
  20.     public static final Logger logger = Logger.getLogger(LeaderElection.class);
  21.  
  22.     public LeaderElection(String connectString, String root) {
  23.         super(connectString);
  24.         this.root = root;
  25.         if (zk != null) {
  26.             try {
  27.                 Stat s = zk.exists(root, false);
  28.                 if (s == null) {
  29.                     zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  30.                 }
  31.             } catch (KeeperException e) {
  32.                 logger.error(e);
  33.             } catch (InterruptedException e) {
  34.                 logger.error(e);
  35.             }
  36.         }
  37.     }
  38.  
  39.     void findLeader() throws InterruptedException, UnknownHostException, KeeperException {
  40.         byte[] leader = null;
  41.         try {
  42.             leader = zk.getData(root + "/leader", true, null);
  43.         } catch (KeeperException e) {
  44.             if (e instanceof KeeperException.NoNodeException) {
  45.                 logger.error(e);
  46.             } else {
  47.                 throw e;
  48.             }
  49.         }
  50.         if (leader != null) {
  51.             following();
  52.         } else {
  53.             String newLeader = null;
  54.             byte[] localhost = InetAddress.getLocalHost().getAddress();
  55.             try {
  56.                 newLeader = zk.create(root + "/leader", localhost, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
  57.             } catch (KeeperException e) {
  58.                 if (e instanceof KeeperException.NodeExistsException) {
  59.                     logger.error(e);
  60.                 } else {
  61.                     throw e;
  62.                 }
  63.             }
  64.             if (newLeader != null) {
  65.                 leading();
  66.             } else {
  67.                 mutex.wait();
  68.             }
  69.         }
  70.     }
  71.  
  72.     @Override
  73.     public void process(WatchedEvent event) {
  74.         if (event.getPath().equals(root + "/leader") && event.getType() == Event.EventType.NodeCreated) {
  75.             System.out.println("得到通知");
  76.             super.process(event);
  77.             following();
  78.         }
  79.     }
  80.  
  81.     void leading() {
  82.         System.out.println("成为领导者");
  83.     }
  84.  
  85.     void following() {
  86.         System.out.println("成为组成员");
  87.     }
  88.  
  89.     public static void main(String[] args) {
  90.         String connectString = "localhost:2181";
  91.  
  92.         LeaderElection le = new LeaderElection(connectString, "/GroupMembers");
  93.         try {
  94.             le.findLeader();
  95.         } catch (Exception e) {
  96.             logger.error(e);
  97.         }
  98.     }
  99. }

2.6 队列管理

Zookeeper 可以处理两种类型的队列:

当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达,这种是同步队列

队列按照 FIFO 方式进行入队和出队操作,例如实现生产者消费者模型

(1) 同步队列用 Zookeeper 实现的实现思路如下:

创建一个父目录 /synchronizing,每个成员都监控标志(Set Watch)位目录 /synchronizing/start 是否存在,然后每个成员都加入这个队列,加入队列的方式就是创建 /synchronizing/member_i 的临时目录节点,然后每个成员获取 / synchronizing 目录的所有目录节点,也就是 member_i。判断 i 的值是否已经是成员的个数,如果小于成员个数等待 /synchronizing/start 的出现,如果已经相等就创建 /synchronizing/start。

用下面的流程图更容易理解:

图 2.5 同步队列流程图

 

清单 4 Synchronizing 代码

  1. package org.zk.queue;
  2.  
  3. import java.net.InetAddress;
  4. import java.net.UnknownHostException;
  5. import java.util.List;
  6.  
  7. import org.apache.log4j.Logger;
  8. import org.apache.zookeeper.CreateMode;
  9. import org.apache.zookeeper.KeeperException;
  10. import org.apache.zookeeper.WatchedEvent;
  11. import org.apache.zookeeper.Watcher;
  12. import org.apache.zookeeper.ZooKeeper;
  13. import org.apache.zookeeper.ZooDefs.Ids;
  14. import org.apache.zookeeper.data.Stat;
  15. import org.zk.leader.election.TestMainClient;
  16.  
  17. /**
  18.  * Synchronizing
  19.  * <p/>
  20.  * Author By: sunddenly工作室
  21.  * Created Date: 2014-11-13
  22.  */
  23. public class Synchronizing extends TestMainClient {
  24.     int size;
  25.     String name;
  26.     public static final Logger logger = Logger.getLogger(Synchronizing.class);
  27.  
  28.     /**
  29.      * 构造函数
  30.      *
  31.      * @param connectString 服务器连接
  32.      * @param root 根目录
  33.      * @param size 队列大小
  34.      */
  35.     Synchronizing(String connectString, String root, int size) {
  36.         super(connectString);
  37.         this.root = root;
  38.         this.size = size;
  39.  
  40.         if (zk != null) {
  41.             try {
  42.                 Stat s = zk.exists(root, false);
  43.                 if (s == null) {
  44.                     zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
  45.                 }
  46.             } catch (KeeperException e) {
  47.                 logger.error(e);
  48.             } catch (InterruptedException e) {
  49.                 logger.error(e);
  50.             }
  51.         }
  52.         try {
  53.             name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
  54.         } catch (UnknownHostException e) {
  55.             logger.error(e);
  56.         }
  57.  
  58.     }
  59.  
  60.     /**
  61.      * 加入队列
  62.      *
  63.      * @return
  64.      * @throws KeeperException
  65.      * @throws InterruptedException
  66.      */
  67.  
  68.     void addQueue() throws KeeperException, InterruptedException{
  69.         zk.exists(root + "/start",true);
  70.         zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
  71.         synchronized (mutex) {
  72.             List<String> list = zk.getChildren(root, false);
  73.             if (list.size() < size) {
  74.                 mutex.wait();
  75.             } else {
  76.                 zk.create(root + "/start", new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
  77.             }
  78.         }
  79.     }
  80.  
  81.     @Override
  82.     public void process(WatchedEvent event) {
  83.         if(event.getPath().equals(root + "/start") && event.getType() == Event.EventType.NodeCreated){
  84.             System.out.println("得到通知");
  85.             super.process(event);
  86.             doAction();
  87.         }
  88.     }
  89.  
  90.     /**
  91.      * 执行其他任务
  92.      */
  93.     private void doAction(){
  94.         System.out.println("同步队列已经得到同步,可以开始执行后面的任务了");
  95.     }
  96.  
  97.     public static void main(String args[]) {
  98.         //启动Server
  99.         String connectString = "localhost:2181";
  100.         int size = 1;
  101.         Synchronizing b = new Synchronizing(connectString, "/synchronizing", size);
  102.         try{
  103.             b.addQueue();
  104.         } catch (KeeperException e){
  105.             logger.error(e);
  106.         } catch (InterruptedException e){
  107.             logger.error(e);
  108.         }
  109.     }
  110. }

(2) FIFO 队列用 Zookeeper 实现思路如下:

实现的思路也非常简单,就是在特定的目录下创建 SEQUENTIAL 类型的子目录 /queue_i,这样就能保证所有成员加入队列时都是有编号的,出队列时通过 getChildren( ) 方法可以返回当前所有的队列中的元素,然后消费其中最小的一个,这样就能保证 FIFO。

下面是生产者和消费者这种队列形式的示例代码

清单 5 FIFOQueue 代码

  1. import org.apache.log4j.Logger;
  2. import org.apache.zookeeper.CreateMode;
  3. import org.apache.zookeeper.KeeperException;
  4. import org.apache.zookeeper.WatchedEvent;
  5. import org.apache.zookeeper.ZooDefs;
  6. import org.apache.zookeeper.data.Stat;
  7.  
  8. import java.nio.ByteBuffer;
  9. import java.util.List;
  10.  
  11. /**
  12.  * FIFOQueue
  13.  * <p/>
  14.  * Author By: sunddenly工作室
  15.  * Created Date: 2014-11-13
  16.  */
  17. public class FIFOQueue extends TestMainClient{
  18.     public static final Logger logger = Logger.getLogger(FIFOQueue.class);
  19.  
  20.     /**
  21.      * Constructor
  22.      *
  23.      * @param connectString
  24.      * @param root
  25.      */
  26.     FIFOQueue(String connectString, String root) {
  27.         super(connectString);
  28.         this.root = root;
  29.         if (zk != null) {
  30.             try {
  31.                 Stat s = zk.exists(root, false);
  32.                 if (s == null) {
  33.                     zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
  34.                 }
  35.             } catch (KeeperException e) {
  36.                 logger.error(e);
  37.             } catch (InterruptedException e) {
  38.                 logger.error(e);
  39.             }
  40.         }
  41.     }
  42.     /**
  43.      * 生产者
  44.      *
  45.      * @param i
  46.      * @return
  47.      */
  48.  
  49.     boolean produce(int i) throws KeeperException, InterruptedException{
  50.         ByteBuffer b = ByteBuffer.allocate(4);
  51.         byte[] value;
  52.         b.putInt(i);
  53.         value = http://www.mamicode.com/b.array();
  54.         zk.create(root + "/element", value, ZooDefs.Ids.OPEN_ACL_UNSAFE,
  55.                     CreateMode.PERSISTENT_SEQUENTIAL);
  56.         return true;
  57.     }
  58.  
  59.  
  60.     /**
  61.      * 消费者
  62.      *
  63.      * @return
  64.      * @throws KeeperException
  65.      * @throws InterruptedException
  66.      */
  67.     int consume() throws KeeperException, InterruptedException{
  68.         int retvalue = http://www.mamicode.com/-1;
  69.         Stat stat = null;
  70.         while (true) {
  71.             synchronized (mutex) {
  72.                 List<String> list = zk.getChildren(root, true);
  73.                 if (list.size() == 0) {
  74.                     mutex.wait();
  75.                 } else {
  76.                     Integer min = new Integer(list.get(0).substring(7));
  77.                     for(String s : list){
  78.                         Integer tempValue = http://www.mamicode.com/new Integer(s.substring(7));
  79.                         if(tempValue < min) min = tempValue;
  80.                     }
  81.                     byte[] b = zk.getData(root + "/element" + min,false, stat);
  82.                     zk.delete(root + "/element" + min, 0);
  83.                     ByteBuffer buffer = ByteBuffer.wrap(b);
  84.                     retvalue = http://www.mamicode.com/buffer.getInt();
  85.                     return retvalue;
  86.                 }
  87.             }
  88.         }
  89.     }
  90.  
  91.     @Override
  92.     public void process(WatchedEvent event) {
  93.         super.process(event);
  94.     }
  95.  
  96.     public static void main(String args[]) {
  97.         //启动Server
  98.         TestMainServer.start();
  99.         String connectString = "localhost:"+TestMainServer.CLIENT_PORT;
  100.  
  101.         FIFOQueue q = new FIFOQueue(connectString, "/app1");
  102.         int i;
  103.         Integer max = new Integer(5);
  104.  
  105.         System.out.println("Producer");
  106.         for (i = 0; i < max; i++)
  107.             try{
  108.                 q.produce(10 + i);
  109.             } catch (KeeperException e){
  110.                 logger.error(e);
  111.             } catch (InterruptedException e){
  112.                 logger.error(e);
  113.             }
  114.  
  115.         for (i = 0; i < max; i++) {
  116.             try{
  117.                 int r = q.consume();
  118.                 System.out.println("Item: " + r);
  119.             } catch (KeeperException e){
  120.                 i--;
  121.                 logger.error(e);
  122.             } catch (InterruptedException e){
  123.                 logger.error(e);
  124.             }
  125.         }
  126.  
  127.     }
  128. }

三、ZooKeeper实际应用

假设我们的集群有:

(1) 20个搜索引擎的服务器:每个负责总索引中的一部分的搜索任务。

搜索引擎的服务器中的15个服务器现在提供搜索服务

5个服务器正在生成索引

这20个搜索引擎的服务器,经常要让正在提供搜索服务的服务器停止提供服务开始生成索引,或生成索引的服务器已经把索引生成完成可以搜索提供服务了。

(2) 一个总服务器:负责向这20个搜索引擎的服务器发出搜索请求并合并结果集。

(3) 一个备用的总服务器:负责当总服务器宕机时替换总服务器。

(4) 一个web的cgi:向总服务器发出搜索请求。

使用Zookeeper可以保证:

(1) 总服务器:自动感知有多少提供搜索引擎的服务器,并向这些服务器发出搜索请求。

(2) 备用的总服务器:宕机时自动启用备用的总服务器。

(3) web的cgi:能够自动地获知总服务器的网络地址变化

(4) 实现如下:

提供搜索引擎的服务器都在Zookeeper中创建znode,zk.create("/search/nodes/node1", "hostname".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateFlags.EPHEMERAL);

② 总服务器可以从Zookeeper中获取一个znode的子节点的列表,zk.getChildren("/search/nodes", true);

总服务器遍历这些子节点,并获取子节点的数据生成提供搜索引擎的服务器列表

当总服务器接收到子节点改变的事件信息,重新返回第二步;

总服务器在Zookeeper中创建节点,zk.create("/search/master", "hostname".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateFlags.EPHEMERAL);

⑥ 备用的总服务器监控Zookeeper中的"/search/master"节点。当这个znode的节点数据改变时,把自己启动变成总服务器,并把自己的网络地址数据放进这个节点。

web的cgi从Zookeeper中"/search/master"节点获取总服务器的网络地址数据,并向其发送搜索请求。

web的cgi监控Zookeeper中的"/search/master"节点,当这个znode的节点数据改变时,从这个节点获取总服务器的网络地址数据,并改变当前的总服务器的网络地址。

下期预告:ZooKeeper的原理,敬请关注。本期内容仅供大家参考,有什么不对的地方,希望大家给予指点更正。如果您觉得,文章写得还行,就抬起您的贵手,点一下推荐。

      多分享,多受益。
                我为人人,人人为我。
                  赠人玫瑰,手留余香

Copyright:sunddenly 工作室

Hadoop日记Day20---ZooKeeper系列(五)