首页 > 代码库 > zookeeper_04:curator
zookeeper_04:curator
- 定义
Curator是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.8.0</version>
</dependency>
- checkexists
package com.jike.testcurator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.data.Stat;
public class checkexists {
public static void main(String[] args) throws Exception {
ExecutorService es = Executors.newFixedThreadPool(5);
//RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString("192.168.1.105:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
client.start();
client.checkExists().inBackground(new BackgroundCallback() {
public void processResult(CuratorFramework arg0, CuratorEvent arg1)
throws Exception {
Stat stat = arg1.getStat();
System.out.println(stat);
System.out.println(arg1.getContext());
}
},"123",es).forPath("/jike");
Thread.sleep(Integer.MAX_VALUE);
}
}
- CreateNode
package com.jike.testcurator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.CreateMode;
public class CreateNode {
public static void main(String[] args) throws Exception {
//RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString("192.168.1.105:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
client.start();
String path = client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath("/jike/1","123".getBytes());
System.out.println(path);
Thread.sleep(Integer.MAX_VALUE);
}
}
- CreateNodeAuth
package com.jike.testcurator;
import java.util.ArrayList;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
public class CreateNodeAuth {
public static void main(String[] args) throws Exception {
//RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString("192.168.1.105:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
client.start();
ACL aclIp = new ACL(Perms.READ,new Id("ip","192.168.1.105"));
ACL aclDigest = new ACL(Perms.READ|Perms.WRITE,new Id("digest",DigestAuthenticationProvider.generateDigest("jike:123456")));
ArrayList<ACL> acls = new ArrayList<ACL>();
acls.add(aclDigest);
acls.add(aclIp);
String path = client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(acls)
.forPath("/jike/3","123".getBytes());
System.out.println(path);
Thread.sleep(Integer.MAX_VALUE);
}
}
- CreateSession
package com.jike.testcurator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;
public class CreateSession {
public static void main(String[] args) throws InterruptedException {
//RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString("192.168.1.105:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
client.start();
Thread.sleep(Integer.MAX_VALUE);
}
}
- DelNode
package com.jike.testcurator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;
public class DelNode {
public static void main(String[] args) throws Exception {
//RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString("192.168.1.105:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
client.start();
client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(-1).forPath("/jike20");
Thread.sleep(Integer.MAX_VALUE);
}
}
- GetChildren
package com.jike.testcurator;
import java.util.List;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;
public class GetChildren {
public static void main(String[] args) throws Exception {
//RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString("192.168.1.105:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
client.start();
List<String> cList = client.getChildren().forPath("/jike20");
System.out.println(cList.toString());
}
}
- GetData
package com.jike.testcurator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.data.Stat;
public class GetData {
public static void main(String[] args) throws Exception {
//RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
// CuratorFramework client = CuratorFrameworkFactory
// .newClient("192.168.1.105:2181",5000,5000, retryPolicy);
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString("192.168.1.105:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
client.start();
Stat stat = new Stat();
byte[] ret = client.getData().storingStatIn(stat).forPath("/jike");
System.out.println(new String(ret));
System.out.println(stat);
}
}
- GetDataAuth
package com.jike.testcurator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.data.Stat;
public class GetDataAuth {
public static void main(String[] args) throws Exception {
//RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString("192.168.1.105:2181")
.sessionTimeoutMs(5000)
.authorization("digest", "jike:123456".getBytes())
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
client.start();
Stat stat = new Stat();
byte[] ret = client.getData().storingStatIn(stat).forPath("/jike/3");
System.out.println(new String(ret));
System.out.println(stat);
}
}
- NodeChildrenListener
package com.jike.testcurator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.RetryUntilElapsed;
public class NodeChildrenListener {
public static void main(String[] args) throws Exception {
//RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString("192.168.1.105:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
client.start();
final PathChildrenCache cache = new PathChildrenCache(client,"/jike",true);
cache.start();
cache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("CHILD_ADDED:"+event.getData());
break;
case CHILD_UPDATED:
System.out.println("CHILD_UPDATED:"+event.getData());
break;
case CHILD_REMOVED:
System.out.println("CHILD_REMOVED:"+event.getData());
break;
default:
break;
}
}
});
cache.close();
Thread.sleep(Integer.MAX_VALUE);
}
}
- NodeListener
package com.jike.testcurator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.RetryUntilElapsed;
public class NodeListener {
public static void main(String[] args) throws Exception {
//RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString("192.168.1.105:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
client.start();
final NodeCache cache = new NodeCache(client,"/jike");
cache.start();
cache.getListenable().addListener(new NodeCacheListener() {
public void nodeChanged() throws Exception {
// TODO Auto-generated method stub
byte[] ret = cache.getCurrentData().getData();
System.out.println("new data:"+new String(ret));
}
});
cache.close();
Thread.sleep(Integer.MAX_VALUE);
}
}
- UpdateData
package com.jike.testcurator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.data.Stat;
public class UpdateData {
public static void main(String[] args) throws Exception {
//RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString("192.168.1.105:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
client.start();
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/jike");
client.setData().withVersion(stat.getVersion()).forPath("/jike", "123".getBytes());
}
}
zookeeper_04:curator