首页 > 代码库 > BlockQueue中ArrayBlockingQueue和LinkedBlockingQueue比较

BlockQueue中ArrayBlockingQueue和LinkedBlockingQueue比较

LinkedBlockingQueue是BlockingQueue的一种使用Link List的实现,它对头和尾(取和添加操作)采用两把不同的锁,相对于ArrayBlockingQueue提高了吞吐量。它也是一种阻塞型的容器,适合于实现“消费者生产者”模式。

ArrayBlockingQueue是对BlockingQueue的一个数组实现,它使用一把全局的锁并行对queue的读写操作,同时使用两个Condition阻塞容量为空时的取操作和容量满时的写操作。

 

正因为LinkedBlockingQueue使用两个独立的锁控制数据同步,所以可以使存取两种操作并行执行,从而提高并发效率。而ArrayBlockingQueue使用一把锁,造成在存取两种操作争抢一把锁,而使得性能相对低下。LinkedBlockingQueue可以不设置队列容量,默认为Integer.MAX_VALUE.其容易造成内存溢出,一般要设置其值。

 LinkedBlockingQueue底层的定义如下:

Java代码  收藏代码
  1. public class LinkedBlockingQueue<E> extends AbstractQueue<E>  
  2.         implements BlockingQueue<E>, java.io.Serializable {  
  3.   
  4.     static class Node<E> {  
  5.         /** The item, volatile to ensure barrier separating write and read */  
  6.         volatile E item;  
  7.         Node<E> next;  
  8.         Node(E x) { item = x; }  
  9.     }  
  10.   
  11.     // 支持原子操作  
  12.     private final AtomicInteger count = new AtomicInteger(0);  
  13.   
  14.     // 链表的头和尾  
  15.     private transient Node<E> head;  
  16.     private transient Node<E> last;  
  17.   
  18.     // 针对取和添加操作的两把锁及其上的条件  
  19.    private final ReentrantLock takeLock = new ReentrantLock();  
  20.     private final Condition notEmpty = takeLock.newCondition();  
  21.     private final ReentrantLock putLock = new ReentrantLock();  
  22.     private final Condition notFull = putLock.newCondition();  
  23.   
  24.    ...  
  25. }  

    LinkedBlockingQueue的添加操作:

