首页 > 代码库 > ArrayBlockingQueue源码分析

ArrayBlockingQueue源码分析

转自:http://www.xiaoyaochong.net/wordpress/?p=354

ArrayBlockingQueue是Java并发框架中阻塞队列的最基本的实现,分析这个类就可以知道并发框架中是如何实现阻塞的。

 

笔者工作了一两年之后,还不知道阻塞是如何实现的,当然有一个原因是前期学习的东西比较杂,前后端的东西的懂一点,但是了解的不够深刻,我觉得这是编程学习的禁忌,不管是前端还是后端,在工作3年的时候,你应该有一个方向是拿得出手,见得了人的。

 

转回整体,ArrayBlockingQueue实现阻塞队列的关键在与,对锁(Lock)和等待条件(Condition)的使用的使用,这两个实现的基本功能类似域wait()和notify(),是wait()和notify()的高级用法。

 

本文我们主要分析ArrayBlockingQueue的3个核心方法,put(),take()和poll()。

首先是put()

 

Java代码  技术分享
  1. /** 
  2.      * 从队列的尾部插入元素,如果队列已满,将阻塞等待到队列有空间的时候进行插入操作。 
  3.      * 
  4.      */  
  5.     public void put(E e) throws InterruptedException {  
  6.         checkNotNull(e);  
  7.         final ReentrantLock lock = this.lock;  
  8.         lock.lockInterruptibly();// 获得当前线程的锁  
  9.         try {  
  10.             while (count == items.length)// 循环等待  
  11.                 notFull.await();  
  12.             insert(e);  
  13.         } finally {  
  14.             lock.unlock();//释放锁  
  15.         }  
  16.     }  

 这个方法在当前线程没有中断的情况下,获取锁,接着对数组容量进行判断,如果容量已满,则循环等待带容量腾出来为止,最后释放当前线程锁。这样的业务逻辑就产生了这样的场景,线程一进入到该方法,成功插入队列,释放锁,假设刚好容量满;线程二进入该方法,循环等待;线程三从容器中获取元素;线程二判断容量未满,插入,释放锁。如果有多个线程在等待的时候,会出现什么情况呢,从代码的逻辑来看,当多个线程都在阻塞等待的时候,要看谁首先抢到锁,也就是消费方法是抢占式的。

 

 

其次时take()

 

Java代码  技术分享
  1. /** 
  2.      * 返回队列头部的元素,如果队列为空,阻塞等待其他线程往当前容器放入元素为止。 
  3.      */  
  4. public E take() throws InterruptedException {  
  5.         final ReentrantLock lock = this.lock;  
  6.         lock.lockInterruptibly();//  获取当前线程的锁  
  7.         try {  
  8.             while (count == 0)// 阻塞等待  
  9.                 notEmpty.await();  
  10.             return extract();  
  11.         } finally {  
  12.             lock.unlock();// 释放锁  
  13.         }  
  14.     }  

这个过程和put()方法类似,这里就不罗嗦了。

 

 

最后poll(),poll有两个重载的方法,有参数和无参数,先讲无参数的。

 

Java代码  技术分享
  1. /** 
  2.      * 获取队列头部的元素,当队列为空的时候,返回null值 
  3.      * 
  4.      */  
  5. public E poll() {  
  6.         final ReentrantLock lock = this.lock;  
  7.         lock.lock();// 锁定  
  8.         try {  
  9.             return (count == 0) ? null : extract();  
  10.         } finally {  
  11.             lock.unlock();// 解锁  
  12.         }  
  13.     }  

 这个方法是不阻塞的,当队列未空的时候,直接返回null值,所以实现中只是一个锁的简单使用,防止并发问题。

 

 

poll(...)

Java代码  技术分享
  1. /** 
  2.      * 获取队列的头部元素,在指定时间之内阻塞等待,如果超出阻塞时间队列仍然空,则返回null值。 
  3.      * 
  4.      */  
  5. public E poll(long timeout, TimeUnit unit) throws InterruptedException {  
  6.         long nanos = unit.toNanos(timeout);  
  7.         final ReentrantLock lock = this.lock;  
  8.         lock.lockInterruptibly();// 获取锁  
  9.         try {  
  10.             while (count == 0) {// 指定时间内循环等待  
  11.                 if (nanos <= 0)  
  12.                     return null;  
  13.                 nanos = notEmpty.awaitNanos(nanos);  
  14.             }  
  15.             return extract();  
  16.         } finally {  
  17.             lock.unlock();// 解锁  
  18.         }  
  19.     }  

 这个方法的逻辑和其他方法的不同之处就在Condition的一个时间计数器方法awaitNanos(...),这里先将时间大小根据时间单位换算成纳秒的数值,当队列容量为0是,使用Condition.awaitNanos(...),进行技术,超时后返回空。

 

总之,ArrayBlokingQueue使用的Java的现实锁(Lock)配合Condition进行阻塞,使用Condition进行时间技术。而在并发框架中其他的阻塞和时间技术,也同样是用这两个对象API来实现。

 

扩展阅读: 详细全面分析ArrayBlockingQueue http://www.jianshu.com/p/9a652250e0d1

ArrayBlockingQueue源码分析