首页 > 代码库 > java中的阻塞队列BlockingQueue

java中的阻塞队列BlockingQueue

一、概述

位于java.util.concurrent下,声明:public interfaceBlockingQueue<E> extendsQueue<E>

支持两个附加操作的 Queue,这两个操作是:获取元素时等待队列变为非空,以及存储元素时等待空间变得可用。

BlockingQueue 方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(nullfalse,具体取决于操作),第三种是在操作可以成功前,无限期地阻塞当前线程,第四种是在放弃前只在给定的最大时间限制内阻塞。下表中总结了这些方法:

 抛出异常特殊值阻塞超时
插入add(e)offer(e)put(e)offer(e, time, unit)
移除remove()poll()take()poll(time, unit)
检查element()peek()不可用不可用
BlockingQueue 不接受 null 元素。试图 addputoffer 一个null 元素时,某些实现会抛出 NullPointerExceptionnull 被用作指示 poll 操作失败的警戒值。

BlockingQueue 可以是限定容量的。它在任意给定时间都可以有一个remainingCapacity,超出此容量,便无法无阻塞地 put 附加元素。没有任何内部容量约束的 BlockingQueue 总是报告Integer.MAX_VALUE 的剩余容量。

BlockingQueue 实现主要用于生产者-使用者队列,但它另外还支持Collection 接口。因此,举例来说,使用remove(x) 从队列中移除任意一个元素是有可能的。然而,这种操作通常 会有效执行,只能有计划地偶尔使用,比如在取消排队信息时。

