首页 > 代码库 > 《java.util.concurrent 包源码阅读》07 LinkedBlockingQueue

《java.util.concurrent 包源码阅读》07 LinkedBlockingQueue

这篇文章来说说稍微复杂一些的LinkedBlockingQueue。LinkedBlockingQueue使用一个链表来实现,会有一个head和tail分别指向队列的开始和队列的结尾。因此LinkedBlockingQueue会有两把锁,分别控制这两个元素,这样在添加元素和拿走元素的时候就不会有锁的冲突,因此取走元素操作的是head,而添加元素操作的是tail。

老规矩先看offer方法和poll方法

    public boolean offer(E e) {        if (e == null) throw new NullPointerException();        final AtomicInteger count = this.count;        if (count.get() == capacity)            return false;        int c = -1;        Node<E> node = new Node(e);        final ReentrantLock putLock = this.putLock;        putLock.lock();        try {            if (count.get() < capacity) {                enqueue(node);                c = count.getAndIncrement();                if (c + 1 < capacity)                    notFull.signal();            }        } finally {            putLock.unlock();        }        if (c == 0)            signalNotEmpty();        return c >= 0;    }

可以看到offer方法在添加元素时候仅仅涉及到putLock,但是还是会需要takeLock,看看signalNotEmpty代码就知道。而poll方法拿走元素的时候涉及到takeLock,也是会需要putLock。参见signalNotFull()。关于signalNotEmpty会在后面讲阻塞的时候讲到。

    public E poll() {        final AtomicInteger count = this.count;        if (count.get() == 0)            return null;        E x = null;        int c = -1;        final ReentrantLock takeLock = this.takeLock;        takeLock.lock();        try {            if (count.get() > 0) {                x = dequeue();                c = count.getAndDecrement();                if (c > 1)                    notEmpty.signal();            }        } finally {            takeLock.unlock();        }        if (c == capacity)            signalNotFull();        return x;    }

这里顺便说说队列长度的count,因为有两把锁存在,所以如果还是像ArrayBlockingQueue一样使用基本类型的count的话会同时用到两把锁,这样就会很复杂,因此直接使用原子数据类型AtomicInteger来操作count。

接下来谈谈阻塞的问题,一个BlockingQueue会有两个Condition:notFull和notEmpty,LinkedBlockingQueue会有两把锁,因此这两个Condition肯定是由这两个锁分别创建的,takeLock创建notEmpty,putLock创建notFull。

    /** Lock held by take, poll, etc */    private final ReentrantLock takeLock = new ReentrantLock();    /** Wait queue for waiting takes */    private final Condition notEmpty = takeLock.newCondition();    /** Lock held by put, offer, etc */    private final ReentrantLock putLock = new ReentrantLock();    /** Wait queue for waiting puts */    private final Condition notFull = putLock.newCondition();

接下来看看put方法:

    public void put(E e) throws InterruptedException {        if (e == null) throw new NullPointerException();        // Note: convention in all put/take/etc is to preset local var        // holding count negative to indicate failure unless set.        int c = -1;        Node<E> node = new Node(e);        final ReentrantLock putLock = this.putLock;        final AtomicInteger count = this.count;        putLock.lockInterruptibly();        try {            /*             * Note that count is used in wait guard even though it is             * not protected by lock. This works because count can             * only decrease at this point (all other puts are shut             * out by lock), and we (or some other waiting put) are             * signalled if it ever changes from capacity. Similarly             * for all other uses of count in other wait guards.             */            while (count.get() == capacity) {                notFull.await();            }            enqueue(node);            c = count.getAndIncrement();            if (c + 1 < capacity)                notFull.signal();        } finally {            putLock.unlock();        }        if (c == 0)            signalNotEmpty();    }

其实大体逻辑和ArrayBlockingQueue差不多,也会需要通知notEmpty条件,因为notEmpty条件属于takeLock,而调用signal方法需要获取Lock,因此put方法也是用到了另外一个锁:takeLock。这里有一点会不同,按照道理来说put方法是不需要通知notFull条件的,是由由拿走元素的操作来通知的,但是notFull条件属于putLock,而拿走元素时,是用了takeLock,因此这里put方法在拥有putLock的情况通知notFull条件,会让其他添加元素的方法避免过长时间的等待。同理对于take方法来说也通知notEmpty条件。

 

    public E take() throws InterruptedException {        E x;        int c = -1;        final AtomicInteger count = this.count;        final ReentrantLock takeLock = this.takeLock;        takeLock.lockInterruptibly();        try {            while (count.get() == 0) {                notEmpty.await();            }            x = dequeue();            c = count.getAndDecrement();            if (c > 1)                notEmpty.signal();        } finally {            takeLock.unlock();        }        if (c == capacity)            signalNotFull();        return x;    }

 

最后说说remove和contains方法,因为需要操作整个链表,因此需要同时拥有锁才能操作。