首页 > 代码库 > Java API操作ZooKeeper

Java API操作ZooKeeper

  • 创建会话
技术分享
 1 package org.zln.zk; 2  3 import org.apache.zookeeper.WatchedEvent; 4 import org.apache.zookeeper.Watcher; 5 import org.apache.zookeeper.ZooKeeper; 6  7 import java.io.IOException; 8  9 /**10  * Created by sherry on 16/8/27.11  */12 public class TestZooKeeperClientApi {13 14     private static ZooKeeper zooKeeper;15 16     public static void main(String[] args) throws IOException, InterruptedException {17         createSession();18     }19 20     /**21      * 创建会话22      */23     private static ZooKeeper createSession() throws IOException, InterruptedException {24         //实例化的过程,同时也是与ZooKeeper建立连接的过程,参数说明:ip:port 超时时间  监听器(实现water接口,监听器用于接收通知)25         zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new Watcher() {26             @Override27             public void process(WatchedEvent watchedEvent) {28                 System.out.println("收到事件:"+watchedEvent);//收到事件:WatchedEvent state:SyncConnected type:None path:null29 30 31                 //TODO32             }33         });34         System.out.println("查看状态:"+zooKeeper.getState());//查看状态:CONNECTING35 36         //如果不停一段时间,那么,监听器还没收到监听,方法就已经退出了37         Thread.sleep(5000);38 39         return zooKeeper;40 41 42     }43 }
创建会话

 

  • 创建节点
技术分享
 1 package org.zln.zk; 2  3 import org.apache.zookeeper.*; 4 import org.apache.zookeeper.data.ACL; 5 import org.slf4j.Logger; 6 import org.slf4j.LoggerFactory; 7  8 import java.io.IOException; 9 import java.io.UnsupportedEncodingException;10 import java.util.ArrayList;11 12 /**13  * Created by sherry on 16/8/27.14  */15 public class TestZooKeeperClientApi {16 17     private static Logger logger = LoggerFactory.getLogger(TestZooKeeperClientApi.class);18 19     private static ZooKeeper zooKeeper;20 21     public static void main(String[] args) throws IOException, InterruptedException {22         createSession();23     }24 25     /**26      * 创建会话27      */28     private static ZooKeeper createSession() throws IOException, InterruptedException {29         //实例化的过程,同时也是与ZooKeeper建立连接的过程,参数说明:ip:port 超时时间  监听器(实现water接口,监听器用于接收通知)30         zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new Watcher() {31             @Override32             public void process(WatchedEvent watchedEvent) {33                 //TODO 与 ZooKeeper 的交互,一般都放在这里34                 if (watchedEvent.getState() == Event.KeeperState.SyncConnected){//已连接35                     logger.info("连接上了");36 37                     try {38                         //参数说明:节点路径  数据的字节数组 权限 创建节点模式39                         String nodePath = createNode(zooKeeper,"/node_1","123".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);40                         logger.info("创建节点:"+nodePath);41                     } catch (UnsupportedEncodingException|KeeperException|InterruptedException e) {42                         e.printStackTrace();43                     }44 45                 }46             }47         });48         logger.info("查看状态:"+zooKeeper.getState());//查看状态:CONNECTING49 50         //如果不停一段时间,那么,监听器还没收到监听,方法就已经退出了51         Thread.sleep(5000);52 53         return zooKeeper;54 55 56     }57 58     /**59      * 创建ZooKeeper节点60      * @param zooKeeper ZooKeeper连接61      * @return 节点路径62      */63     public static String createNode(ZooKeeper zooKeeper, String path, byte[] bytes, ArrayList<ACL> acls,CreateMode createMode) throws UnsupportedEncodingException, KeeperException, InterruptedException {64         //参数说明:节点路径  数据的字节数组 权限 创建节点模式65         return zooKeeper.create(path,bytes, acls, createMode);66     }67 }
创建节点
创建模式

PERSISTENT          持久节点
PERSISTENT_SEQUENTIAL  
持久顺序节点
EPHEMERAL          临时节点
EPHEMERAL_SEQUENTIAL   临时顺序节点

 

以上代码,是属于同步创建

 

技术分享
 1 /** 2      * 异步创建节点 3      * @param zooKeeper 4      * @param path 5      * @param bytes 6      * @param acls 7      * @param createMode 8      * @throws KeeperException 9      * @throws InterruptedException10      */11     public static void asCreateNode(ZooKeeper zooKeeper, String path, byte[] bytes, ArrayList<ACL> acls,CreateMode createMode) throws KeeperException, InterruptedException {12 13         //异步创建需要增加 AsyncCallback.StringCallback 接口的实现类 以及 一个上下文对象参数14         zooKeeper.create(path, bytes, acls, createMode, new AsyncCallback.StringCallback() {15             /**16              *17              * @param rc  节点创建结果返回码  0-节点创建成功18              * @param path 节点真实路径19              * @param ctx  异步调用上下文  就是 create方法本地调用的那个最后一个参数20              * @param name21              */22             @Override23             public void processResult(int rc, String path, Object ctx, String name) {24                 StringBuilder stringBuilder = new StringBuilder();25                 stringBuilder.append("\nrc="+rc+"\n" +26                         "path="+path+"\n" +27                         "ctx="+ctx+"\n" +28                         "name="+name+"\n");29                 logger.info(stringBuilder.toString());30             }31         },"异步创建");32     }
异步方式创建节点

 

  • 获取子节点
技术分享
 1     /** 2      * 同步方式获取子节点 3      * @param zooKeeper     连接 4      * @param parentPath    父路径 5      * @return 6      * @throws KeeperException 7      * @throws InterruptedException 8      */ 9     public static List<String> getChildList(ZooKeeper zooKeeper,String parentPath) throws KeeperException, InterruptedException {10         //参数说明:  父节点路径  是否需要关注子节点的变化11         List<String> childs = zooKeeper.getChildren(parentPath,false);12         return childs;13     }
同步方式获取子节点且不关注子节点的变化
技术分享异步方式获取子节点且关注子节点的变化
技术分享
 1     /** 2      * 异步方式获取子节点 关注子节点变化 3      * @param zooKeeper     连接 4      * @param parentPath    父路径 5      */ 6     public static void asGetChildListAndWatch(ZooKeeper zooKeeper,String parentPath){ 7         zooKeeper.getChildren(parentPath, true, new AsyncCallback.Children2Callback() { 8             @Override 9             public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {10                 logger.info("变化后的子节点:");11                 for (String name:children){12                     logger.info("子节点:"+name);13                 }14             }15         },"关注子节点变化");16     }
AsyncCallback.Children2Callback接口实现异步关注

目前为止可以发现这个规律,有回调函数的是异步方式调用,没有回调函数的是同步调用

问:同步调用和异步调用的使用场景是???

答:下面的操作依赖调用结果的时候,就需要调用同步方法

 

 

 

技术分享
  1 package org.zln.zk;  2   3 import org.apache.zookeeper.*;  4 import org.apache.zookeeper.data.ACL;  5 import org.apache.zookeeper.data.Stat;  6 import org.slf4j.Logger;  7 import org.slf4j.LoggerFactory;  8   9 import java.io.IOException; 10 import java.io.UnsupportedEncodingException; 11 import java.util.ArrayList; 12 import java.util.List; 13  14 /** 15  * Created by sherry on 16/8/27. 16  */ 17 public class TestZooKeeperClientApi { 18  19     private static Logger logger = LoggerFactory.getLogger(TestZooKeeperClientApi.class); 20  21     private static ZooKeeper zooKeeper; 22  23     public static void main(String[] args) throws IOException, InterruptedException { 24         createSession(); 25  26         Thread.sleep(Integer.MAX_VALUE); 27     } 28  29     /** 30      * 创建会话 31      */ 32     private static ZooKeeper createSession() throws IOException, InterruptedException { 33         //实例化的过程,同时也是与ZooKeeper建立连接的过程,参数说明:ip:port 超时时间  监听器(实现water接口,监听器用于接收通知) 34         zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new Watcher() { 35             @Override 36             public void process(WatchedEvent watchedEvent) { 37                 //TODO 与 ZooKeeper 的交互,一般都放在这里 38                 if (watchedEvent.getState() == Event.KeeperState.SyncConnected){//已连接 39                     logger.info("连接上了"); 40                     try { 41                     //同步方式创建节点 42                     //String nodePath = sysCreateNode(zooKeeper,"/node_1","123".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 43                     //logger.info("创建节点:"+nodePath); 44  45                     //异步方式创建节点 46                     //asCreateNode(zooKeeper,"/node_2","234".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 47  48  49                     //同步方式获取子节点  不关注子节点变化 50 //                  List<String> list = getChildListNoWatch(zooKeeper,"/"); 51 //                  for (String name:list){ 52 //                      logger.info("子节点:"+name); 53 //                  } 54  55                     //异步方式获取节点 关注子节点变化 56 //                    asGetChildListAndWatch(zooKeeper,"/"); 57  58                     //同步方式获取节点数据 sysGetNodeData 59  60                         byte[] bytes = sysGetNodeDataNoWatch(zooKeeper,"/node_1"); 61                         logger.info("获取节点数据"+new String(bytes,"UTF-8")); 62  63                         deleteNode(zooKeeper,"/node_1",0); 64                     } catch (KeeperException|InterruptedException|UnsupportedEncodingException e) { 65                         e.printStackTrace(); 66                     } 67  68                 } 69             } 70         }); 71         logger.info("查看状态:"+zooKeeper.getState());//查看状态:CONNECTING 72  73         return zooKeeper; 74  75  76     } 77  78     /** 79      * 同步创建节点 80      * @param zooKeeper  连接 81      * @param path       节点路径 82      * @param bytes      字节数组数据 83      * @param acls       权限 84      * @param createMode 创建模式 85      * @return 86      * @throws UnsupportedEncodingException 87      * @throws KeeperException 88      * @throws InterruptedException 89      */ 90     public static String sysCreateNode(ZooKeeper zooKeeper, String path, byte[] bytes, ArrayList<ACL> acls,CreateMode createMode) throws UnsupportedEncodingException, KeeperException, InterruptedException { 91         return zooKeeper.create(path,bytes, acls, createMode); 92     } 93  94  95     /** 96      * 异步创建节点 97      * @param zooKeeper  连接 98      * @param path       节点路径 99      * @param bytes      字节数组数据100      * @param acls       权限101      * @param createMode 创建模式102      * @throws KeeperException103      * @throws InterruptedException104      */105     public static void asCreateNode(ZooKeeper zooKeeper, String path, byte[] bytes, ArrayList<ACL> acls,CreateMode createMode) throws KeeperException, InterruptedException {106 107         //异步创建需要增加 AsyncCallback.StringCallback 接口的实现类 以及 一个上下文对象参数108         zooKeeper.create(path, bytes, acls, createMode, new AsyncCallback.StringCallback() {109             /**110              *111              * @param rc  节点创建结果返回码  0-节点创建成功112              * @param path 节点真实路径113              * @param ctx  异步调用上下文  就是 create方法本地调用的那个最后一个参数114              * @param name115              */116             @Override117             public void processResult(int rc, String path, Object ctx, String name) {118                 StringBuilder stringBuilder = new StringBuilder();119                 stringBuilder.append("\nrc="+rc+"\n" +120                         "path="+path+"\n" +121                         "ctx="+ctx+"\n" +122                         "name="+name+"\n");123                 logger.info(stringBuilder.toString());124             }125         },"异步创建");126     }127 128 129     /**130      * 同步方式获取子节点  不关注子节点变化131      * @param zooKeeper     连接132      * @param parentPath    父路径133      * @return134      * @throws KeeperException135      * @throws InterruptedException136      */137     public static List<String> sysGetChildListNoWatch(ZooKeeper zooKeeper,String parentPath) throws KeeperException, InterruptedException {138         //参数说明:  父节点路径  是否需要关注子节点的变化  如果 true,则子节点发生变化后,会产生  NodeChildrenChanged 事件139         List<String> childs = zooKeeper.getChildren(parentPath,false);140         return childs;141     }142 143     /**144      * 异步方式获取子节点 关注子节点变化145      * @param zooKeeper     连接146      * @param parentPath    父路径147      */148     public static void asGetChildListAndWatch(ZooKeeper zooKeeper,String parentPath){149         zooKeeper.getChildren(parentPath, true, new AsyncCallback.Children2Callback() {150             @Override151             public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {152                 logger.info("变化后的子节点:");153                 for (String name:children){154                     logger.info("子节点:"+name);155                 }156             }157         },"关注子节点变化");158     }159 160     /**161      * 同步方式获取数据162      * @param zooKeeper163      * @param path164      * @return165      * @throws KeeperException166      * @throws InterruptedException167      */168     public static byte[] sysGetNodeDataNoWatch(ZooKeeper zooKeeper,String path) throws KeeperException, InterruptedException {169         //路径 是否关注数据变化 状态170         return zooKeeper.getData(path,false,new Stat());171     }172 173     /**174      * 删除节点175      * @param zooKeeper176      * @param nodePath177      * @param version178      * @throws KeeperException179      * @throws InterruptedException180      */181     public static void deleteNode(ZooKeeper zooKeeper,String nodePath,int version) throws KeeperException, InterruptedException {182         zooKeeper.delete(nodePath,version);183     }184 185 186 }
客户端代码汇总

 

 

除了ZooKeeper提供的Java API外,还有两种客户端,ZKClient和Curator两种客户端,都是对原生API的封装,使得操作更方便

《从PAXOS到ZOOKEEPER分布式一致性原理与实践》,可以参考这本书

Java API操作ZooKeeper