首页 > 代码库 > 阻塞队列
阻塞队列
***********************************************声明******************************************************
原创作品,出自 “晓风残月xj” 博客,欢迎转载,转载时请务必注明出处(http://blog.csdn.net/xiaofengcanyuexj)。
由于各种原因,可能存在诸多不足,欢迎斧正!
*********************************************************************************************************
线程的同步是保证多线程安全访问竞争资源的一种手段。Java中线程同步的方法有很多,如显式的synchronized、Lock等,还有如管道、阻塞队列等特殊的数据结构支持线程同步。本文谈谈我对阻塞队列的看法。
一、阻塞队列性质
阻塞队列的主要性质有如下2条:
1)、任何时候只能有一个线程在插入或移除元素;
2)、当队列为空时进行获取或移除元素不会返回null或抛出异常,而是等待队列直至其不为空;当队列为满时进行插入元素不会抛出异常,而是等待队列直至其不为满。
二、阻塞队列实例
Java类库提供的阻塞队列主要如下:
1)、LinkedBlockingQueue的容量默认是没有上限的(在不指定时容量为Integer.MAX_VALUE),也可以选择指定其最大容量,它是基于链表的队列,此队列按 FIFO排序元素。
2)、ArrayBlockingQueue在构造时需要指定容量,并可以选择是否需要公平性(默认false),如果公平参数 被设置true,等待时间最长的线程会优先得到处理,其实就是通过将ReentrantLock设置为true来达到这种公平性的:即等待时间最长的线程 会先操作。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。它是基于数组的阻塞循环队列,此队列按 FIFO原则对元素进行排序。
3)、PriorityBlockingQueue是一个带优先级的队列,而不是先进先出队列。元素按优先级 顺序被移除,该队列也没有上限(看了一下源码,PriorityBlockingQueue是对PriorityQueue的再次包装,是基于堆数据结构 的,而PriorityQueue是没有容量限制的,与ArrayList一样,所以在优先阻塞队列上put时是不会受阻的,但是如果队列为空,取元素的操作take就会阻塞。另外,往入该队列中的元 素要具有比较能力。
4)、DelayQueue(基于PriorityQueue来实现的)是一个存放Delayed 元素的无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且poll将返回null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于或等于零的值时,则出现期满,poll就以移除这个元素了。此队列不允许使用 null 元素。
三、源代码
/** * Created by xujinxj on 2015/1/22. */ public enum State{ DRY("dry"), BUTTERED("buttered"), JAMMED("jammed") ; private String description; State(String tDescription) { this.description=tDescription; } public String toSting() { return description; } };
/** * Created by xujinxj on 2015/1/22. */ public class Toast { private State state=State.DRY; private final int id; public Toast(int tId) { id=tId; } public void butter() { state=State.BUTTERED; } public void jam() { state=State.JAMMED; } public State getState() { return state; } public int getId() { return id; } public String toString() { return "Toast "+id+" : "+state; } }
import java.util.Random; import java.util.concurrent.TimeUnit; /** * Created by xujinxj on 2015/1/22. */ public class Toaster implements Runnable { private ToastQueue toastQueue; private int count; private Random random=new Random(47); public Toaster(ToastQueue tToastQueue) { toastQueue=tToastQueue; count=0; } public void run() { try { while(!Thread.interrupted()) { TimeUnit.MILLISECONDS.sleep(random.nextInt(50)); Toast t=new Toast(count++); System.out.println(t); toastQueue.put(t); if(10==count) return; } } catch(InterruptedException e) { e.printStackTrace(System.out); } finally { System.out.println("Toaster off"); } } }
/** * Created by xujinxj on 2015/1/22. */ public class Butterer implements Runnable { private ToastQueue dryToastQueue,buttererToastQueue; public Butterer(ToastQueue tDryToastQueue,ToastQueue tButtererToastQueue) { dryToastQueue=tDryToastQueue; buttererToastQueue=tButtererToastQueue; } public void run() { try{ while(!Thread.interrupted()) { Toast t=dryToastQueue.take(); t.butter(); System.out.println(t); buttererToastQueue.put(t); } } catch(InterruptedException e) { e.printStackTrace(System.out); } finally { System.out.println("Butterer off"); } } }
/** * Created by xujinxj on 2015/1/22. */ public class Jammer implements Runnable { private ToastQueue buttererToastQueue,jammerToastQueue; public Jammer(ToastQueue tButtererToastQueue,ToastQueue tJammerToastQueue) { buttererToastQueue=tButtererToastQueue; jammerToastQueue=tJammerToastQueue; } public void run() { try{ while(!Thread.interrupted()) { Toast t=buttererToastQueue.take(); t.jam(); System.out.println(t); jammerToastQueue.put(t); } } catch(InterruptedException e) { e.printStackTrace(System.out); } finally { System.out.println("Jammer off"); } } }
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * Created by xujinxj on 2015/1/22. */ public class ToastMatic { public static void main(String []args) throws InterruptedException { ToastQueue dryToastQueue = new ToastQueue(), buttererToastQueue = new ToastQueue(), jammerToastQueue = new ToastQueue(); ExecutorService exec=Executors.newCachedThreadPool(); exec.execute(new Toaster(dryToastQueue)); exec.execute(new Butterer(dryToastQueue,buttererToastQueue)); exec.execute(new Jammer(buttererToastQueue, jammerToastQueue)); TimeUnit.SECONDS.sleep(10); exec.shutdown(); } }
本例也涉及线程池,所谓线程池(ThreadPool)对于限制应用程序中同一时刻运行的线程数很有用,因为每启动一个新线程都会有相应的性能开销,每个线程都需要给栈分配一些内存等等。我们可以把并发执行的任务传递给一个线程池,来替代为每个并发执行的任务都启动一个新的线程。只要池里有空闲的线程,任务就会分配给一个线程执行。在线程池的内部,任务被插入一个阻塞队列(BlockingQueue ),线程池里的线程会去取这个队列里的任务。当一个新任务插入队列时,一个空闲线程就会成功的从队列中取出任务并且执行它。
阻塞队列