首页 > 代码库 > Java实现基于Redis的分布式锁

Java实现基于Redis的分布式锁

我这里不实现JDK的java.util.concurrent.locks.Lock接口,而是自定义一个,因为JDK的有个newCondition()方法我这里暂时没实现。这个Lock提供了5个lock方法的变体,可以自行选择使用哪一个来获取锁,我的想法是

最好用带超时返回的那几个方法,因为不这样的话,假如redis挂了,线程永远都在那死循环了(关于这里,应该还可以进一步优化,如果redis挂了,Jedis的操作肯定会抛异常之类的,可以定义个机制让redis挂了的时候通知使用这个lock的用户,或者说是线程)

 

Java代码 下载 

  1. package cc.lixiaohui.lock;  

  2.   

  3. import java.util.concurrent.TimeUnit;  

  4.   

  5. public interface Lock {  

  6.   

  7.     /** 

  8.      * 阻塞性的获取锁, 不响应中断 

  9.      */  

  10.     void lock();  

  11.       

  12.     /** 

  13.      * 阻塞性的获取锁, 响应中断 

  14.      *  

  15.      * @throws InterruptedException 

  16.      */  

  17.     void lockInterruptibly() throws InterruptedException;  

  18.       

  19.     /** 

  20.      * 尝试获取锁, 获取不到立即返回, 不阻塞 

  21.      */  

  22.     boolean tryLock();  

  23.       

  24.     /** 

  25.      * 超时自动返回的阻塞性的获取锁, 不响应中断 

  26.      *  

  27.      * @param time 

  28.      * @param unit 

  29.      * @return {@code true} 若成功获取到锁, {@code false} 若在指定时间内未获取到锁 

  30.      *          

  31.      */  

  32.     boolean tryLock(long time, TimeUnit unit);  

  33.       

  34.     /** 

  35.      * 超时自动返回的阻塞性的获取锁, 响应中断 

  36.      *  

  37.      * @param time 

  38.      * @param unit 

  39.      * @return {@code true} 若成功获取到锁, {@code false} 若在指定时间内未获取到锁 

  40.      * @throws InterruptedException 在尝试获取锁的当前线程被中断 

  41.      */  

  42.     boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException;  

  43.       

  44.     /** 

  45.      * 释放锁 

  46.      */  

  47.     void unlock();  

  48.       

  49. }  

 

看其抽象实现:

 

Java代码 下载 

  1. package cc.lixiaohui.lock;  

  2.   

  3. import java.util.concurrent.TimeUnit;  

  4.   

  5. /** 

  6.  * 锁的骨架实现, 真正的获取锁的步骤由子类去实现. 

  7.  *  

  8.  * @author lixiaohui 

  9.  * 

  10.  */  

  11. public abstract class AbstractLock implements Lock {  

  12.   

  13.     /** 

  14.      * <pre> 

  15.      * 这里需不需要保证可见性值得讨论, 因为是分布式的锁,  

  16.      * 1.同一个jvm的多个线程使用不同的锁对象其实也是可以的, 这种情况下不需要保证可见性  

  17.      * 2.同一个jvm的多个线程使用同一个锁对象, 那可见性就必须要保证了. 

  18.      * </pre> 

  19.      */  

  20.     protected volatile boolean locked;  

  21.   

  22.     /** 

  23.      * 当前jvm内持有该锁的线程(if have one) 

  24.      */  

  25.     private Thread exclusiveOwnerThread;  

  26.   

  27.     public void lock() {  

  28.         try {  

  29.             lock(false0nullfalse);  

  30.         } catch (InterruptedException e) {  

  31.             // TODO ignore  

  32.         }  

  33.     }  

  34.   

  35.     public void lockInterruptibly() throws InterruptedException {  

  36.         lock(false0nulltrue);  

  37.     }  

  38.   

  39.     public boolean tryLock(long time, TimeUnit unit) {  

  40.         try {  

  41.             return lock(true, time, unit, false);  

  42.         } catch (InterruptedException e) {  

  43.             // TODO ignore  

  44.         }  

  45.         return false;  

  46.     }  

  47.   

  48.     public boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException {  

  49.         return lock(true, time, unit, true);  

  50.     }  

  51.   

  52.     public void unlock() {  

  53.         // TODO 检查当前线程是否持有锁  

  54.         if (Thread.currentThread() != getExclusiveOwnerThread()) {  

  55.             throw new IllegalMonitorStateException("current thread does not hold the lock");  

  56.         }  

  57.           

  58.         unlock0();  

  59.         setExclusiveOwnerThread(null);  

  60.     }  

  61.   

  62.     protected void setExclusiveOwnerThread(Thread thread) {  

  63.         exclusiveOwnerThread = thread;  

  64.     }  

  65.   

  66.     protected final Thread getExclusiveOwnerThread() {  

  67.         return exclusiveOwnerThread;  

  68.     }  

  69.   

  70.     protected abstract void unlock0();  

  71.       

  72.     /** 

  73.      * 阻塞式获取锁的实现 

  74.      *  

  75.      * @param useTimeout  

  76.      * @param time 

  77.      * @param unit 

  78.      * @param interrupt 是否响应中断 

  79.      * @return 

  80.      * @throws InterruptedException 

  81.      */  

  82.     protected abstract boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException;  

  83.   

  84. }  

 

 基于Redis的最终实现,关键的获取锁,释放锁的代码在这个类的lock方法和unlock0方法里,大家可以只看这两个方法然后完全自己写一个:

 

