首页 > 代码库 > 并发编程实践二:AbstractQueuedSynchronizer
并发编程实践二:AbstractQueuedSynchronizer
AbstractQueuedSynchronizer,简称AQS,是java.util.concurrent包的synchronizer的基础框架,其它的synchronizer(包括Lock、Semaphore、CountDownLatch、FutureTask等)都是以它作为基础构建的,这篇文章我将对AQS的框架结构作出介绍,包括它对同步状态的管理,功能流程,等待队列的管理等,并涉及到一些实现的细节,但这里不涉及原理性的东西,原理将放到后面具体的实现类中去讲述。
这片文章采用从整体到局部的方式来讲述,从总体框架一步一步细化,但并不涉及所有代码,了解了这些后,你就可以自己阅读源码学习了。Doug Lea在论文《The java.util.concurrent Synchronizer Framework》中讲述了AQS的设计理念,有兴趣的可以看看,在这里(http://gee.cs.oswego.edu/dl/papers/aqs.pdf)你可以看到。
由于文章过长,这篇文章没有涉及到ConditionObject,我将在下一篇文章中讲述。
好了,接下来我们就开始了,从Unsafe开始。
Unsafe
首先对用到的Unsafe类的方法做一下说明:
objectFieldOffset:给出指定字段的偏移量,这个偏移量是不变的,同一个类中不同的字段不会存在相同的偏移量; compareAndSwapInt:如果当前值是期望值,则更新到新值,用于int型字段; compareAndSwapObject:如果当前值是期望值,则更新到新值,用于Java对象; putObject:存储一个值到指定变量; park:阻塞当前线程,出现以下情况后返回:1)一个对应的unpark出现;2)已经出现过unpark操作;3)线程被中断;4)或者指定的时间逸出;5)虚假唤醒(没有“原因”); unpark:唤醒被阻塞的指定线程,如果指定线程还未阻塞,则调用park操作时将不会再阻塞,但注意unpark操作不会累计,对此unpark只会对应到一次的park操作。
AQS的运用
AQS根据Synchronizer的共同特征提供了一套基础框架,这些共同特征包括:一个或者多个acquire操作用于阻塞线程直到存在空闲锁允许线程通过,一个或者多个release操作释放一个或者多个锁使一个或者多个线程唤醒。我们可以使用伪码来表示这些操作,acqure操作可以表示为:
while (synchronization state does not allow acquire) { enqueue current thread if not already queued; possibly block current thread; } dequeue current thread if it was queued;
release的操作可以表示为:
update synchronization state; if (state may permit a blocked thread to acquire) unblock one or more queued threads;
下面通过一个实际的例子来看AQS是怎么被应用的,这个是java.util.concurrent.Semaphore的一个简化版(只包含非公平设置的Semaphore的代码),Semaphore的作用是为某些资源的访问提供最大线程数目限制,你可以为它设置一个最大许可值permits,则permits个线程可以同时通过acquire调用,超出的将被阻塞,直到有线程释放锁(即调用release操作)。
Semaphore的所有操作都是通过一个内部类来完成,这个内部类是AQS的子类,:
1)为AQS设置了同步状态(即锁的数量);
2)继承了AQS的tryAcquireShared方法,该方法用于为acquire操作的线程获取锁,并返回剩下的锁的数量,如果剩下的锁为负数,则表示获取锁失败,线程将被阻塞;
3)继承了AQS的tryReleaseShared方法,该方法用于为release操作的线程释放锁,并返回释放是否成功,true则表示成功,成功后会执行唤醒阻塞线程的操作。
下面看具体的代码:
public class MySemaphore { private final NonfairSync sync; static final class NonfairSync extends SemaphoreAbstractQueuedSynchronizer { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { setState(permits); } protected final int tryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } } public MySemaphore(int permits) { sync = new NonfairSync(permits); } public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public void release() { sync.releaseShared(1); } }
MySemaphore展示了AQS的运用:同步状态的设置和管理、tryAcquireShared和tryReleaseShared的实现,而MySemaphore的acquire和release操作都是通过使用AQS来实现的。当多个线程同时操作acquire和release时,AQS是怎么保证操作的正确性的呢?通过对AQS的内部机制的学习,你就会知道了,下面我们从同步状态的管理开始。
同步状态
AQS中同步状态的申明和提供的操作方法如下:
private volatile int state; protected final int getState();//获取同步状态 protected final void setState(int newState);//设置同步状态 protected final boolean compareAndSetState(int expect, int update);//原子更新同步状态
同步状态是一个32位的整数,表示锁的数量,之所以使用一个32位整数,主要是考虑到一般情况下锁的数量不会需要那么多(见《The java.util.concurrent Synchronizer Framework》),子类可以通过AQS提供的方法获取和改变同步状态的值,而在tryAcquireShared和tryReleaseShared中将使用同步状态来判断线程是否该阻塞和唤醒。
AQS的主体流程
AQS提供了两种模式,独占模式和共享模式,对应的方法如下:
acquire:以独占模式获取对象,忽略中断。 acquireInterruptibly:以独占模式获取对象,如果被中断则中止。 release:以独占模式释放对象。 acquireShared:以共享模式获取对象,忽略中断。 acquireSharedInterruptibly:以共享模式获取对象,如果被中断则中止。 releaseShared:以共享模式释放对象。
我们可以把独占模式看作共享模式的一个特例,即独占模式是将锁数量设置为1的共享模式。另外,AQS还提供了一组提供时间限制的方法:
tryAcquireNanos:试图以独占模式获取对象,如果被中断则中止,如果到了给定超时时间,则会失败。 tryAcquireSharedNanos:试图以共享模式获取对象,如果被中断则中止,如果到了给定超时时间,则会失败。
这组就是在比上面的方法增加了一个时间的限制,当时间较短的情况下(小于等于1微秒)使用轮询的方式,否则采用阻塞方式。
我对流程的讲解将以共享模式为主,其它的有兴趣可以查看源代码。从宏观上来看,线程通过acquireSharedInterruptibly获取锁,操作完成后,并通过releaseShared来释放锁,我们先来看获取锁的操作:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) //尝试获取锁 //获取锁失败后,将进入等待队列 doAcquireSharedInterruptibly(arg); }
线程操作完成后,释放锁的操作:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { //尝试释放锁 //释放锁后,执行唤醒阻塞线程的操作 doReleaseShared(); return true; } return false; }
线程获取锁失败后(通过调用acquireSharedInterruptibly),将进入等待队列,然后进入阻塞状态(经过一些判断之后);当另一个线程释放锁后(通过调用releaseShared),将执行唤醒阻塞线程的操作;被唤醒的线程将把自己移出等待队列,并执行一些其它操作。可以说,操作都是围绕等待队列来做的,下面我们就来看看等待队列。
等待队列
AQS中使用了一个FIFO队列来管理等待线程,而底层锁的实现使用CLH锁的一个变种,CLH锁通常被用于自旋锁,我们可以通过一个例子看了解一下CLH锁,可以帮助我们理解AQS的队列算法:
public class CLHLock { private static class Node { private volatile boolean locked = false;//锁状态 private volatile Node next; } private AtomicReference<Node> tail = new AtomicReference<>(); private AtomicReference<Node> head = new AtomicReference<>(); public CLHLock() { Node node = new Node(); head.set(node); tail.set(node); } //线程获取锁之后进入锁队列,locked设置为true,后面的线程将阻塞 public void lock(int key) { Node newNode = new Node(); newNode.locked = true; Node pred = null; while (true) { pred = tail.get(); if (tail.compareAndSet(pred, newNode)) { pred.next = newNode; break; } } while (pred.locked) { } } //线程释放锁后将自己从锁队列中移除,并将locked修改为false public void unlock(int key) { Node h = head.get(); Node next = h.next; while (next != null) { if (head.compareAndSet(h, next)) { next.locked = false; break; } h = head.get(); next = h.next; } } }
CLHLock的思想就是使用一个锁队列,后面的线程反复查看前面线程的状态,如果状态为锁定,则自旋等待,否则通过。
采用队列的方式可以带来性能上的好处,在现代的处理器架构中,每个处理器自身都有一个cache,用于存储处理器感兴趣的数据,处理器从cache中获取数据的效率要远远高于从内存中获取数据,处理器之间通过总线进行通信,而总线是独占设备,也就是说,每次只能有一个处理器使用总线,基于这样的架构,为了提高性能,我们应该尽量使用本地cache,尽量避免通过总线存取数据,或者尽量少的通过总线存取数据。采用队列就可以带来这样的好处,每个线程修改自己的locked只会影响后续的线程,而其它线程不会受到影响,这样就只会有一个线程会因为数据的改变而去内存中取数据,减少了数据竞争,从而提高性能。
AQS的队列算法也是基于这样的理念,但实现要比CLHLock复杂的多,为了尽量减少数据竞争,每个节点的状态只和它后续的节点相关,等待队列中的线程依次唤醒,减少获取锁的竞争。下面我们就开始详细讲述。
AQS使用了一个非阻塞队列来保存数据(如果想更多的了解无阻塞队列,可以参考我的“并发编程实践一”),我们先来看队列节点的定义:
static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; //是否共享模式 final boolean isShared() { return nextWaiter == SHARED; } //获取前续节点 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { } Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } }
nextWaiter表示了两种不同的模式:
1)共享模式(SHARED,允许多个线程通过);
2)排它模式(EXCLUSIVE,只允许一个线程通过)。
我们的讲解只涉及到共享模式。
waitStatus表示了节点的状态:
1)CANCELLED:节点对应的线程已经被取消;
2)SIGNAL:节点的下一个节点的线程等待被唤醒;
3)CONDITION:节点对应的线程等待在condition队列中,在后面讲condition的时候会涉及到;
4)PROPAGATE:该节点不需要处理,直接越过。
下面我们看看算法的主要流程:
1)入队列:线程获取锁失败后,创建一个节点,并将节点添加到等待队列尾,然后将线程阻塞,等待唤醒;
2)唤醒:另一个线程释放锁,取队列的第一个节点,将节点对应线程唤醒;
3)出队列:唤醒后的线程将尝试获取锁,成功后将自己移出队列,同时判断是否任然存在空闲的锁,如果存在则继续唤醒下一个节点。
每次只会唤醒第一个节点,如果同时释放多个锁,后续的节点将由前面被唤醒的节点来唤醒,尽量减少数据竞争。
下面我们来看具体的代码。从总体流程中,线程通过acquireSharedInterruptibly请求锁,当尝试获取锁(tryAcquireShared)失败后,将进入doAcquireSharedInterruptibly处理:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //新建一个节点(共享模式),添加到队列尾 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); //可能有线程释放了锁,所以再尝试一次 if (p == head) { //只会从队列头开始唤醒,唤醒后尝试获取锁 int r = tryAcquireShared(arg); if (r >= 0) { //剩余锁的数量大于等于0表示获取锁成功,开始出队列 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //这里将进入阻塞,在阻塞之前会在shouldParkAfterFailedAcquire中将p的状态设置到SIGNAL,设置成功后返回false,将进入下一次循环。为什么shouldParkAfterFailedAcquire需要返回false呢?这里用到了Happens-before原则(volatile原则),考虑如果当前节点已经为第一个节点(即锁被其它线程占用完,当前节点为第一个进入等待队列的节点),则: //1、p的状态设值为SIGNAL之后,doReleaseShared取到head的状态值为SIGNAL,则doReleaseShared中将执行该线程的唤醒操作; //考虑这样的场景:线程t1调用shouldParkAfterFailedAcquire设置p的状态为SIGNAL,然后再次尝试,tryAcquireShared刚失败后,另一个线程t2即释放了锁,那么t1和t2不管怎么执行,t2总是会执行唤醒t1的操作。 //2、p的状态设置为SIGNAL之前,doReleaseShared已经获取了head的状态(非SIGNAL),因此doReleaseShared不会做唤醒操作,但doReleaseShared肯定已经完成了解锁(tryReleaseShared)并成功,这样在shouldParkAfterFailedAcquire完成p的状态设置后进入的下一次循环将成功获取锁。 //因此shouldParkAfterFailedAcquire设置p的状态后返回false是必要的,如果没有多一次的循环,线程将可能进入永久阻塞。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); //失败后取消线程 } } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { //do something } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); //设置waitStatus } return false; //会导致重做一次操作 }
入队列的操作是由addWaiter来完成的,它首先尝试入队列一次,失败后再在enq中循环尝试,直到成功:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null) { //已经初始化过,尝试入队列一次 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //pred为空,或者入队列失败,进入enq中 enq(node); return node; } private Node enq(final Node node) { for (;;) { //循环入队列,直到成功 Node t = tail; if (t == null) { //第一次,需要初始化 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
节点进入等待队列后,如果节点的前续节点不是head,线程将在parkAndCheckInterrupt中进入阻塞状态。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
线程阻塞直到另一个线程调用releaseShared释放锁,然后在doReleaseShared中将把等待队列中的第一个节点唤醒:
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { //节点状态为SIGNAL,表示后续节点等待被唤醒 //改变h的状态,阻止其它线程进入,但由于判断和修改并不是原子的,因此还是可能有多个线程进入这里,但进入的多个线程只会有一个在这里处理成功 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); //这里始终都只会有一个线程进入,直到head发生改变 } //将h的状态设置到PROPAGATE,但这里存在一个问题:当两个线程t1和t2调用doReleaseShared时,t1在上面将h的状态修改为0,这时t2判断h状态不为SIGNAL,则进入下卖弄将h的状态修改为PROPAGATE,而t1这时进入unparkSuccessor,会再次将h的状态修改为0,导致设置h的状态为PROPAGATE的操作失效了 //既然存在这样的问题,为什么还要将h的waitStatus设置到PROPAGATE呢?我们需要将这里的设置和线程唤醒后的操作结合来看,考虑下面的情景: //1、出现了上面描述的情景,则t1和t2必然都成功执行了tryReleaseShared操作,释放了锁,因此线程唤醒后剩余锁的数量大于0,唤醒的线程会继续唤醒下一个线程,流程正确; //2、线程t1将h的状态设置到0后,唤醒线程t3,t3尝试获取锁成功,并且得到剩余锁数量0,同时,线程t2释放了锁,这时存在3种情况:1)t3执行setHead在t2执行ws == Node.SIGNAL之前,那么t2将从一个新的head开始处理,没有问题;2)t3执行setHead在t2通过了ws == Node.SIGNAL后,但在h == head之前,则t2的h == head判断将失败,导致重做,没有问题;3)t3执行setHead在t2执行h == head之后,t2将完成退出,但这是h的状态已经设置到PROPAGATE,在t3中判断h.waitStatus < 0将成功,t3将继续唤醒下一个线程,任然正确。 //因此这里的设置对于操作流程的正确性是必要的。 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) //如果head发生了变化,则需要重做 break; } } private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //再次将node的状态设置到0 Node s = node.next; if (s == null || s.waitStatus > 0) { //这里处理了两种可能: //1、s为空,表示s可能已经被移出队列(在共享模式下不会出现); //2、s的状态大于0,表示s被取消了,需要继续看后续节点 //在这两种情况下,将从队列的tail开始先前查找(head已经不可靠),找到最早的那个需要唤醒的线程唤醒 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); //唤醒线程 }
线程被唤醒后,将重新尝试获取锁,成功后,将开始将自己移出队列,移出队列的代码在setHeadAndPropagate中:
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; //记录旧head setHead(node); //重设head //尝试继续唤醒下一个节点,如果: //1、剩余锁的数量大于0,表示还有锁可以供其它线程使用; //2、waitStatus小于0,则可能为PROPAGATE(由另一个线程设置,需要唤醒后续节点); //3、h为空(共享模式下h不会为空)。 //对于node的next节点: //1、s为空,则可能node被移除了,这时情况不清楚,队列可能为空,也可能不为空; //2、s为共享模式; //这两种情况都需要做唤醒下一个线程的操作。 if (propagate > 0 || h == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); //唤醒下一个线程 } }
这里执行唤醒操作和release时执行的操作一致,由于多个线程同时调用release操作的情况下,虽然释放了多个锁,但可能只会执行一次doReleaseShared的操作,这里就做了弥补,在队列的第一个线程被唤醒,获取锁后,将再次调用doReleaseShared唤醒下一个线程,一直往下,直到锁全部用完或者队列为空。这样就能做到线程一个一个唤醒,依次获取锁,尽量减少了数据竞争(如果有新增线程请求锁,任然会存在数据竞争,但这里减少了已阻塞线程的数据竞争)。
阻塞机制
AQS的阻塞操作使用LockSupport类,最终使用Unsafe的park和unpark操作实现,如下:
public class LockSupport { private LockSupport() {} private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long parkBlockerOffset; static { try { parkBlockerOffset = unsafe.objectFieldOffset (java.lang.Thread.class.getDeclaredField("parkBlocker")); } catch (Exception ex) { throw new Error(ex); } } ...... //唤醒线程 public static void unpark(Thread thread) { if (thread != null) unsafe.unpark(thread); } //阻塞线程 public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); unsafe.park(false, 0L); setBlocker(t, null); } ...... }
结束语
AQS向我们充分展现了并发编程的复杂性,在多个线程的交互下,情况将变得非常复杂,你往往需要将整个流程作为一个整体来分析,因此建议和源代码结合来看这篇文章,充分考虑每一个交互的环节可能出现的问题。
这是我自己对代码的分析结果,给你作为学习参考,可能存在错误,发现问题麻烦给我留言,我将非常感谢。
谢谢。
并发编程实践二:AbstractQueuedSynchronizer