首页 > 代码库 > BlockingQueue阻塞队列

BlockingQueue阻塞队列

java.util.concurrent包:
1.Excutors类:通过这个类可获得多种线程池的实例

Excutors.newSingleThreadExecutor():获得单线程的ExecutorService;Excutors.newFixedThreadPool(int nThreads):获得固定大小线程池的ExecutorService;Excutors.newCachedThreadPool():创建一个可根据需要创建新线程的线程池,返回ExecutorService。

2.ExecutorService接口

submit(Runnable task):提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future;
shutdown():启动一次顺序关闭,执行以前提交的任务,但不接受新任务;
shutdownNow():试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。

3.BlockingQueue接口:阻塞队列

   支持两个附加操作:获取元素时等待队列变为非空,存储元素时等待空间变得可用。具体描述为:
   如果BlockingQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态,直到BlockingQueue进了东西才会被唤醒.
   如果BlockingQueue是满的,任何试图往里存东西的操作也会被阻断进入等待状态,直到BlockingQueue里有空间才会被唤醒继续操作.

  (1)BlockingQueue定义的常用方法如下:

add(anObject):把anObject加到BlockingQueue里,如果BlockingQueue可以容纳,则返回true,否则抛出异常.
offer(anObject):把anObject加到BlockingQueue里,如果BlockingQueue可以容纳,则返回true,否则返回false.
put(anObject):把anObject加到BlockingQueue里,如果BlockingQueue没有空间,则线程被阻断,等待可用的空间.
poll(time):获取并移除BlockingQueue里排在首位的对象,若不能立即取出,则可以等待time时间,若仍取不到则返回null.
take():获取并移除BlockingQueue里排在首位的对象,若不能立即取出,阻断进入等待状态.

(2)BlockingQueue有四个具体的实现类,根据不同需求,选择不同的实现类

ArrayBlockingQueue:由数组支持的有界(固定大小)BlockingQueue,其构造函数必须带一个规定大小的int参数.其所含的对象是以FIFO(先入先出)顺序排序的.
LinkedBlockingQueue:无界(大小不定)BlockingQueue,若其构造函数可带可不带规定大小的int参数.其所含的对象是以FIFO(先入先出)顺序排序的.
PriorityBlockingQueue:无界(大小不定)BlockingQueue,是一个依赖对象的自然顺序的优先级队列.也可以由构造函数的Comparator决定其顺序.
SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的.同步队列没有任何内部容量.

阻塞队列代码示例:

public class BlockingQueueTest {	public static void main(String[] args) {		test();	}	static final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);	static class Producer implements Runnable {		public void run() {			try {				while (true) {					queue.put("some thing");					//...				}			} catch (InterruptedException e) {			}					}	}	static class Consumer implements Runnable {		public void run() {			try {				while (true) {					queue.take();					//...				}			} catch (InterruptedException e) {			}		}	}	public static void test() {		ExecutorService service = Executors.newCachedThreadPool();		Producer producer = new Producer();		Consumer consumer = new Consumer();		service.submit(producer);		service.submit(consumer);		// 程序运行5s后,所有任务停止		try {			Thread.sleep(5000);		} catch (InterruptedException e) {		}		service.shutdownNow();			}}