首页 > 代码库 > 《java.util.concurrent 包源码阅读》07 LinkedBlockingQueue
《java.util.concurrent 包源码阅读》07 LinkedBlockingQueue
这篇文章来说说稍微复杂一些的LinkedBlockingQueue。LinkedBlockingQueue使用一个链表来实现,会有一个head和tail分别指向队列的开始和队列的结尾。因此LinkedBlockingQueue会有两把锁,分别控制这两个元素,这样在添加元素和拿走元素的时候就不会有锁的冲突,因此取走元素操作的是head,而添加元素操作的是tail。
老规矩先看offer方法和poll方法
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; }
可以看到offer方法在添加元素时候仅仅涉及到putLock,但是还是会需要takeLock,看看signalNotEmpty代码就知道。而poll方法拿走元素的时候涉及到takeLock,也是会需要putLock。参见signalNotFull()。关于signalNotEmpty会在后面讲阻塞的时候讲到。
public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
这里顺便说说队列长度的count,因为有两把锁存在,所以如果还是像ArrayBlockingQueue一样使用基本类型的count的话会同时用到两把锁,这样就会很复杂,因此直接使用原子数据类型AtomicInteger来操作count。
接下来谈谈阻塞的问题,一个BlockingQueue会有两个Condition:notFull和notEmpty,LinkedBlockingQueue会有两把锁,因此这两个Condition肯定是由这两个锁分别创建的,takeLock创建notEmpty,putLock创建notFull。
/** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
接下来看看put方法:
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
其实大体逻辑和ArrayBlockingQueue差不多,也会需要通知notEmpty条件,因为notEmpty条件属于takeLock,而调用signal方法需要获取Lock,因此put方法也是用到了另外一个锁:takeLock。这里有一点会不同,按照道理来说put方法是不需要通知notFull条件的,是由由拿走元素的操作来通知的,但是notFull条件属于putLock,而拿走元素时,是用了takeLock,因此这里put方法在拥有putLock的情况通知notFull条件,会让其他添加元素的方法避免过长时间的等待。同理对于take方法来说也通知notEmpty条件。
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
最后说说remove和contains方法,因为需要操作整个链表,因此需要同时拥有锁才能操作。