首页 > 代码库 > java并发程序基础——BlockingQueue

java并发程序基础——BlockingQueue

概述

BlockingQueue顾名思义‘阻塞的队列’,是指在它上面的操作都是被阻塞的(部分操作存在例外,如add等)。BlockQueue是java.util.concurrent工具包的重要基础工具,在ThreadPoolExcutor及tomcat等服务端容器中都有使用到。

BlockingQueue的分类

按照对象存放的数据结构分类

BlockingQueue中的对象可以存放在:数组、链表中,对应的就是ArrayBlockingQueue、LinkedBlockingQueu;

 

BlockingQueue提供了通用的存取对象方法

 技术分享

以ArrayBlockingQueue的实现代码加以说明:

ArrayBlockingQueue.offer():

    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false; //如果队列已满,则返回false,不抛出异常
            else {
                enqueue(e); //向队列尾部插入元素e
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

ArrayBlockingQueue.offer(timeout),带超时的offer:

    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();  //在lock锁定期间,该生产者线程可以被中断,好处是什么呢?
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos); //和offer(e)不一样,该方法会等待队列的notFull信号量,但等待时长不会超过设定的timeout时长。
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

ArrayBlockingQueue.add(): 

    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full"); //队列满,抛出异常
    }

ArrayBlockingQueue.put():

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await(); //队列满时,生产者线程阻塞等待,直到该队列被消费者线程take后,notFull condition被signal触发
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

 

java并发程序基础——BlockingQueue