BlockingQueue 实现是线程安全的。所有排队方法都可以使用内部锁或其他形式的并发控制来自动达到它们的目的。然而,大量的 Collection 操作(addAllcontainsAllretainAllremoveAll没有 必要自动执行,除非在实现中特别说明。因此,举例来说,在只添加了 c 中的一些元素后,addAll(c) 有可能失败(抛出一个异常)。

BlockingQueue 实质上 支持使用任何一种“close”或“shutdown”操作来指示不再添加任何项。这种功能的需求和使用有依赖于实现的倾向。例如,一种常用的策略是:对于生产者,插入特殊的end-of-streampoison 对象,并根据使用者获取这些对象的时间来对它们进行解释。 

以下是基于典型的生产者-使用者场景的一个用例。注意,BlockingQueue 可以安全地与多个生产者和多个使用者一起使用。 

class Producer implements Runnable {
   private final BlockingQueue queue;
   Producer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while(true) { queue.put(produce()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   Object produce() { ... }
 }

 class Consumer implements Runnable {
   private final BlockingQueue queue;
   Consumer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while(true) { consume(queue.take()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   void consume(Object x) { ... }
 }

 class Setup {
   void main() {
     BlockingQueue q = new SomeQueueImplementation();
     Producer p = new Producer(q);
     Consumer c1 = new Consumer(q);
     Consumer c2 = new Consumer(q);
     new Thread(p).start();
     new Thread(c1).start();
     new Thread(c2).start();
   }
 }

内存一致性效果:当存在其他并发 collection 时,将对象放入 BlockingQueue 之前的线程中的操作 happen-before 随后通过另一线程从 BlockingQueue 中访问或移除该元素的操作。

此接口是 Java Collections Framework 的成员。

二、方法

1、booleanadd(E e)  

将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则抛出 IllegalStateException。当使用有容量限制的队列时,通常首选offer

指定者:
接口 Collection<E> 中的add
指定者:
接口 Queue<E> 中的add
参数:
e - 要添加的元素
返回:
true(根据 Collection.add(E) 的规定)
抛出:
IllegalStateException - 如果由于容量限制此时不能添加该元素
ClassCastException - 如果指定元素的类不允许将其添加到此队列
NullPointerException - 如果指定元素为 null
IllegalArgumentException - 如果指定元素的某些属性不允许将其添加到此队列
2、boolean offer(E e)  
将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则返回 false。当使用有容量限制的队列时,此方法通常要优于add(E),后者可能无法插入元素,而只是抛出一个异常。

指定者:
接口 Queue<E> 中的offer
参数:
e - 要添加的元素
返回:
如果该元素已添加到此队列,则返回 true;否则返回 false
抛出:
ClassCastException - 如果指定元素的类不允许将其添加到此队列
NullPointerException - 如果指定元素为 null
IllegalArgumentException - 如果指定元素的某些属性不允许将其添加到此队列
3、void put(E e) throwsInterruptedException  将指定元素插入此队列中,将等待可用的空间(如果有必要)。

参数:
e - 要添加的元素
抛出:
InterruptedException - 如果在等待时被中断
ClassCastException - 如果指定元素的类不允许将其添加到此队列
NullPointerException - 如果指定元素为 null
IllegalArgumentException - 如果指定元素的某些属性不允许将其添加到此队列

4、booleanoffer(E e, long timeout, TimeUnit unit)  throwsInterruptedException

将指定元素插入此队列中,在到达指定的等待时间前等待可用的空间(如果有必要)。

参数:
e - 要添加的元素
timeout - 放弃之前等待的时间长度,以 unit 为时间单位
unit - 确定如何解释 timeout 参数的 TimeUnit
返回:
如果成功,则返回 true;如果在空间可用前超过了指定的等待时间,则返回 false
抛出:
InterruptedException - 如果在等待时被中断
ClassCastException - 如果指定元素的类不允许将其添加到此队列
NullPointerException - 如果指定元素为 null
IllegalArgumentException - 如果指定元素的某些属性不允许将其添加到此队列
5、Etake() throwsInterruptedException  获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。

返回:
此队列的头部
抛出:
InterruptedException - 如果在等待时被中断
6、Epoll(long timeout, TimeUnit unit) throwsInterruptedException

获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。

参数:
timeout - 放弃之前要等待的时间长度,用 unit 的时间单位表示
unit - 确定如何解释 timeout 参数的 TimeUnit
返回:
此队列的头部;如果在元素可用前超过了指定的等待时间,则返回 null
抛出:
InterruptedException - 如果在等待时被中断
7、int remainingCapacity()  

返回在无阻塞的理想情况下(不存在内存或资源约束)此队列能接受的附加元素数量;如果没有内部限制,则返回 Integer.MAX_VALUE

注意,不能 总是通过检查 remainingCapacity 来判断尝试插入元素是否成功,因为可能出现这样的情况:其他线程即将插入或移除一个元素。

返回:
剩余容量
8、boolean remove(Object o)  
从此队列中移除指定元素的单个实例(如果存在)。更确切地讲,如果此队列包含一个或多个满足 o.equals(e) 的元素 e,则移除该元素。如果此队列包含指定元素(或者此队列由于调用而发生更改),则返回true

指定者:
接口 Collection<E> 中的remove
参数:
o - 要从此队列移除的元素(如果存在)
返回:
如果此队列由于调用而发生更改,则返回 true
抛出:
ClassCastException - 如果指定元素的类与此队列不兼容(可选)
NullPointerException - 如果指定元素为 null(可选)
9、booleancontains(Object o)  
如果此队列包含指定元素,则返回 true。更确切地讲,当且仅当此队列至少包含一个满足 o.equals(e) 的元素 e时,返回 true

指定者:
接口 Collection<E> 中的contains
参数:
o - 检查是否包含在此队列中的对象
返回:
如果此队列包含指定元素,则返回 true
抛出:
ClassCastException - 如果指定元素的类与此队列不兼容(可选)
NullPointerException - 如果指定元素为 null(可选)
10、intdrainTo(Collection<? superE> c)  
移除此队列中所有可用的元素,并将它们添加到给定 collection 中。此操作可能比反复轮询此队列更有效。在试图向 collection c 中添加元素没有成功时,可能导致在抛出相关异常时,元素会同时在两个 collection 中出现,或者在其中一个 collection 中出现,也可能在两个 collection 中都不出现。如果试图将一个队列放入自身队列中,则会导致IllegalArgumentException 异常。此外,如果正在进行此操作时修改指定的 collection,则此操作行为是不确定的。

参数:
c - 接收传输元素的 collection
返回:
传输元素的数量
抛出:
UnsupportedOperationException - 如果指定 collection 不支持添加元素
ClassCastException - 如果此队列元素的类不允许将其添加到指定 collection
NullPointerException - 如果指定 collection 为 null
IllegalArgumentException - 如果指定 collection 是此队列,或者此队列元素的某些属性不允许将其添加到指定 collection
11、intdrainTo(Collection<? superE> c, int maxElements)  
最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。在试图向 collection c 中添加元素没有成功时,可能导致在抛出相关异常时,元素会同时在两个 collection 中出现,或者在其中一个 collection 中出现,也可能在两个 collection 中都不出现。如果试图将一个队列放入自身队列中,则会导致IllegalArgumentException 异常。此外,如果正在进行此操作时修改指定的 collection,则此操作行为是不确定的。

参数:
c - 接收传输元素的 collection
maxElements - 传输元素的最大数量
返回:
传输元素的数量
抛出:
UnsupportedOperationException - 如果指定 collection 不支持添加元素
ClassCastException - 如果此队列元素的类不允许将其添加到指定 collection
NullPointerException - 如果指定 collection 为 null
IllegalArgumentException - 如果指定 collection 是此队列,或者此队列元素的某些属性不允许将其添加到指定 collection








java中的阻塞队列BlockingQueue