首页 > 代码库 > Java API操作ZooKeeper
Java API操作ZooKeeper
- 创建会话
1 package org.zln.zk; 2 3 import org.apache.zookeeper.WatchedEvent; 4 import org.apache.zookeeper.Watcher; 5 import org.apache.zookeeper.ZooKeeper; 6 7 import java.io.IOException; 8 9 /**10 * Created by sherry on 16/8/27.11 */12 public class TestZooKeeperClientApi {13 14 private static ZooKeeper zooKeeper;15 16 public static void main(String[] args) throws IOException, InterruptedException {17 createSession();18 }19 20 /**21 * 创建会话22 */23 private static ZooKeeper createSession() throws IOException, InterruptedException {24 //实例化的过程,同时也是与ZooKeeper建立连接的过程,参数说明:ip:port 超时时间 监听器(实现water接口,监听器用于接收通知)25 zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new Watcher() {26 @Override27 public void process(WatchedEvent watchedEvent) {28 System.out.println("收到事件:"+watchedEvent);//收到事件:WatchedEvent state:SyncConnected type:None path:null29 30 31 //TODO32 }33 });34 System.out.println("查看状态:"+zooKeeper.getState());//查看状态:CONNECTING35 36 //如果不停一段时间,那么,监听器还没收到监听,方法就已经退出了37 Thread.sleep(5000);38 39 return zooKeeper;40 41 42 }43 }
- 创建节点
1 package org.zln.zk; 2 3 import org.apache.zookeeper.*; 4 import org.apache.zookeeper.data.ACL; 5 import org.slf4j.Logger; 6 import org.slf4j.LoggerFactory; 7 8 import java.io.IOException; 9 import java.io.UnsupportedEncodingException;10 import java.util.ArrayList;11 12 /**13 * Created by sherry on 16/8/27.14 */15 public class TestZooKeeperClientApi {16 17 private static Logger logger = LoggerFactory.getLogger(TestZooKeeperClientApi.class);18 19 private static ZooKeeper zooKeeper;20 21 public static void main(String[] args) throws IOException, InterruptedException {22 createSession();23 }24 25 /**26 * 创建会话27 */28 private static ZooKeeper createSession() throws IOException, InterruptedException {29 //实例化的过程,同时也是与ZooKeeper建立连接的过程,参数说明:ip:port 超时时间 监听器(实现water接口,监听器用于接收通知)30 zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new Watcher() {31 @Override32 public void process(WatchedEvent watchedEvent) {33 //TODO 与 ZooKeeper 的交互,一般都放在这里34 if (watchedEvent.getState() == Event.KeeperState.SyncConnected){//已连接35 logger.info("连接上了");36 37 try {38 //参数说明:节点路径 数据的字节数组 权限 创建节点模式39 String nodePath = createNode(zooKeeper,"/node_1","123".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);40 logger.info("创建节点:"+nodePath);41 } catch (UnsupportedEncodingException|KeeperException|InterruptedException e) {42 e.printStackTrace();43 }44 45 }46 }47 });48 logger.info("查看状态:"+zooKeeper.getState());//查看状态:CONNECTING49 50 //如果不停一段时间,那么,监听器还没收到监听,方法就已经退出了51 Thread.sleep(5000);52 53 return zooKeeper;54 55 56 }57 58 /**59 * 创建ZooKeeper节点60 * @param zooKeeper ZooKeeper连接61 * @return 节点路径62 */63 public static String createNode(ZooKeeper zooKeeper, String path, byte[] bytes, ArrayList<ACL> acls,CreateMode createMode) throws UnsupportedEncodingException, KeeperException, InterruptedException {64 //参数说明:节点路径 数据的字节数组 权限 创建节点模式65 return zooKeeper.create(path,bytes, acls, createMode);66 }67 }
创建模式
PERSISTENT 持久节点
PERSISTENT_SEQUENTIAL 持久顺序节点
EPHEMERAL 临时节点
EPHEMERAL_SEQUENTIAL 临时顺序节点
以上代码,是属于同步创建
1 /** 2 * 异步创建节点 3 * @param zooKeeper 4 * @param path 5 * @param bytes 6 * @param acls 7 * @param createMode 8 * @throws KeeperException 9 * @throws InterruptedException10 */11 public static void asCreateNode(ZooKeeper zooKeeper, String path, byte[] bytes, ArrayList<ACL> acls,CreateMode createMode) throws KeeperException, InterruptedException {12 13 //异步创建需要增加 AsyncCallback.StringCallback 接口的实现类 以及 一个上下文对象参数14 zooKeeper.create(path, bytes, acls, createMode, new AsyncCallback.StringCallback() {15 /**16 *17 * @param rc 节点创建结果返回码 0-节点创建成功18 * @param path 节点真实路径19 * @param ctx 异步调用上下文 就是 create方法本地调用的那个最后一个参数20 * @param name21 */22 @Override23 public void processResult(int rc, String path, Object ctx, String name) {24 StringBuilder stringBuilder = new StringBuilder();25 stringBuilder.append("\nrc="+rc+"\n" +26 "path="+path+"\n" +27 "ctx="+ctx+"\n" +28 "name="+name+"\n");29 logger.info(stringBuilder.toString());30 }31 },"异步创建");32 }
- 获取子节点
1 /** 2 * 同步方式获取子节点 3 * @param zooKeeper 连接 4 * @param parentPath 父路径 5 * @return 6 * @throws KeeperException 7 * @throws InterruptedException 8 */ 9 public static List<String> getChildList(ZooKeeper zooKeeper,String parentPath) throws KeeperException, InterruptedException {10 //参数说明: 父节点路径 是否需要关注子节点的变化11 List<String> childs = zooKeeper.getChildren(parentPath,false);12 return childs;13 }
异步方式获取子节点且关注子节点的变化
1 /** 2 * 异步方式获取子节点 关注子节点变化 3 * @param zooKeeper 连接 4 * @param parentPath 父路径 5 */ 6 public static void asGetChildListAndWatch(ZooKeeper zooKeeper,String parentPath){ 7 zooKeeper.getChildren(parentPath, true, new AsyncCallback.Children2Callback() { 8 @Override 9 public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {10 logger.info("变化后的子节点:");11 for (String name:children){12 logger.info("子节点:"+name);13 }14 }15 },"关注子节点变化");16 }
目前为止可以发现这个规律,有回调函数的是异步方式调用,没有回调函数的是同步调用
问:同步调用和异步调用的使用场景是???
答:下面的操作依赖调用结果的时候,就需要调用同步方法
1 package org.zln.zk; 2 3 import org.apache.zookeeper.*; 4 import org.apache.zookeeper.data.ACL; 5 import org.apache.zookeeper.data.Stat; 6 import org.slf4j.Logger; 7 import org.slf4j.LoggerFactory; 8 9 import java.io.IOException; 10 import java.io.UnsupportedEncodingException; 11 import java.util.ArrayList; 12 import java.util.List; 13 14 /** 15 * Created by sherry on 16/8/27. 16 */ 17 public class TestZooKeeperClientApi { 18 19 private static Logger logger = LoggerFactory.getLogger(TestZooKeeperClientApi.class); 20 21 private static ZooKeeper zooKeeper; 22 23 public static void main(String[] args) throws IOException, InterruptedException { 24 createSession(); 25 26 Thread.sleep(Integer.MAX_VALUE); 27 } 28 29 /** 30 * 创建会话 31 */ 32 private static ZooKeeper createSession() throws IOException, InterruptedException { 33 //实例化的过程,同时也是与ZooKeeper建立连接的过程,参数说明:ip:port 超时时间 监听器(实现water接口,监听器用于接收通知) 34 zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new Watcher() { 35 @Override 36 public void process(WatchedEvent watchedEvent) { 37 //TODO 与 ZooKeeper 的交互,一般都放在这里 38 if (watchedEvent.getState() == Event.KeeperState.SyncConnected){//已连接 39 logger.info("连接上了"); 40 try { 41 //同步方式创建节点 42 //String nodePath = sysCreateNode(zooKeeper,"/node_1","123".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 43 //logger.info("创建节点:"+nodePath); 44 45 //异步方式创建节点 46 //asCreateNode(zooKeeper,"/node_2","234".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 47 48 49 //同步方式获取子节点 不关注子节点变化 50 // List<String> list = getChildListNoWatch(zooKeeper,"/"); 51 // for (String name:list){ 52 // logger.info("子节点:"+name); 53 // } 54 55 //异步方式获取节点 关注子节点变化 56 // asGetChildListAndWatch(zooKeeper,"/"); 57 58 //同步方式获取节点数据 sysGetNodeData 59 60 byte[] bytes = sysGetNodeDataNoWatch(zooKeeper,"/node_1"); 61 logger.info("获取节点数据"+new String(bytes,"UTF-8")); 62 63 deleteNode(zooKeeper,"/node_1",0); 64 } catch (KeeperException|InterruptedException|UnsupportedEncodingException e) { 65 e.printStackTrace(); 66 } 67 68 } 69 } 70 }); 71 logger.info("查看状态:"+zooKeeper.getState());//查看状态:CONNECTING 72 73 return zooKeeper; 74 75 76 } 77 78 /** 79 * 同步创建节点 80 * @param zooKeeper 连接 81 * @param path 节点路径 82 * @param bytes 字节数组数据 83 * @param acls 权限 84 * @param createMode 创建模式 85 * @return 86 * @throws UnsupportedEncodingException 87 * @throws KeeperException 88 * @throws InterruptedException 89 */ 90 public static String sysCreateNode(ZooKeeper zooKeeper, String path, byte[] bytes, ArrayList<ACL> acls,CreateMode createMode) throws UnsupportedEncodingException, KeeperException, InterruptedException { 91 return zooKeeper.create(path,bytes, acls, createMode); 92 } 93 94 95 /** 96 * 异步创建节点 97 * @param zooKeeper 连接 98 * @param path 节点路径 99 * @param bytes 字节数组数据100 * @param acls 权限101 * @param createMode 创建模式102 * @throws KeeperException103 * @throws InterruptedException104 */105 public static void asCreateNode(ZooKeeper zooKeeper, String path, byte[] bytes, ArrayList<ACL> acls,CreateMode createMode) throws KeeperException, InterruptedException {106 107 //异步创建需要增加 AsyncCallback.StringCallback 接口的实现类 以及 一个上下文对象参数108 zooKeeper.create(path, bytes, acls, createMode, new AsyncCallback.StringCallback() {109 /**110 *111 * @param rc 节点创建结果返回码 0-节点创建成功112 * @param path 节点真实路径113 * @param ctx 异步调用上下文 就是 create方法本地调用的那个最后一个参数114 * @param name115 */116 @Override117 public void processResult(int rc, String path, Object ctx, String name) {118 StringBuilder stringBuilder = new StringBuilder();119 stringBuilder.append("\nrc="+rc+"\n" +120 "path="+path+"\n" +121 "ctx="+ctx+"\n" +122 "name="+name+"\n");123 logger.info(stringBuilder.toString());124 }125 },"异步创建");126 }127 128 129 /**130 * 同步方式获取子节点 不关注子节点变化131 * @param zooKeeper 连接132 * @param parentPath 父路径133 * @return134 * @throws KeeperException135 * @throws InterruptedException136 */137 public static List<String> sysGetChildListNoWatch(ZooKeeper zooKeeper,String parentPath) throws KeeperException, InterruptedException {138 //参数说明: 父节点路径 是否需要关注子节点的变化 如果 true,则子节点发生变化后,会产生 NodeChildrenChanged 事件139 List<String> childs = zooKeeper.getChildren(parentPath,false);140 return childs;141 }142 143 /**144 * 异步方式获取子节点 关注子节点变化145 * @param zooKeeper 连接146 * @param parentPath 父路径147 */148 public static void asGetChildListAndWatch(ZooKeeper zooKeeper,String parentPath){149 zooKeeper.getChildren(parentPath, true, new AsyncCallback.Children2Callback() {150 @Override151 public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {152 logger.info("变化后的子节点:");153 for (String name:children){154 logger.info("子节点:"+name);155 }156 }157 },"关注子节点变化");158 }159 160 /**161 * 同步方式获取数据162 * @param zooKeeper163 * @param path164 * @return165 * @throws KeeperException166 * @throws InterruptedException167 */168 public static byte[] sysGetNodeDataNoWatch(ZooKeeper zooKeeper,String path) throws KeeperException, InterruptedException {169 //路径 是否关注数据变化 状态170 return zooKeeper.getData(path,false,new Stat());171 }172 173 /**174 * 删除节点175 * @param zooKeeper176 * @param nodePath177 * @param version178 * @throws KeeperException179 * @throws InterruptedException180 */181 public static void deleteNode(ZooKeeper zooKeeper,String nodePath,int version) throws KeeperException, InterruptedException {182 zooKeeper.delete(nodePath,version);183 }184 185 186 }
除了ZooKeeper提供的Java API外,还有两种客户端,ZKClient和Curator两种客户端,都是对原生API的封装,使得操作更方便
《从PAXOS到ZOOKEEPER分布式一致性原理与实践》,可以参考这本书
Java API操作ZooKeeper
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。