首页 > 代码库 > Zookeeper入门开发demo
Zookeeper入门开发demo
package CreateGroup;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.AsyncCallback.Children2Callback;
import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.ConnectionBean;

/**
 * Introductory ZooKeeper client demo: connects to an ensemble, creates, writes, reads and
 * deletes znodes, and lists the children of a "group" znode.
 *
 * <p>The class itself is a {@link Watcher}, and it also exposes a second, stand-alone watcher
 * ({@link #wh}) which is the one actually registered as the session's default watcher by
 * {@link #connect(String)}.
 */
public class TestZkGroup implements Watcher {

    /** Session timeout handed to the ZooKeeper client, in milliseconds. */
    private static final int SESSION_TIMEOUT = 5000;

    /** Upper bound on how long {@link #connect(String)} waits for the session, in milliseconds. */
    private static final long CONNECT_TIMEOUT_MS = 10000L;

    private ZooKeeper zk;

    /** Released by the first {@code SyncConnected} event seen by either watcher of this class. */
    private final CountDownLatch connectedSignal = new CountDownLatch(1);

    /** Stand-alone children callback instance; not used by the methods of this class. */
    Childwatcher childwatcher = new Childwatcher();

    /** Prints the name of the thread that is executing {@code funname}. */
    private void getthreadname(String funname) {
        Thread current = Thread.currentThread();
        System.out.println(funname + " is call in " + current.getName());
    }

    /**
     * Watcher callback used when {@code this} is registered as a watcher.
     * Per the original author's note it runs on a thread other than the caller's
     * (main-EventThread).
     *
     * @param event the event delivered by the ZooKeeper client
     */
    @Override
    public void process(WatchedEvent event) {
        getthreadname("process");
        System.out.println((event.getType())); // print the event type
        if (event.getState() == KeeperState.SyncConnected) {
            connectedSignal.countDown();
        }
    }

