首页 > 代码库 > 阻塞队列

阻塞队列

***********************************************声明******************************************************

      原创作品,出自 “晓风残月xj” 博客,欢迎转载,转载时请务必注明出处(http://blog.csdn.net/xiaofengcanyuexj)。

      由于各种原因,可能存在诸多不足,欢迎斧正!

*********************************************************************************************************    

         线程的同步是保证多线程安全访问竞争资源的一种手段。Java中线程同步的方法有很多,如显式的synchronized、Lock等,还有如管道、阻塞队列等特殊的数据结构支持线程同步。本文谈谈我对阻塞队列的看法。

 

一、阻塞队列性质

      阻塞队列的主要性质有如下2条:

1)、任何时候只能有一个线程在插入或移除元素;

2)、当队列为空时进行获取或移除元素不会返回null或抛出异常,而是等待队列直至其不为空;当队列为满时进行插入元素不会抛出异常,而是等待队列直至其不为满。


二、阻塞队列实例

       Java类库提供的阻塞队列主要如下:

1)、LinkedBlockingQueue的容量默认是没有上限的(在不指定时容量为Integer.MAX_VALUE),也可以选择指定其最大容量,它是基于链表的队列,此队列按 FIFO排序元素。
2)、ArrayBlockingQueue在构造时需要指定容量,并可以选择是否需要公平性(默认false),如果公平参数 被设置true,等待时间最长的线程会优先得到处理,其实就是通过将ReentrantLock设置为true来达到这种公平性的:即等待时间最长的线程 会先操作。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。它是基于数组的阻塞循环队列,此队列按 FIFO原则对元素进行排序。
3)、PriorityBlockingQueue是一个带优先级的队列,而不是先进先出队列。元素按优先级 顺序被移除,该队列也没有上限(看了一下源码,PriorityBlockingQueue是对PriorityQueue的再次包装,是基于堆数据结构 的,而PriorityQueue是没有容量限制的,与ArrayList一样,所以在优先阻塞队列上put时是不会受阻的,但是如果队列为空,取元素的操作take就会阻塞。另外,往入该队列中的元 素要具有比较能力。
4)、DelayQueue(基于PriorityQueue来实现的)是一个存放Delayed 元素的无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且poll将返回null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于或等于零的值时,则出现期满,poll就以移除这个元素了。此队列不允许使用 null 元素。


三、源代码

/**
 * Created by xujinxj on 2015/1/22.
 */
    public enum State{
        DRY("dry"),
        BUTTERED("buttered"),
        JAMMED("jammed")
        ;

        private String description;
        State(String tDescription)
        {
            this.description=tDescription;
        }
        public String toSting() {
            return description;
        }
    };

/**
 * Created by xujinxj on 2015/1/22.
 */
public class Toast {
    private State state=State.DRY;
    private final int id;

    public Toast(int tId)
    {
        id=tId;
    }

    public void butter()
    {
        state=State.BUTTERED;
    }

    public void jam()
    {
        state=State.JAMMED;
    }

    public State getState()
    {
        return state;
    }

    public int getId()
    {
        return id;
    }

    public String toString()
    {
        return "Toast "+id+" : "+state;
    }

}

import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * Created by xujinxj on 2015/1/22.
 */
public class Toaster implements Runnable {
    private ToastQueue toastQueue;
    private int count;
    private Random random=new Random(47);

    public Toaster(ToastQueue tToastQueue)
    {
        toastQueue=tToastQueue;
        count=0;
    }

    public void run()
    {
        try
        {
            while(!Thread.interrupted())
            {
                TimeUnit.MILLISECONDS.sleep(random.nextInt(50));
                Toast t=new Toast(count++);
                System.out.println(t);
                toastQueue.put(t);
                if(10==count)
                    return;
            }
        }
        catch(InterruptedException e)
        {
            e.printStackTrace(System.out);
        }
        finally
        {
            System.out.println("Toaster off");
        }
    }
}

/**
 * Created by xujinxj on 2015/1/22.
 */
public class Butterer implements Runnable {

    private ToastQueue dryToastQueue,buttererToastQueue;

    public Butterer(ToastQueue tDryToastQueue,ToastQueue tButtererToastQueue)
    {
        dryToastQueue=tDryToastQueue;
        buttererToastQueue=tButtererToastQueue;
    }


    public void run()
    {
        try{

            while(!Thread.interrupted())
            {
                Toast t=dryToastQueue.take();
                t.butter();
                System.out.println(t);

                buttererToastQueue.put(t);
            }
        }
        catch(InterruptedException e)
        {
            e.printStackTrace(System.out);
        }
        finally
        {
            System.out.println("Butterer off");
        }
    }
}

/**
 * Created by xujinxj on 2015/1/22.
 */
public class Jammer implements Runnable {

    private ToastQueue buttererToastQueue,jammerToastQueue;

    public Jammer(ToastQueue tButtererToastQueue,ToastQueue tJammerToastQueue)
    {
        buttererToastQueue=tButtererToastQueue;
        jammerToastQueue=tJammerToastQueue;
    }


    public void run()
    {
        try{

            while(!Thread.interrupted())
            {
                Toast t=buttererToastQueue.take();
                t.jam();
                System.out.println(t);
                jammerToastQueue.put(t);

            }
        }
        catch(InterruptedException e)
        {
            e.printStackTrace(System.out);
        }
        finally
        {
            System.out.println("Jammer off");
        }
    }
}

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Created by xujinxj on 2015/1/22.
 */
public class ToastMatic {


    public static void main(String []args) throws InterruptedException {
        ToastQueue dryToastQueue = new ToastQueue(),
                buttererToastQueue = new ToastQueue(),
                jammerToastQueue = new ToastQueue();

        ExecutorService exec=Executors.newCachedThreadPool();

        exec.execute(new Toaster(dryToastQueue));
        exec.execute(new Butterer(dryToastQueue,buttererToastQueue));
        exec.execute(new Jammer(buttererToastQueue, jammerToastQueue));

        TimeUnit.SECONDS.sleep(10);
        exec.shutdown();
    }
}

      本例也涉及线程池,所谓线程池(ThreadPool)对于限制应用程序中同一时刻运行的线程数很有用,因为每启动一个新线程都会有相应的性能开销,每个线程都需要给栈分配一些内存等等。我们可以把并发执行的任务传递给一个线程池,来替代为每个并发执行的任务都启动一个新的线程。只要池里有空闲的线程,任务就会分配给一个线程执行。在线程池的内部,任务被插入一个阻塞队列(BlockingQueue ),线程池里的线程会去取这个队列里的任务。当一个新任务插入队列时,一个空闲线程就会成功的从队列中取出任务并且执行它。







阻塞队列