首页 > 代码库 > 深入并发AQS二
深入并发AQS二
AQS需要解决以下几个问题:
1.锁状态,如何保证并发情况下能够安全的更新?
2.当前线程不能获取锁时,放在哪里? AQS是放在一个队列当中
3.如何提高效率?
AQS本身有两个核心实现方法acquire及:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
AQS获取锁与释放锁调用的是acquire与release方法。
可以看到acquire方法的职责是:
如果当前线程不能获取到锁,则将其封装成一个Node放入队列当中,并决定何时将当前线程真正阻塞。
否则获取到锁时,直接返回。
release的职责是:
尝试释放锁(如果当前线程没有拥有对象锁,不能进行后续操作,即只有拥有锁的线程才能对锁进行release),
成功,则从队列中找出header下一个结点(存储了阻塞了的线程),调用LockSupport的unpark方法将它唤醒。
aquire及release中的tryAcquire与tryRelease方法交由子类去完成,子类在获取锁及释放锁时增加一些特性,如进行公平锁与非公平锁,超时等特性。
AQS的主要职责是当获取不到锁时,将线程放入CLH队列(一种变体)。并且当有线程释放当前拥有的锁时,找出header结点下一个结点(代表一个阻塞线程),并
将它唤醒。
这里所有的阻塞与唤醒操作使用LockSupport的 park(Object blocker)及unpark(Thread thread)方法。
AQS定义了四个方法供子类个性化实现如果可中断,定时获取锁的形式,如下:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
ReentrantLock就是用AQS作为基础框架,内部使用Sync,这个类继承了AQS。
先来看下一个线程请求一个ReentrantLock锁时发生的流程,即调用ReentrantLock的lock方法:
可以看到线程想要获取锁,在操作1时,这里具体的获取操作是:
查看当前锁state是否为0(说明当前锁未被占有),不是则说明已被其它线程获取,进入can not分支。
如果当前锁state为0,则尝试用cas操作将锁的state改为1。cas成功后再将exclusiveOwnerThread改为当前线程。
可以看到ReentrantLock的Sync类的lock代码如下:
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else //调用父类AQS方法 acquire(1); }
这里如果当前线程不能立即获取锁时用调用父类AQS方法。
进入acquire方法时会再次尝试能否获取到锁,调用的是tryAquire方法。这个tryAquire方法由子类即ReentrantLock的Sync类实现。
先看下父类AQS的acquire方法,代码如下:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
可以看到获取锁的线程会先调用tryAcquire方法,如果tryAcquire还是不能获取到锁,这时会将当前线程封装成Node放入到CLH队列。
这里先看下子类对于tryAcquire如何设计,我们主要看ReentrantLock的非公平锁实现,代码如下:
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
可以看到这边还是会先走if(c==0)分支,先判断一下锁对象的state是否为0,如果是还是执行CAS设置状态操作,成功后并设置exclusiveOwnerThread为当前线程。
如果还是不能获取,这时判断当前线程是否为锁对象上的exclusiveOwnerThread对象,这里之所以要这样做是因为已经得到锁的线程,如果之后还尝试获取锁应该让调用线程知道它能够获取锁(或者说它已经拥有锁)。这里是简单的将锁获取次数加1,并设置在state状态上。
这也是可重入锁ReentrantLock的意义所在。也就是说如果一个线程A对单个锁对象获取了两个,即调用lock方法两次。
(假设另一个线程B在等待线程A持有的锁,并已经阻塞在CLH队列中),这时线程A只有再连续执行两次unlock后,才能将线程B唤醒。
了解完AQS的acquire方法中tryAcquire(arg)方法后,这里由于tryAcquire(arg)方法返回false,即:
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();