首页 > 代码库 > Zookeeper实现分布式锁

Zookeeper实现分布式锁

package com.web.util;import java.util.ArrayList;import java.util.Collections;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.concurrent.CountDownLatch;import org.apache.curator.RetryPolicy;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.recipes.cache.PathChildrenCache;import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.log4j.Logger;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.ZooDefs.Ids;/** * 用于跨系统或者跨服务器之间的锁,不能用于同一个虚拟机 * @author Administrator * */public class LockUtil {    /**     * zk 客户端     */    private static CuratorFramework client = null;        private static Logger logger = Logger.getLogger(LockUtil.class);        /**     * CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行     * 排他锁  阻塞:资源控制实现      */    protected static CountDownLatch latch = new CountDownLatch(1);        protected static CountDownLatch shardLocklatch = new CountDownLatch(1);        /**     * 谁当前调用获取锁的,实际生产中代表IP地址     */    private static String selfIdentity = null;        private static String selfNodeName = null;        public static synchronized void init(String connectString) {        if (client != null)            return;                RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);        client = CuratorFrameworkFactory.builder()            .connectString(connectString)            .sessionTimeoutMs(10000)            .retryPolicy(retryPolicy)            .namespace("LockService")            .build();        client.start();                // 创建锁目录        try {            if (client.checkExists().forPath("/ExclusiveLockDemo") == null) {                client.create()                    .creatingParentsIfNeeded()                    .withMode(CreateMode.PERSISTENT)                    .withACL(Ids.OPEN_ACL_UNSAFE)                    .forPath("/ExclusiveLockDemo");            }            // 创建锁监听            addChildWatcher("/ExclusiveLockDemo");            if (client.checkExists().forPath("/ShardLockDemo") == null) {                client.create()                    .creatingParentsIfNeeded()                    .withMode(CreateMode.PERSISTENT)                    .withACL(Ids.OPEN_ACL_UNSAFE)                    .forPath("/ShardLockDemo");            }        }        catch (Exception e) {            logger.error("ZK服务器连接不上");            throw new RuntimeException("ZK服务器连接不上");        }    }    /**     * 获取排他锁     * <功能详细描述>     */    public static synchronized void getExclusiveLock() {        while (true) {            try {                client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).withACL(Ids.OPEN_ACL_UNSAFE).forPath(                    "/ExclusiveLockDemo/lock");                logger.info("成功获取到锁");                return;// 如果节点创建成功,即说明获取锁成功            }            catch (Exception e) {                logger.info("此次获取锁没有成功");                try {                    //如果没有获取到锁,需要重新设置同步资源值                    if (latch.getCount() <= 0) {                        latch = new CountDownLatch(1);                    }                    latch.await();                }                catch (InterruptedException e1) {                    e1.printStackTrace();                    logger.error("", e1);                }            }        }    }        /**     *      * @param type     *            0为读锁,1为写锁     * @param identity     *            获取当前锁的所有者     */    public static boolean getShardLock(int type, String identity) {        if (identity == null || "".equals(identity)) {            throw new RuntimeException("identity不能为空");        }        if (identity.indexOf("-") != -1) {            throw new RuntimeException("identity不能包含字符-");        }        if (type != 0 && type != 1) {            throw new RuntimeException("type只能为0或者1");        }        String nodeName = null;        if (type == 0) {            nodeName = "R" + identity + "-";        }        else if (type == 1) {            nodeName = "W" + identity + "-";        }        selfIdentity = nodeName;        try {            //if (client.checkExists().forPath("/ShardLockDemo/" + nodeName) == null)            selfNodeName =                client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).withACL(Ids.OPEN_ACL_UNSAFE).forPath(                    "/ShardLockDemo/" + nodeName);            logger.info("创建节点:" + selfNodeName);            List<String> lockChildrens = client.getChildren().forPath("/ShardLockDemo");            if (!canGetLock(lockChildrens, type, nodeName.substring(0, nodeName.length() - 1), false)) {                shardLocklatch.await();            }            // return;// 获得锁成功就返回        }        catch (Exception e) {            logger.info("出现异常", e);            return false;        }                logger.info("成功获取锁");        return true;    }    /**     * java -jar lock.jar 0 192.168.1.1     * <功能详细描述>     * @param childrens     * @param type     * @param identity     * @param reps     * @return     */    private static boolean canGetLock(List<String> childrens, int type, String identity, boolean reps) {        boolean res = false;        if (childrens.size() <= 0)            return true;                try {            String currentSeq = null;            List<String> seqs = new ArrayList<String>();            //List<String> identitys = new ArrayList<String>();            Map<String, String> seqs_identitys = new HashMap<String, String>();            for (String child : childrens) {                String splits[] = child.split("-");                seqs.add(splits[1]);                //identitys.add(splits[0]);                seqs_identitys.put(splits[1], splits[0]);                if (identity.equals(splits[0]))                    currentSeq = splits[1];            }                        List<String> sortSeqs = new ArrayList<String>();            sortSeqs.addAll(seqs);            Collections.sort(sortSeqs);                        // 第一个节点,则无论是读锁还是写锁都可以获取            if (currentSeq.equals(sortSeqs.get(0))) {                res = true;                logger.info("请求锁,因为是第一个请求锁的请求,所以获取成功");                return res;            }            else {                // 写锁,不是第一个就一定会失败                if (type == 1) {                    res = false;                    //第一次请求取锁则设置监听,以后就不设置了,因为监听一直存在                    //比如:前面有两个读锁,第三个为写锁,watch通知会多了一遍,当前面的读锁释放以后,写锁会去尝试,因为前面还有一个读锁,所以获取不到,然后就会重新注册watch,为导致通知两遍                    if (reps == false)                        addChildWatcher("/ShardLockDemo");                    logger.info("请求写锁,因为前面有其它锁,所以获取锁失败");                    return res;                }            }            // int index =-1;            //因为不是第一个节点,所以要判断在当前节点之前有没有写的节点存在,如果存在写锁则不能获取读锁            boolean hasW = true;            for (String seq : sortSeqs) {                // ++index;                if (seq.equals(currentSeq)) {                    break;                }                if (!seqs_identitys.get(seq).startsWith("W"))                    hasW = false;            }            if (type == 0 && hasW == false) {                res = true;            }            else if (type == 0 && hasW == true) {                res = false;            }            if (res == false) {                // 添加监听,锁释放之后需要进行抢锁                addChildWatcher("/ShardLockDemo");                logger.info("因为没有获取到锁,添加锁的监听器");            }        }        catch (Exception e) {            e.printStackTrace();        }        return res;    }    /**     * 能删除的节点尽量删除,不然会有延迟     * <功能详细描述>     * @return     */    public static boolean unlockForExclusive() {        try {            if (client.checkExists().forPath("/ExclusiveLockDemo/lock") != null) {                client.delete().forPath("/ExclusiveLockDemo/lock");            }        }        catch (Exception e) {            e.printStackTrace();            return false;        }        return true;    }        public static boolean unlockForShardLock() {        try {            if (client.checkExists().forPath(selfNodeName) != null) {                client.delete().forPath(selfNodeName);            }        }        catch (Exception e) {            e.printStackTrace();            return false;        }        return true;    }    /**     * 添加子节点的监听     * @param path     * @throws Exception     */    public static void addChildWatcher(String path) throws Exception {        @SuppressWarnings("resource")        final PathChildrenCache cache = new PathChildrenCache(client, path, true);        cache.start(StartMode.POST_INITIALIZED_EVENT);// ppt中需要讲StartMode        // System.out.println(cache.getCurrentData().size());        cache.getListenable().addListener(new PathChildrenCacheListener() {            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {                if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {                                    }                else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {                                    }                else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {                    String path = event.getData().getPath();                    System.out.println("收到监听" + path);                    if (path.contains("ExclusiveLockDemo")) {                        logger.info("排他锁,收到锁释放通知");                        //CountDownLatch(1),此时修改为0 latch.wait()释放                        latch.countDown();                    }                    else if (path.contains("ShardLockDemo")) {                        logger.info("共享锁,收到锁释放通知");                        //收到自己的通知就不处理                        if (path.contains(selfIdentity))                            return;                        List<String> lockChildrens = client.getChildren().forPath("/ShardLockDemo");                        boolean isLock = false;                        try {                            if (selfIdentity.startsWith("R"))                                isLock = canGetLock(lockChildrens, 0, selfIdentity.substring(0, selfIdentity.length() - 1), true);                            else if (selfIdentity.startsWith("W"))                                isLock = canGetLock(lockChildrens, 1, selfIdentity.substring(0, selfIdentity.length() - 1), true);                        }                        catch (Exception e) {                            e.printStackTrace();                        }                        logger.info("收到锁释放监听后,重新尝试获取锁,结果为:" + isLock);                        if (isLock) {                            //获得锁                            logger.info("获得锁,解除因为获取不到锁的阻塞");                            shardLocklatch.countDown();                        }                    }                }                else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {                                    }            }        });    }}
package com.web;import java.util.Calendar;import java.util.Date;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;import com.web.service.OrderRepository;import com.web.service.OrderService;import com.web.service.ProductRepository;import com.web.service.pojo.Order;import com.web.service.pojo.Product;import com.web.util.LockUtil;public class Test {    private static ApplicationContext ctx;          static     {          ctx = new ClassPathXmlApplicationContext("applicationContext.xml");      }                /**     * 测试超卖问题     * <功能详细描述>     */    public static void testpurchase(){        OrderService os = (OrderService)ctx.getBean("orderService");                 Order order = new Order();        order.setProductId(1L);        order.setCreateTime(new Date());        order.setPnum(1);        os.doOrder(order);    }        public static void testMapper(){        ProductRepository mapper = (ProductRepository)ctx.getBean("productMapper");         //测试id=1的用户查询,根据数据库中的情况,可以改成你自己的.        System.out.println("得到用户id=1的用户信息");        Product product = mapper.selectProductById(1L);        System.out.println(product.getName());                 OrderRepository omapper = (OrderRepository)ctx.getBean("orderMapper");         Order order = new Order();        order.setProductId(1L);        order.setCreateTime(new Date());        order.setPnum(1);        omapper.saveOrder(order);           }        public static void testShardLog(int type,String identity){        System.out.println("---------------开始获取锁"+identity);        LockUtil.init("localhost:2181");        LockUtil.getShardLock(type, identity);        System.out.println("---------------获取锁结束"+identity);    }    public static void main(String[] args)      {          int type = Integer.parseInt(args[0]);//0读锁还是1写锁        System.out.println("type="+type);//线程名称        testShardLog(type,args[1]);        //testShardLog(0,"f6");//        try {//            LockUtil.init("localhost:2181");//            //LockUtil.addChildWatcher("/LockService");//            LockUtil.getExclusiveLock();//        } catch (Exception e1) {//            // TODO Auto-generated catch block//            e1.printStackTrace();//        }                //排他锁***************************超卖测试***************************        //        long nowtime = Calendar.getInstance().getTimeInMillis();//        System.out.println("begin with "+nowtime);//        testpurchase();//        System.out.println("end with "+nowtime);                 //end排他锁***************************超卖测试***************************                    //    String []names = ctx.getBeanDefinitionNames();//        for(String s: names){//            System.out.println(s);//        }                try {            Thread.sleep(20000);            System.out.println("-------------开始--释放锁");            LockUtil.unlockForShardLock();            System.out.println("-------------成功--释放锁");            Thread.sleep(30000);            System.out.println("---------------结束退出");        } catch (InterruptedException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }    } }

 

Zookeeper实现分布式锁