首页 > 代码库 > java Blocking Queue

java Blocking Queue

一、Java中的阻塞队列

  在多线程之间通信中,多个线程共享一个进程所分配的资源,共享内存是一种常见的通信方式,而阻塞队列则是其实现方式的一种,例如经典的生产者-消费者模式。

  A Queue that addtionally supports operations that wait for the queue to become non-empty when retrieving an element, and  wait for space to become avialable in the queue when storing an element.

  阻塞队列中提供了2个操作:

    队列为空时,获取元素的线程会阻塞队列一直至队列非空。

    队列为满时,存储元素的线程会阻塞队列非满。

  技术分享

  

  Java中的阻塞队列有7种:

  • ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
  • PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

  下面进行说明

 

二、ArrayBlockingQueue

使用数组实现队列

 2.1 构造器

public ArrayBlockingQueue(int capacity,                          boolean fair,                          @NotNull Collection<? extends E> c)//Creates an ArrayBlockingQueue with the given (fixed) capacity, the specified access policy and initially containing the elements of the given collection, added in traversal order of the collection‘s iterator.

参数说明:

  capacity: 队列的容量

  fair: 默认是false,如果是true的话则移除元素的顺序符合FIFO顺序,false的话没有顺序

  c: 预填充元素

实现主要如下:

 public ArrayBlockingQueue(int capacity, boolean fair) {        if (capacity <= 0)            throw new IllegalArgumentException();        this.items = new Object[capacity];        lock = new ReentrantLock(fair);        notEmpty = lock.newCondition();        notFull =  lock.newCondition();    }

 

 

2.2 put方法实现

/** Main lock guarding all access */    final ReentrantLock lock;    /** Condition for waiting takes */    private final Condition notEmpty;    /** Condition for waiting puts */    private final Condition notFull;

由于数组这种数据结构的特殊性,若想要线程安全的添加或者删除,都必须将整个数组锁住,因此这里实现使用一把锁, 以及2个条件队列。

public void put(E e) throws InterruptedException {        checkNotNull(e);        final ReentrantLock lock = this.lock;        lock.lockInterruptibly();        try {            while (count == items.length)                notFull.await();            enqueue(e);        } finally {            lock.unlock();        }    }

这段代码非常容易理解,注意点:

(1) 可重入锁

(2) 使用2个条件队列:Java concurrent包对多条件队列的支持,古老的wait和notify方法也可以实现,只是由于条件队列中的条件不同,必须使用notifyall(),损失了性能。

(3) 可中断锁

 2.3 take() 方法说明

public E take() throws InterruptedException {        final ReentrantLock lock = this.lock;        lock.lockInterruptibly();        try {            while (count == 0)                notEmpty.await();            return dequeue();        } finally {            lock.unlock();        }    }

 

总结: ArrayBlockingQueue应该是最简单的阻塞队列实现了,由于数组结构的特殊性,使用了一把锁和2个条件队列,锁的方式是可中断锁。

三、LinkedBlockingQueue

使用链表实现队列,构造器使用方式和ArrayBlockingQueue一样

 

 public LinkedBlockingQueue(int capacity) {        if (capacity <= 0) throw new IllegalArgumentException();        this.capacity = capacity;        last = head = new Node<E>(null);    }

 

注意到初始化的时候创建了一个空节点...

 

3.1 使用2把锁实现

这里使用了2把锁,2个条件队列实现,而且在属性定义中就直接初始化了。一个存锁,一个取锁,一个非空条件队列,一个非满条件队列

 

  private final ReentrantLock takeLock = new ReentrantLock();    /** Wait queue for waiting takes */    private final Condition notEmpty = takeLock.newCondition();    /** Lock held by put, offer, etc */    private final ReentrantLock putLock = new ReentrantLock();    /** Wait queue for waiting puts */    private final Condition notFull = putLock.newCondition();

 

