首页 > 代码库 > 基于Redis实现分布式锁(转载)

基于Redis实现分布式锁(转载)

原文地址:http://blog.csdn.net/ugg/article/details/41894947

Redis命令介绍
使用Redis实现分布式锁,有两个重要函数需要介绍

SETNX命令(SET if Not eXists)
语法:
SETNX key value
功能:
当且仅当 key 不存在,将 key 的值设为 value ,并返回1;若给定的 key 已经存在,则 SETNX 不做任何动作,并返回0。

GETSET命令
语法:
GETSET key value
功能:
将给定 key 的值设为 value ,并返回 key 的旧值 (old value),当 key 存在但不是字符串类型时,返回一个错误,当key不存在时,返回nil。

GET命令
语法:
GET key
功能:
返回 key 所关联的字符串值,如果 key 不存在那么返回特殊值 nil 。

DEL命令
语法:
DEL key [KEY …]
功能:
删除给定的一个或多个 key ,不存在的 key 会被忽略。

兵贵精,不在多。分布式锁,我们就依靠这四个命令。但在具体实现,还有很多细节,需要仔细斟酌,因为在分布式并发多进程中,任何一点出现差错,都会导致死锁,hold住所有进程。

加锁实现

SETNX 可以直接加锁操作,比如说对某个关键词foo加锁,客户端可以尝试
SETNX foo.lock <current unix time>

如果返回1,表示客户端已经获取锁,可以往下操作,操作完成后,通过
DEL foo.lock

命令来释放锁。
如果返回0,说明foo已经被其他客户端上锁,如果锁是非堵塞的,可以选择返回调用。如果是堵塞调用调用,就需要进入以下个重试循环,直至成功获得锁或者重试超时。理想是美好的,现实是残酷的。仅仅使用SETNX加锁带有竞争条件的,在某些特定的情况会造成死锁错误。

处理死锁

在上面的处理方式中,如果获取锁的客户端端执行时间过长,进程被kill掉,或者因为其他异常崩溃,导致无法释放锁,就会造成死锁。所以,需要对加锁要做时效性检测。因此,我们在加锁时,把当前时间戳作为value存入此锁中,通过当前时间戳和Redis中的时间戳进行对比,如果超过一定差值,认为锁已经时效,防止锁无限期的锁下去.

 

 

使用Redisson:

分布式Lock

 

Redisson redisson = Redisson.create();

RLock lock = redisson.getLock("anyLock");

// Most familiar locking method

lock.lock();

// Lock time-to-live support

// releases lock automatically after 10 seconds

// if unlock method not invoked

lock.lock(10, TimeUnit.SECONDS);

// Wait for 100 seconds and automatically unlock it after 10 seconds

boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);

...

lock.unlock();

分布式MultiLock

 

RLock lock1 = redissonInstance1.getLock("lock1");

RLock lock2 = redissonInstance2.getLock("lock2");

RLock lock3 = redissonInstance3.getLock("lock3");

RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);

lock.lock();

分布式ReadWriteLock

 

 

RReadWriteLock rwlock = redisson.getLock("anyRWLock");

// Most familiar locking method

rwlock.readLock().lock();

// or

rwlock.writeLock().lock();

// Lock time-to-live support

// releases lock automatically after 10 seconds

// if unlock method not invoked

rwlock.readLock().lock(10, TimeUnit.SECONDS);

// or

rwlock.writeLock().lock(10, TimeUnit.SECONDS);

// Wait for 100 seconds and automatically unlock it after 10 seconds

boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);

// or

boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);

...

lock.unlock();

分布式Semaphore

 

RSemaphore semaphore = redisson.getSemaphore("semaphore");

semaphore.acquire();

semaphore.acquire(23);

semaphore.tryAcquire();

semaphore.tryAcquire(23, TimeUnit.SECONDS);

semaphore.release(10);

semaphore.release();

分布式AtomicLong

 

RAtomicLong atomicLong = redisson.getAtomicLong("myAtomicLong");

atomicLong.set(3);

atomicLong.incrementAndGet();

atomicLong.get();

分布式AtomicDouble

 

RAtomicDouble atomicDouble = redisson.getAtomicDouble("myAtomicDouble");

atomicDouble.set(2.81);

atomicDouble.addAndGet(4.11);

atomicDouble.get();

分布式CountDownLatch

 

RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");

latch.trySetCount(1);

latch.await();

// in other thread or other JVM

RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");

latch.countDown();

分布式Blocking Queue

 

RBlockingQueue<SomeObject> queue = redisson.getBlockingQueue("anyQueue");

queue.offer(new SomeObject());

SomeObject obj = queue.peek();

SomeObject someObj = queue.poll();

SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

除此之外,还支持Queue, Deque, Blocking Deque

基于Redis实现分布式锁(转载)