首页 > 代码库 > Java-SynchronousQueue 阻塞队列小记
Java-SynchronousQueue 阻塞队列小记
SynchronousQueue --JDK1.6介绍:
public class SynchronousQueue<E>
- extends AbstractQueue<E>
- implements BlockingQueue<E>, Serializable
一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。不能在同步队列上进行 peek,因为仅在试图要移除元素时,该元素才存在;除非另一个线程试图移除某个元素,否则也不能(使用任何方法)插入元素;也不能迭代队列,因为其中没有元素可用于迭代。队列的头 是尝试添加到队列中的首个已排队插入线程的元素;如果没有这样的已排队线程,则没有可用于移除的元素并且 poll() 将会返回 null。对于其他 Collection 方法(例如 contains),SynchronousQueue 作为一个空 collection。此队列不允许 null 元素。
同步队列类似于 CSP 和 Ada 中使用的 rendezvous 信道。它非常适合于传递性设计,在这种设计中,在一个线程中运行的对象要将某些信息、事件或任务传递给在另一个线程中运行的对象,它就必须与该对象同步。
对于正在等待的生产者和使用者线程而言,此类支持可选的公平排序策略。默认情况下不保证这种排序。但是,使用公平设置为 true 所构造的队列可保证线程以 FIFO 的顺序进行访问。公平通常会降低吞吐量,但是可以减小可变性并避免得不到服务。
此类及其迭代器实现 Collection
和 Iterator
接口的所有可选 方法。
简介及注意点
SynchronousQueue同步队列(哈哈不知准确否)继承了BlockingQueue<E>接口,功能类似于:一直等待,来一个及时处理一个,但不能同事处理两个。比如queue.take()方法会阻塞,一直queue.offer(element)插入一个元素,二插入的元素马上就别queue.take()处理掉。所以它没有容纳元素的能力,isEmpty方法总是返回true,但是给人的感觉像是可以临时容纳一个元素。
另外在创建SynchronousQueue时可以传递一个boolean参数来指定它是否是访问它的线程按遵守FIFO顺序处理,true表示遵守FIFO。
同步队列没有任何内部容量,甚至连一个队列的容量都没有。
* iterator() 永远返回空,因为里面没东西。
* peek() 永远返回null。
* put() 往queue放进去一个element以后就一直wait直到有其他thread进来把这个element取走。
* offer() 往queue里放一个element后立即返回,如果碰巧这个element被另一个thread取走了,offer方法返回true,认为offer成功;否则返回false。
* offer(2000, TimeUnit.SECONDS) 往queue里放一个element但是等待指定的时间后才返回,返回的逻辑和offer()方法一样。
* take() 取出并且remove掉queue里的element(认为是在queue里的。。。),取不到东西他会一直等。
* poll() 取出并且remove掉queue里的element(认为是在queue里的。。。),只有到碰巧另外一个线程正在往queue里offer数据或者put数据的时候,该方法才会取到东西。否则立即返回null。
* poll(2000, TimeUnit.SECONDS) 等待指定的时间然后取出并且remove掉queue里的element,其实就是再等其他的thread来往里塞。
* isEmpty()永远是true。
* remainingCapacity() 永远是0。
* remove()和removeAll() 永远是false。
代码示例:
import java.util.Random; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; /** * 下面使用SynchronousQueue模拟: * 最多只有一个产品的生产者-消费者模型 * * 消费者线程们 逐个请求消费产品 * 生产者线程们 逐个生产产品。 * * @author maguowei01 * */ public class SynchronousQueueTest { public static void main(String[] args) { //true保证生产或消费者线程以FIFO的顺序访问。 SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>(true); for (int i = 0; i < 3; ++i) { new Customer(queue).start(); } for (int i = 0; i < 3; ++i) { new Product(queue).start(); } } static class Product extends Thread { SynchronousQueue<Integer> queue; public Product(SynchronousQueue<Integer> queue) { this.queue = queue; } @Override public void run() { while (true) { int rand = new Random().nextInt(1000); System.out.println("Thread Id:" + getId() + " 生产了一个产品:" + rand); System.out.println("Thread Id:" + getId() + " 等待两秒后运送出去..."); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } /* * offer()往queue里放一个element后立即返回,如果碰巧这个element被另一个thread取走了, * offer方法返回true,认为offer成功;否则返回false。 * 也就是说offer不一定真正的插入的队列中,肯定没成功丢失了 */ // queue.offer(rand); //注意offer与put方法的区别 try { /* * put()往queue放进去一个element以后就一直wait直到有其他thread进来把这个element取走。 */ queue.put(rand); } catch (InterruptedException e) { e.printStackTrace(); } } } } static class Customer extends Thread { SynchronousQueue<Integer> queue; public Customer(SynchronousQueue<Integer> queue) { this.queue = queue; } @Override public void run() { while (true) { try { // 线程运行到queue.take()阻塞,直到Product生产一个产品queue.offer。 System.out.println("Thread Id:" + getId() + " 消费了一个产品:" + queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("------------------------------------------"); } } } }