3.2 put的实现

 public void put(E e) throws InterruptedException {        if (e == null) throw new NullPointerException();        // Note: convention in all put/take/etc is to preset local var        // holding count negative to indicate failure unless set.        int c = -1;        Node<E> node = new Node<E>(e);        final ReentrantLock putLock = this.putLock;        final AtomicInteger count = this.count;        putLock.lockInterruptibly();        try {            while (count.get() == capacity) {                notFull.await();            }            enqueue(node);            c = count.getAndIncrement();            if (c + 1 < capacity)                notFull.signal();        } finally {            putLock.unlock();        }        if (c == 0)            signalNotEmpty();    }

说明:

  • 除了锁之外,使用了原子变量来记录链表大小,因为使用了2把锁来锁节点,全局链表的大小使用线程安全的原子变量确实比较合适
  • 此时锁的是putLock,存锁

下面看singalNotEmpty()方法:

 

 private void signalNotEmpty() {        final ReentrantLock takeLock = this.takeLock;        takeLock.lock();        try {            notEmpty.signal();        } finally {            takeLock.unlock();        }    }

 

发现唤醒时使用了取锁,使用了连续的锁。我们大致梳理一下顺序,在不考虑异常的情况下:

(1) 当前线程竞争存锁putLock并将其锁住

(2) 条件队列notFull(非满条件)等待

(3) 链表中插入元素

(4) 原子变量自增

(5) if(c+1<capacity),则条件队列notFull唤醒操作,通知其他的put线程,链表未达上限,依旧可以插入元素

(6) 释放putLock锁

(7) 如果c==0,原子变量修改-1->0,表明插入成功,通知其他take线程可以去取出元素

  i. 当前线程竞争takeLock,并且锁住

  ii. notEmpty.signal,通知其他在wait take lock的线程可以取出元素

  iii. 释放takeLock

 

3.3 take的实现

类似,略

3.4 总结

(1) 使用了2把不同的锁

(2) 使用原子变量控制容量

(3) 使用链表数据结构

为什么可以用2把锁提升性能,减少锁竞争?
注意下面2个方法:入队方法操作的是last,出对方法操作的first,正是由于链表结构的特殊性,可以使用2把锁来提高锁粒度,从而减少锁竞争。

 private void enqueue(Node<E> node) {        // assert putLock.isHeldByCurrentThread();        // assert last.next == null;        last = last.next = node;    }    /**     * Removes a node from head of queue.     *     * @return the node     */    private E dequeue() {        // assert takeLock.isHeldByCurrentThread();        // assert head.item == null;        Node<E> h = head;        Node<E> first = h.next;        h.next = h; // help GC        head = first;        E x = first.item;        first.item = null;        return x;    }

 

四、Synchronousqueue

个人感觉http://ifeve.com/java-synchronousqueue/讲的比较详细.这里引用一段话描述:

不像ArrayBlockingQueue或LinkedListBlockingQueue,SynchronousQueue内部并没有数据缓存空间,你不能调用peek()方法来看队列中是否有数据元素,因为数据元素只有当你试着取走的时候才可能存在,不取走而只想偷窥一下是不行的,当然遍历这个队列的操作也是不允许的。队列头元素是第一个排队要插入数据的线程,而不是要交换的数据。数据是在配对的生产者和消费者线程之间直接传递的,并不会将数据缓冲数据到队列中。可以这样来理解:生产者和消费者互相等待对方,握手,然后一起离开。

package concurrent.demo.synchronous_queue;import concurrent.demo.Utils;import java.util.concurrent.CountDownLatch;/** * <B>系统名称:</B><BR> * <B>模块名称:</B><BR> * <B>中文类名:</B><BR> * <B>概要说明:</B><BR> * * @author carl.yu * @since 2016/6/14 */public class NativeSynchronousQueue<E> {    E item = null;    boolean putting = false;    //保证放进去之后,其他的取走了,其他的put线程才允许取    public synchronized void put(E e) throws InterruptedException {        //1. 只有一个item为null的时候才允许放        while (putting) {            wait();        }        putting = true;        item = e;        //2. 通知其他的线程来取,可能会通知错了人,造成假唤醒,所以需要用条件队列putting为true来判定,继续睡吧        notifyAll(); /*必须使用notify不能使用notifyAll*/        /*这里的notifyAll是为了让其他的take线程醒来,而不是put线程醒来哦*/        //3. 只有取完的线程才可以来拿        while (item != null) {            wait();        }        putting = false;        notifyAll();        /*这里才是为了put线程醒来*/    }    /*take比较简单,就是拿*/    public synchronized E take() throws InterruptedException {        E res = null;        while (item == null) {            wait();        }        res = item;        item = null;        notifyAll();        return res;    }    //测试    public static void main(String[] args) throws Exception {        final NativeSynchronousQueue queue = new NativeSynchronousQueue();        final CountDownLatch latch = new CountDownLatch(3);        Thread put01 = new Thread() {            @Override            public void run() {                try {                    queue.put("1");                    System.out.println("put01成功");                    latch.countDown();                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        };        Thread put02 = new Thread() {            @Override            public void run() {                try {                    queue.put("2");                    System.out.println("put02成功");                    latch.countDown();                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        };        Thread take01 = new Thread() {            @Override            public void run() {                try {                    System.out.println("take成功:" + queue.take());                    latch.countDown();                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        };        Thread take02 = new Thread() {            @Override            public void run() {                try {                    System.out.println("take成功:" + queue.take());                    latch.countDown();                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        };        put01.start();        Utils.sleep(10);        put02.start();        Utils.sleep(10);        //当没有线程take的时候,put永远不会成功        take01.start();        Utils.sleep(10);        take02.start();        latch.await();        System.out.println("测试结束");    }

 五、条件队列和线程池的使用

Java中,线程池的使用非常常见,jdk1.8中还新增了许多连接池,例如newWorkStealingPool用作并行计算...

这里主要强调条件队列的用法

5.1 使用有界队列实现

直接演示

定义任务类:

 

public class MyTask implements Runnable {    private int taskId;    private String taskName;        public MyTask(int taskId, String taskName){        this.taskId = taskId;        this.taskName = taskName;    }        public int getTaskId() {        return taskId;    }    public void setTaskId(int taskId) {        this.taskId = taskId;    }    public String getTaskName() {        return taskName;    }    public void setTaskName(String taskName) {        this.taskName = taskName;    }      public void run() {        try {            System.out.println("run taskId =" + this.taskId);            Thread.sleep(5*1000);            //System.out.println("end taskId =" + this.taskId);        } catch (InterruptedException e) {            e.printStackTrace();        }            }        public String toString(){        return Integer.toString(this.taskId);    }}

 

自定义拒绝策略:

package com.bjsxt.height.concurrent018;import java.net.HttpURLConnection;import java.util.concurrent.RejectedExecutionHandler;import java.util.concurrent.ThreadPoolExecutor;public class MyRejected implements RejectedExecutionHandler{        public MyRejected(){    }           public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {        System.out.println("自定义处理..");        System.out.println("当前被拒绝任务为:" + r.toString());            }}

 

 

 

public static void main(String[] args) {        /**         * 在使用有界队列时,若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程,         * 若大于corePoolSize,则会将任务加入队列,         * 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程,         * 若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。         *          */            ThreadPoolExecutor pool = new ThreadPoolExecutor(                1,                 //coreSize                2,                 //MaxSize                60,             //60                TimeUnit.SECONDS,                 new ArrayBlockingQueue<Runnable>(3)            //指定一种队列 (有界队列)                //new LinkedBlockingQueue<Runnable>()                , new MyRejected()                //, new DiscardOldestPolicy()                );                MyTask mt1 = new MyTask(1, "任务1");        MyTask mt2 = new MyTask(2, "任务2");        MyTask mt3 = new MyTask(3, "任务3");        MyTask mt4 = new MyTask(4, "任务4");        MyTask mt5 = new MyTask(5, "任务5");        MyTask mt6 = new MyTask(6, "任务6");                pool.execute(mt1);        pool.execute(mt2);        pool.execute(mt3);        pool.execute(mt4);        pool.execute(mt5);        pool.execute(mt6);                pool.shutdown();            }

 

 

 

逐渐增加任务数,由1增加到6发现效果如下:

(1) 当任务数<1时,加入一个任务直接创建线程去处理

(2) 继续加入任务>corePoolSize=1时,会加入队列ArrayBlockingQueue

(3) 一直加入到任务数为4,队列中3个元素,队列满了

(4) 任务数继续增加,到5,会继续创建线程一直到maxPoolSize(2),因此此时执行的任务都1和5

(5) 任务数增加为6,执行拒绝策略

 

5.2 无界队列实现

将上述代码修改为用LinkedBlockingQueue()无界队列实现,发现此时maxSize参数没有作用,拒绝策略也没有作用,只有coreSize才有作用

(1) 当任务数<coreSize,会创建线程

(2) 当任务数>coreSize,会将任务放置到无界队列中,直到系统崩溃

(3) maxSize没有任何作用

(4) 拒绝策略没有任何作用

因此,JDK在实现FixedThreadPool时,maxSize和coreSize相等

 

public static ExecutorService newFixedThreadPool(int nThreads) {        return new ThreadPoolExecutor(nThreads, nThreads,                                      0L, TimeUnit.MILLISECONDS,                                      new LinkedBlockingQueue<Runnable>());    }

 

 

 

5.3 Synchronousqueue实现

JDK中CachedThreadPool用这种队列实现,性能非常好

public static ExecutorService newCachedThreadPool() {        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                                      60L, TimeUnit.SECONDS,                                      new SynchronousQueue<Runnable>());    }

 

  

java Blocking Queue