Java代码 下载 

  1. package cc.lixiaohui.lock;  

  2.   

  3. import java.util.concurrent.TimeUnit;  

  4.   

  5. import redis.clients.jedis.Jedis;  

  6.   

  7. /** 

  8.  * <pre> 

  9.  * 基于Redis的SETNX操作实现的分布式锁 

  10.  *  

  11.  * 获取锁时最好用lock(long time, TimeUnit unit), 以免网路问题而导致线程一直阻塞 

  12.  *  

  13.  * <a href="http://redis.io/commands/setnx">SETNC操作参考资料</a> 

  14.  * </pre> 

  15.  *  

  16.  * @author lixiaohui 

  17.  * 

  18.  */  

  19. public class RedisBasedDistributedLock extends AbstractLock {  

  20.       

  21.     private Jedis jedis;  

  22.       

  23.     // 锁的名字  

  24.     protected String lockKey;  

  25.       

  26.     // 锁的有效时长(毫秒)  

  27.     protected long lockExpires;  

  28.       

  29.     public RedisBasedDistributedLock(Jedis jedis, String lockKey, long lockExpires) {  

  30.         this.jedis = jedis;  

  31.         this.lockKey = lockKey;  

  32.         this.lockExpires = lockExpires;  

  33.     }  

  34.   

  35.     // 阻塞式获取锁的实现  

  36.     protected boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException{  

  37.         if (interrupt) {  

  38.             checkInterruption();  

  39.         }  

  40.           

  41.         long start = System.currentTimeMillis();  

  42.         long timeout = unit.toMillis(time); // if !useTimeout, then it‘s useless  

  43.           

  44.         while (useTimeout ? isTimeout(start, timeout) : true) {  

  45.             if (interrupt) {  

  46.                 checkInterruption();  

  47.             }  

  48.               

  49.             long lockExpireTime = System.currentTimeMillis() + lockExpires + 1;//锁超时时间  

  50.             String stringOfLockExpireTime = String.valueOf(lockExpireTime);  

  51.               

  52.             if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) { // 获取到锁  

  53.                 // TODO 成功获取到锁, 设置相关标识  

  54.                 locked = true;  

  55.                 setExclusiveOwnerThread(Thread.currentThread());  

  56.                 return true;  

  57.             }  

  58.               

  59.             String value = jedis.get(lockKey);  

  60.             if (value != null && isTimeExpired(value)) { // lock is expired  

  61.                 // 假设多个线程(非单jvm)同时走到这里  

  62.                 String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime); // getset is atomic  

  63.                 // 但是走到这里时每个线程拿到的oldValue肯定不可能一样(因为getset是原子性的)  

  64.                 // 加入拿到的oldValue依然是expired的,那么就说明拿到锁了  

  65.                 if (oldValue != null && isTimeExpired(oldValue)) {  

  66.                     // TODO 成功获取到锁, 设置相关标识  

  67.                     locked = true;  

  68.                     setExclusiveOwnerThread(Thread.currentThread());  

  69.                     return true;  

  70.                 }  

  71.             } else {   

  72.                 // TODO lock is not expired, enter next loop retrying  

  73.             }  

  74.         }  

  75.         return false;  

  76.     }  

  77.       

  78.     public boolean tryLock() {  

  79.         long lockExpireTime = System.currentTimeMillis() + lockExpires + 1;//锁超时时间  

  80.         String stringOfLockExpireTime = String.valueOf(lockExpireTime);  

  81.           

  82.         if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) { // 获取到锁  

  83.             // TODO 成功获取到锁, 设置相关标识  

  84.             locked = true;  

  85.             setExclusiveOwnerThread(Thread.currentThread());  

  86.             return true;  

  87.         }  

  88.           

  89.         String value = jedis.get(lockKey);  

  90.         if (value != null && isTimeExpired(value)) { // lock is expired  

  91.             // 假设多个线程(非单jvm)同时走到这里  

  92.             String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime); // getset is atomic  

  93.             // 但是走到这里时每个线程拿到的oldValue肯定不可能一样(因为getset是原子性的)  

  94.             // 假如拿到的oldValue依然是expired的,那么就说明拿到锁了  

  95.             if (oldValue != null && isTimeExpired(oldValue)) {  

  96.                 // TODO 成功获取到锁, 设置相关标识  

  97.                 locked = true;  

  98.                 setExclusiveOwnerThread(Thread.currentThread());  

  99.                 return true;  

  100.             }  

  101.         } else {   

  102.             // TODO lock is not expired, enter next loop retrying  

  103.         }  

  104.           

  105.         return false;  

  106.     }  

  107.       

  108.     /** 

  109.      * Queries if this lock is held by any thread. 

  110.      *  

  111.      * @return {@code true} if any thread holds this lock and 

  112.      *         {@code false} otherwise 

  113.      */  

  114.     public boolean isLocked() {  

  115.         if (locked) {  

  116.             return true;  

  117.         } else {  

  118.             String value = jedis.get(lockKey);  

  119.             // TODO 这里其实是有问题的, 想:当get方法返回value后, 假设这个value已经是过期的了,  

  120.             // 而就在这瞬间, 另一个节点set了value, 这时锁是被别的线程(节点持有), 而接下来的判断  

  121.             // 是检测不出这种情况的.不过这个问题应该不会导致其它的问题出现, 因为这个方法的目的本来就  

  122.             // 不是同步控制, 它只是一种锁状态的报告.  

  123.             return !isTimeExpired(value);  

  124.         }  

  125.     }  

  126.   

  127.     @Override  

  128.     protected void unlock0() {  

  129.         // TODO 判断锁是否过期  

  130.         String value = jedis.get(lockKey);  

  131.         if (!isTimeExpired(value)) {  

  132.             doUnlock();  

  133.         }  

  134.     }  

  135.   

  136.     private void checkInterruption() throws InterruptedException {  

  137.         if(Thread.currentThread().isInterrupted()) {  

  138.             throw new InterruptedException();  

  139.         }  

  140.     }  

  141.       

  142.     private boolean isTimeExpired(String value) {  

  143.         return Long.parseLong(value) < System.currentTimeMillis();  

  144.     }  

  145.       

  146.     private boolean isTimeout(long start, long timeout) {  

  147.         return start + timeout > System.currentTimeMillis();  

  148.     }  

  149.       

  150.     private void doUnlock() {  

  151.         jedis.del(lockKey);  

  152.     }  

  153.   

  154. }  

 

如果将来还换一种实现方式(比如zookeeper之类的),到时直接继承AbstractLock并实现lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt), unlock0()方法即可(所谓抽象嘛)

 


Java实现基于Redis的分布式锁