Java代码  收藏代码
  1. public class LinkedBlockingQueue<E> extends AbstractQueue<E>  
  2.         implements BlockingQueue<E>, java.io.Serializable {  
  3.   
  4.     private void insert(E x) {  
  5.         last = last.next = new Node<E>(x);  
  6.     }  
  7.   
  8.     /** 
  9.      * signal方法在被调用时,当前线程必须拥有该condition相关的锁! 
  10.      * Signal a waiting take. Called only from put/offer (which do not 
  11.      * otherwise ordinarily lock takeLock.) 
  12.      */  
  13.     private void signalNotEmpty() {  
  14.         final ReentrantLock takeLock = this.takeLock;  
  15.         takeLock.lock();  
  16.         try {  
  17.             notEmpty.signal();  
  18.         } finally {  
  19.             takeLock.unlock();  
  20.         }  
  21.     }  
  22.   
  23.     public void put(E o) throws InterruptedException {  
  24.         if (o == nullthrow new NullPointerException();  
  25.         int c = -1;  
  26.         final ReentrantLock putLock = this.putLock;  
  27.         final AtomicInteger count = this.count;  
  28.         // 使用putLock  
  29.         putLock.lockInterruptibly();  
  30.         try {  
  31.             try {  
  32.                   // 当容量已满时,等待notFull条件  
  33.             while (count.get() == capacity)  
  34.                     notFull.await();  
  35.             } catch (InterruptedException ie) {  
  36.                 notFull.signal(); // propagate to a non-interrupted thread  
  37.                 throw ie;  
  38.             }  
  39.             insert(o);  
  40.             // 取出当前值,并将原数据增加1  
  41.             c = count.getAndIncrement();  
  42.             // 容量不满,再次激活notFull上等待的put线程  
  43.         if (c + 1 < capacity)  
  44.                 notFull.signal();  
  45.         } finally {  
  46.             putLock.unlock();  
  47.         }  
  48.         // 必须先释放putLock再在notEmpty上signal,否则会造成死锁  
  49.      if (c == 0)  
  50.             signalNotEmpty();  
  51.     }  
  52.   
  53.   ...  
  54. }  

    LinkedBlockingQueue的取操作:

Java代码  收藏代码
  1. public class LinkedBlockingQueue<E> extends AbstractQueue<E>  
  2.         implements BlockingQueue<E>, java.io.Serializable {  
  3.   
  4.     private E extract() {  
  5.         Node<E> first = head.next;  
  6.         head = first;  
  7.         E x = first.item;  
  8.         first.item = null;  
  9.         return x;  
  10.     }  
  11.   
  12.     private void signalNotFull() {  
  13.         final ReentrantLock putLock = this.putLock;  
  14.         putLock.lock();  
  15.         try {  
  16.             notFull.signal();  
  17.         } finally {  
  18.             putLock.unlock();  
  19.         }  
  20.     }  
  21.   
  22.     public E take() throws InterruptedException {  
  23.         E x;  
  24.         int c = -1;  
  25.         final AtomicInteger count = this.count;  
  26.         final ReentrantLock takeLock = this.takeLock;  
  27.         // 使用takeLock  
  28.         takeLock.lockInterruptibly();  
  29.         try {  
  30.             try {  
  31.                   // 若容量为空,等待notEmpty  
  32.                 while (count.get() == 0)  
  33.                     notEmpty.await();  
  34.             } catch (InterruptedException ie) {  
  35.                 notEmpty.signal(); // propagate to a non-interrupted thread  
  36.                 throw ie;  
  37.             }  
  38.   
  39.             x = extract();  
  40.             c = count.getAndDecrement();  
  41.             // 再次激活notEmpty  
  42.             if (c > 1)  
  43.                 notEmpty.signal();  
  44.         } finally {  
  45.             takeLock.unlock();  
  46.         }  
  47.         // take执行之前容量已满,则激活notFull  
  48.         if (c == capacity)  
  49.             signalNotFull();  
  50.         return x;  
  51.     }  
  52.   
  53.   ...  

 

 ArrayBlockingQueue底层定义如下:

 

Java代码  收藏代码
  1. public class ArrayBlockingQueue<E> extends AbstractQueue<E>  
  2.         implements BlockingQueue<E>, java.io.Serializable {  
  3.   
  4.     // 使用循环数组来实现queue,初始时takeIndex和putIndex均为0  
  5.     private final E[] items;  
  6.     private transient int takeIndex;  
  7.     private transient int putIndex;  
  8.     private int count;  
  9.   
  10.     // 用于并发的锁和条件  
  11.    private final ReentrantLock lock;  
  12.     private final Condition notEmpty;  
  13.     private final Condition notFull;  
  14.   
  15.     /** 
  16.      * 循环数组 
  17.      * Circularly increment i. 
  18.      */  
  19.     final int inc(int i) {  
  20.         return (++i == items.length)? 0 : i;  
  21.     }  
  22.   
  23.     public ArrayBlockingQueue(int capacity, boolean fair) {  
  24.         if (capacity <= 0)  
  25.             throw new IllegalArgumentException();  
  26.         this.items = (E[]) new Object[capacity];  
  27.         // 分配锁及该锁上的condition  
  28.         lock = new ReentrantLock(fair);  
  29.         notEmpty = lock.newCondition();  
  30.         notFull =  lock.newCondition();  
  31.     }  
  32.   
  33.   ...  
  34. }  

   ArrayBlockingQueue的取操作:

Java代码  收藏代码
  1. public class ArrayBlockingQueue<E> extends AbstractQueue<E>  
  2.         implements BlockingQueue<E>, java.io.Serializable {  
  3.   
  4.     private E extract() {  
  5.         final E[] items = this.items;  
  6.         E x = items[takeIndex];  
  7.         items[takeIndex] = null;  
  8.         takeIndex = inc(takeIndex);  
  9.         --count;  
  10.        // 激发notFull条件  
  11.         notFull.signal();  
  12.         return x;  
  13.     }  
  14.   
  15.      /** 
  16.         * condition的await的语义如下: 
  17.      * 与condition相关的锁以原子方式释放,并禁用该线程 
  18.      * 方法返回时,线程必须获得与该condition相关的锁 
  19.      */  
  20.     public E take() throws InterruptedException {  
  21.         final ReentrantLock lock = this.lock;  
  22.         lock.lockInterruptibly();  
  23.         try {  
  24.             try {  
  25.                   // 等待notEmpty的条件  
  26.                 while (count == 0)  
  27.                     notEmpty.await();  
  28.             } catch (InterruptedException ie) {  
  29.                 notEmpty.signal(); // propagate to non-interrupted thread  
  30.                 throw ie;  
  31.             }  
  32.             E x = extract();  
  33.             return x;  
  34.         } finally {  
  35.             lock.unlock();  
  36.         }  
  37.     }  
  38.   
  39.   ...  
  40. }  

   ArrayBlockingQueue的写操作:

Java代码  收藏代码
  1. public class ArrayBlockingQueue<E> extends AbstractQueue<E>  
  2.         implements BlockingQueue<E>, java.io.Serializable {  
  3.   
  4.     private void insert(E x) {  
  5.         items[putIndex] = x;  
  6.         putIndex = inc(putIndex);  
  7.         ++count;  
  8.         notEmpty.signal();  
  9.     }  
  10.   
  11.     public void put(E o) throws InterruptedException {  
  12.         if (o == nullthrow new NullPointerException();  
  13.         final E[] items = this.items;  
  14.         final ReentrantLock lock = this.lock;  
  15.         lock.lockInterruptibly();  
  16.         try {  
  17.             try {  
  18.                   // 等待notFull条件  
  19.            while (count == items.length)  
  20.                     notFull.await();  
  21.             } catch (InterruptedException ie) {  
  22.                 notFull.signal(); // propagate to non-interrupted thread  
  23.                 throw ie;  
  24.             }  
  25.             insert(o);  
  26.         } finally {  
  27.             lock.unlock();  
  28.         }  
  29.     }  
  30.   
  31.   ...  
  32. }  

    注意:ArrayBlockingQueue在读写操作上都需要锁住整个容器,因此吞吐量与一般的实现是相似的,适合于实现“生产者消费者”模式。

 

通过保证在临界区上多个线程的相互排斥,线程间可以完全避免竞争状态的发生,但是有时候还是需要线程之间的相互协作。使用条件(Condition)便于线程间通信。一个线程可以指定在某种条件下该做什么。标间是通过调用Lock对象的newCoditionn()方法来实现线程之间的相互通信的。

 一旦创建一个条件,就可使用await()、signal()、signalAll()方法来实现线程间通信。await()方法可以让当前线程都处于等待状态,知道条件放生。signal()方法唤醒一个等待的线程,而signalAll()方法唤醒所有等待线程。

注:

Lock接口的 线程请求锁的 几个方法:

lock(), 拿不到lock就不罢休,不然线程就一直block。 比较无赖的做法。
tryLock(),马上返回,拿到lock就返回true,不然返回false。 比较潇洒的做法。
带时间限制的tryLock(),拿不到lock,就等一段时间,超时返回false。比较聪明的做法。

下面的lockInterruptibly()就稍微难理解一些。

先说说线程的打扰机制,每个线程都有一个 打扰 标志。这里分两种情况,
1. 线程在sleep或wait,join, 此时如果别的进程调用此进程的 interrupt()方法,此线程会被唤醒并被要求处理InterruptedException;(thread在做IO操作时也可能有类似行为,见java thread api)
2. 此线程在运行中, 则不会收到提醒。但是 此线程的 “打扰标志”会被设置, 可以通过isInterrupted()查看并 作出处理。

lockInterruptibly()和上面的第一种情况是一样的, 线程在请求lock并被阻塞时,如果被interrupt,则“此线程会被唤醒并被要求处理InterruptedException”。

 

 

引自:http://zhuhui-zj.iteye.com/blog/784193