首页 > 代码库 > Java-SynchronousQueue 阻塞队列小记

Java-SynchronousQueue 阻塞队列小记

在BlockingQueue的子类中有一个SynchronousQueue(同步队列)比较少见,现在做一个简单的介绍,并附加一个简单的例子。

SynchronousQueue --JDK1.6介绍:

public class SynchronousQueue<E>
extends AbstractQueue<E>
implements BlockingQueue<E>, Serializable

一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。不能在同步队列上进行 peek,因为仅在试图要移除元素时,该元素才存在;除非另一个线程试图移除某个元素,否则也不能(使用任何方法)插入元素;也不能迭代队列,因为其中没有元素可用于迭代。队列的 是尝试添加到队列中的首个已排队插入线程的元素;如果没有这样的已排队线程,则没有可用于移除的元素并且 poll() 将会返回 null。对于其他 Collection 方法(例如 contains),SynchronousQueue 作为一个空 collection。此队列不允许 null 元素。

同步队列类似于 CSP 和 Ada 中使用的 rendezvous 信道。它非常适合于传递性设计,在这种设计中,在一个线程中运行的对象要将某些信息、事件或任务传递给在另一个线程中运行的对象,它就必须与该对象同步。

对于正在等待的生产者和使用者线程而言,此类支持可选的公平排序策略。默认情况下不保证这种排序。但是,使用公平设置为 true 所构造的队列可保证线程以 FIFO 的顺序进行访问。公平通常会降低吞吐量,但是可以减小可变性并避免得不到服务。 

此类及其迭代器实现 Collection 和 Iterator 接口的所有可选 方法。

简介及注意点

SynchronousQueue同步队列(哈哈不知准确否)继承了BlockingQueue<E>接口,功能类似于:一直等待,来一个及时处理一个,但不能同事处理两个。比如queue.take()方法会阻塞,一直queue.offer(element)插入一个元素,二插入的元素马上就别queue.take()处理掉。所以它没有容纳元素的能力,isEmpty方法总是返回true,但是给人的感觉像是可以临时容纳一个元素。

另外在创建SynchronousQueue时可以传递一个boolean参数来指定它是否是访问它的线程按遵守FIFO顺序处理,true表示遵守FIFO。

注意:
 注意1:它一种阻塞队列,其中每个 put 必须等待一个 take,反之亦然。
   同步队列没有任何内部容量,甚至连一个队列的容量都没有。
 注意2:它是线程安全的,是阻塞的。
 注意3:  不允许使用 null 元素。
 注意4:公平排序策略是指调用put的线程之间,或take的线程之间。 公平排序策略可以查考ArrayBlockingQueue中的公平策略。
 注意5:SynchronousQueue的以下方法很有趣:
    * iterator() 永远返回空,因为里面没东西。
    * peek() 永远返回null。
    * put() 往queue放进去一个element以后就一直wait直到有其他thread进来把这个element取走。
    * offer() 往queue里放一个element后立即返回,如果碰巧这个element被另一个thread取走了,offer方法返回true,认为offer成功;否则返回false。

    * offer(2000, TimeUnit.SECONDS) 往queue里放一个element但是等待指定的时间后才返回,返回的逻辑和offer()方法一样。
    * take() 取出并且remove掉queue里的element(认为是在queue里的。。。),取不到东西他会一直等。
    * poll() 取出并且remove掉queue里的element(认为是在queue里的。。。),只有到碰巧另外一个线程正在往queue里offer数据或者put数据的时候,该方法才会取到东西。否则立即返回null。
    * poll(2000, TimeUnit.SECONDS) 等待指定的时间然后取出并且remove掉queue里的element,其实就是再等其他的thread来往里塞。
    * isEmpty()永远是true。
    * remainingCapacity() 永远是0。
    * remove()和removeAll() 永远是false。

代码示例:

import java.util.Random;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
 * 下面使用SynchronousQueue模拟:
 * 最多只有一个产品的生产者-消费者模型
 * 
 * 消费者线程们 逐个请求消费产品
 * 生产者线程们 逐个生产产品。
 * 
 * @author maguowei01
 * 
 */
public class SynchronousQueueTest {
	public static void main(String[] args) {
                //true保证生产或消费者线程以FIFO的顺序访问。
		SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>(true);
		for (int i = 0; i < 3; ++i) {
			new Customer(queue).start();
		}
		for (int i = 0; i < 3; ++i) {
			new Product(queue).start();
		}
	}
	static class Product extends Thread {
		SynchronousQueue<Integer> queue;
		public Product(SynchronousQueue<Integer> queue) {
			this.queue = queue;
		}
		@Override
		public void run() {
			while (true) {
				int rand = new Random().nextInt(1000);
				System.out.println("Thread Id:" + getId() + "  生产了一个产品:" + rand);
				System.out.println("Thread Id:" + getId() + " 等待两秒后运送出去...");
				try {
					TimeUnit.SECONDS.sleep(2);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				/*
				 * offer()往queue里放一个element后立即返回,如果碰巧这个element被另一个thread取走了,
				 * offer方法返回true,认为offer成功;否则返回false。
				 * 也就是说offer不一定真正的插入的队列中,肯定没成功丢失了
				 */
				
				// queue.offer(rand);  //注意offer与put方法的区别
				try {
					/* 
					 * put()往queue放进去一个element以后就一直wait直到有其他thread进来把这个element取走。
					 */
					queue.put(rand);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}
	}
	static class Customer extends Thread {
		SynchronousQueue<Integer> queue;
		public Customer(SynchronousQueue<Integer> queue) {
			this.queue = queue;
		}
		@Override
		public void run() {
			while (true) {
				try {
					// 线程运行到queue.take()阻塞,直到Product生产一个产品queue.offer。
					System.out.println("Thread Id:" + getId() + " 消费了一个产品:" + queue.take());
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println("------------------------------------------");
			}
		}
	}
}