    /**
     * Custom watcher used to experiment with watch notifications. It is registered as the
     * session's default watcher in {@link #connect(String)}, so it also releases
     * {@link #connectedSignal} once the session reports {@code SyncConnected}.
     */
    public Watcher wh = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            getthreadname("Watcher::process");
            System.out.println("回调watcher实例: 路径" + event.getPath() + " 类型:" + event.getType());
            if (event.getState() == KeeperState.SyncConnected) {
                connectedSignal.countDown();
            }
        }
    };

    /**
     * Opens a ZooKeeper session with {@link #wh} as the default watcher and blocks until the
     * session is connected.
     *
     * @param hosts connect string passed to the ZooKeeper client
     * @throws IOException if the client cannot be created, or the session is not connected
     *         within {@link #CONNECT_TIMEOUT_MS}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void connect(String hosts) throws IOException, InterruptedException {
        getthreadname("connect");
        // Passing "this" instead of "wh" would route default-watcher events to process(WatchedEvent).
        zk = new ZooKeeper(hosts, SESSION_TIMEOUT, wh);
        // The original left this wait commented out (and sized the latch at 2, which a single
        // SyncConnected event could never release). Bounded so an unreachable server fails fast.
        if (!connectedSignal.await(CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
            ZooKeeper stale = zk;
            zk = null;
            stale.close();
            throw new IOException("Timed out after " + CONNECT_TIMEOUT_MS
                    + " ms waiting for ZooKeeper at " + hosts);
        }
    }

    /**
     * Joins a group by creating an ephemeral member znode at {@code /groupname/meberName}.
     * An ephemeral znode is deleted when the session that created it ends.
     *
     * @param groupname name of the (already existing) group znode, without leading slash
     * @param meberName name of the member znode to create under the group
     * @throws KeeperException if the server rejects the create
     * @throws InterruptedException if the calling thread is interrupted
     */
    public void join(String groupname, String meberName)
            throws KeeperException, InterruptedException {
        String path = "/" + groupname + "/" + meberName;
        String createdpath = zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("join : " + createdpath);
    }

    /**
     * Lists the children of {@code path} through the asynchronous {@code getChildren} API,
     * leaving a watch (default watcher) on the node, and waits for the result.
     *
     * <p>The callback prints each child together with the parent's {@link Stat}. Because this
     * method blocks until the callback has run, it must not be called from a ZooKeeper
     * callback/watcher thread.
     *
     * @param path znode whose children are listed
     * @return the children reported by the server; an empty list when no session has been
     *         opened yet
     * @throws KeeperException if the server reports a non-OK result code
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public List<String> getChilds(String path) throws KeeperException, InterruptedException {
        if (zk == null) {
            return Collections.emptyList();
        }

        final CountDownLatch done = new CountDownLatch(1);
        final AtomicInteger resultCode = new AtomicInteger(KeeperException.Code.OK.intValue());
        final AtomicReference<List<String>> result = new AtomicReference<List<String>>();

        zk.getChildren(path, true, new AsyncCallback.Children2Callback() {
            @Override
            public void processResult(int rc, String childPath, Object ctx,
                    List<String> children, Stat stat) {
                // stat carries the metadata of the parent znode
                try {
                    System.out.println("****");
                    resultCode.set(rc);
                    // children is null when rc is not OK; the original dereferenced it blindly
                    // and its "size() - 1" bound also skipped the last child.
                    if (children != null) {
                        for (String child : children) {
                            System.out.println("mempath = " + child + stat);
                        }
                        result.set(children);
                    }
                } finally {
                    done.countDown();
                }
            }
        }, null);

        done.await();

        int rc = resultCode.get();
        if (rc != KeeperException.Code.OK.intValue()) {
            throw KeeperException.create(KeeperException.Code.get(rc), path);
        }
        List<String> children = result.get();
        return children != null ? children : Collections.<String>emptyList();
    }

    /**
     * Creates a persistent znode with no data at {@code path}.
     *
     * <p>{@code Ids.OPEN_ACL_UNSAFE} is a fully open ACL: any client may read and write the
     * znode. {@code CreateMode.PERSISTENT} means the znode outlives the session that created it.
     * ({@code PERSISTENT_SEQUENTIAL} would additionally have ZooKeeper append a sequence number.)
     *
     * @param path absolute path of the znode to create
     * @throws KeeperException if the server rejects the create
     * @throws InterruptedException if the calling thread is interrupted
     * @throws IOException declared for caller compatibility
     */
    public void create(String path) throws KeeperException, InterruptedException, IOException {
        String createdpath = zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("crateed : " + createdpath);
    }

    /** Closes the session if one was opened. */
    private void close() throws InterruptedException {
        if (zk != null) {
            zk.close();
        }
    }

    /** Minimal {@link Children2Callback} that only prints the {@link Stat} it receives. */
    public static class Childwatcher implements Children2Callback {

        /** Never assigned or read inside this class; kept because it is a public member. */
        public ChildrenCallback processResult;

        @Override
        public void processResult(int rc, String path, Object ctx,
                List<String> children, Stat stat) {
            System.out.println("**** path " + stat);
        }
    }

    /**
     * Deletes the znode at {@code groupname} regardless of its version ({@code -1}).
     *
     * @param groupname absolute path of the znode to delete
     * @throws InterruptedException if the calling thread is interrupted
     * @throws KeeperException if the server rejects the delete
     */
    public void delete(String groupname) throws InterruptedException, KeeperException {
        zk.delete(groupname, -1);
    }

    /**
     * Checks whether a znode exists and leaves a watch on it using the session's default watcher.
     *
     * @param groupname absolute path of the znode to check
     * @return the znode's {@link Stat}, or {@code null} if it does not exist
     * @throws InterruptedException if the calling thread is interrupted
     * @throws KeeperException if the server reports an error
     */
    public Stat isexist(String groupname) throws InterruptedException, KeeperException {
        return zk.exists(groupname, true);
    }

    /**
     * Stores {@code value} (UTF-8 encoded) at {@code path}, creating a persistent znode when the
     * path does not exist yet and overwriting the data (any version) otherwise.
     *
     * @param path absolute path of the znode
     * @param value text to store
     * @throws Exception if encoding fails or the server rejects the operation
     */
    public void write(String path, String value) throws Exception {
        byte[] payload = value.getBytes("UTF-8");
        Stat stat = zk.exists(path, false);
        if (stat == null) {
            zk.create(path, payload, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            zk.setData(path, payload, -1);
        }
    }

    /**
     * Reads the data stored at {@code path} as UTF-8 text and registers {@code watch} on the znode.
     *
     * @param path absolute path of the znode
     * @param watch watcher to register on the znode
     * @return the decoded data, or {@code null} when the znode holds no data
     * @throws Exception if decoding fails or the server reports an error
     */
    public String read(String path, Watcher watch) throws Exception {
        byte[] data = zk.getData(path, watch, null);
        // A znode created with null data (see create/join) yields no bytes to decode.
        return data == null ? null : new String(data, "UTF-8");
    }

    /**
     * Demo driver: recreates {@code /zktest}, writes and reads a value, then lists its children.
     *
     * @param args ignored
     * @throws Exception on any ZooKeeper or I/O failure
     */
    public static void main(String[] args) throws Exception {
        String hosts = "localhost";
        String groupname = "zktest";
        String path = "/" + groupname;

        TestZkGroup test = new TestZkGroup();
        test.connect(hosts);
        try {
            // Start from a clean slate.
            if (null != test.isexist(path)) {
                test.delete(path);
            }

            // Each isexist() call re-arms the default watch so the next change is reported.
            test.isexist(path);
            test.create(path);

            test.isexist(path);
            test.write(path, "test");

            test.isexist(path);
            String result = test.read(path, test.wh);
            System.out.println(path + " value = " + result);

            List<String> memlist = test.getChilds("/" + "zktest");
            if (memlist != null) {
                for (String member : memlist) {
                    System.out.println("mempath = " + member);
                }
            }
        } finally {
            // The original never released the session.
            test.close();
        }
    }
}
Zookeeper入门开发demo
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。