首页 > 代码库 > J.U.C并发框架源码阅读(十一)DelayQueue
J.U.C并发框架源码阅读(十一)DelayQueue
基于版本jdk1.7.0_80
java.util.concurrent.DelayQueue
代码如下
/* * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. * * * * * * * * * * * * * * * * * * * * */ /* * * * * * * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at * http://creativecommons.org/publicdomain/zero/1.0/ */ package java.util.concurrent; import java.util.concurrent.locks.*; import java.util.*; /** * An unbounded {@linkplain BlockingQueue blocking queue} of * <tt>Delayed</tt> elements, in which an element can only be taken * when its delay has expired. The <em>head</em> of the queue is that * <tt>Delayed</tt> element whose delay expired furthest in the * past. If no delay has expired there is no head and <tt>poll</tt> * will return <tt>null</tt>. Expiration occurs when an element‘s * <tt>getDelay(TimeUnit.NANOSECONDS)</tt> method returns a value less * than or equal to zero. Even though unexpired elements cannot be * removed using <tt>take</tt> or <tt>poll</tt>, they are otherwise * treated as normal elements. For example, the <tt>size</tt> method * returns the count of both expired and unexpired elements. * This queue does not permit null elements. * * <p>This class and its iterator implement all of the * <em>optional</em> methods of the {@link Collection} and {@link * Iterator} interfaces. * * <p>This class is a member of the * <a href="http://www.mamicode.com/{@docRoot}/../technotes/guides/collections/index.html"> * Java Collections Framework</a>. * * @since 1.5 * @author Doug Lea * @param <E> the type of elements held in this collection */ public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { private transient final ReentrantLock lock = new ReentrantLock(); private final PriorityQueue<E> q = new PriorityQueue<E>(); /** * Thread designated to wait for the element at the head of * the queue. 
This variant of the Leader-Follower pattern * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to * minimize unnecessary timed waiting. When a thread becomes * the leader, it waits only for the next delay to elapse, but * other threads await indefinitely. The leader thread must * signal some other thread before returning from take() or * poll(...), unless some other thread becomes leader in the * interim. Whenever the head of the queue is replaced with * an element with an earlier expiration time, the leader * field is invalidated by being reset to null, and some * waiting thread, but not necessarily the current leader, is * signalled. So waiting threads must be prepared to acquire * and lose leadership while waiting. */ private Thread leader = null; /** * Condition signalled when a newer element becomes available * at the head of the queue or a new thread may need to * become leader. */ private final Condition available = lock.newCondition(); /** * Creates a new <tt>DelayQueue</tt> that is initially empty. */ public DelayQueue() {} /** * Creates a <tt>DelayQueue</tt> initially containing the elements of the * given collection of {@link Delayed} instances. * * @param c the collection of elements to initially contain * @throws NullPointerException if the specified collection or any * of its elements are null */ public DelayQueue(Collection<? extends E> c) { this.addAll(c); } /** * Inserts the specified element into this delay queue. * * @param e the element to add * @return <tt>true</tt> (as specified by {@link Collection#add}) * @throws NullPointerException if the specified element is null */ public boolean add(E e) { return offer(e); } /** * Inserts the specified element into this delay queue. 
* * @param e the element to add * @return <tt>true</tt> * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } } /** * Inserts the specified element into this delay queue. As the queue is * unbounded this method will never block. * * @param e the element to add * @throws NullPointerException {@inheritDoc} */ public void put(E e) { offer(e); } /** * Inserts the specified element into this delay queue. As the queue is * unbounded this method will never block. * * @param e the element to add * @param timeout This parameter is ignored as the method never blocks * @param unit This parameter is ignored as the method never blocks * @return <tt>true</tt> * @throws NullPointerException {@inheritDoc} */ public boolean offer(E e, long timeout, TimeUnit unit) { return offer(e); } /** * Retrieves and removes the head of this queue, or returns <tt>null</tt> * if this queue has no elements with an expired delay. * * @return the head of this queue, or <tt>null</tt> if this * queue has no elements with an expired delay */ public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) return null; else return q.poll(); } finally { lock.unlock(); } } /** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue. 
* * @return the head of this queue * @throws InterruptedException {@inheritDoc} */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) return q.poll(); else if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } } /** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue, * or the specified wait time expires. * * @return the head of this queue, or <tt>null</tt> if the * specified waiting time elapses before an element with * an expired delay becomes available * @throws InterruptedException {@inheritDoc} */ public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) { if (nanos <= 0) return null; else nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) return q.poll(); if (nanos <= 0) return null; if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } } /** * Retrieves, but does not remove, the head of this queue, or * returns <tt>null</tt> if this queue is empty. 
Unlike * <tt>poll</tt>, if no expired elements are available in the queue, * this method returns the element that will expire next, * if one exists. * * @return the head of this queue, or <tt>null</tt> if this * queue is empty. */ public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.peek(); } finally { lock.unlock(); } } public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.size(); } finally { lock.unlock(); } } /** * @throws UnsupportedOperationException {@inheritDoc} * @throws ClassCastException {@inheritDoc} * @throws NullPointerException {@inheritDoc} * @throws IllegalArgumentException {@inheritDoc} */ public int drainTo(Collection<? super E> c) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); final ReentrantLock lock = this.lock; lock.lock(); try { int n = 0; for (;;) { E first = q.peek(); if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) break; c.add(q.poll()); ++n; } return n; } finally { lock.unlock(); } } /** * @throws UnsupportedOperationException {@inheritDoc} * @throws ClassCastException {@inheritDoc} * @throws NullPointerException {@inheritDoc} * @throws IllegalArgumentException {@inheritDoc} */ public int drainTo(Collection<? super E> c, int maxElements) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; final ReentrantLock lock = this.lock; lock.lock(); try { int n = 0; while (n < maxElements) { E first = q.peek(); if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) break; c.add(q.poll()); ++n; } return n; } finally { lock.unlock(); } } /** * Atomically removes all of the elements from this delay queue. * The queue will be empty after this call returns. * Elements with an unexpired delay are not waited for; they are * simply discarded from the queue. 
*/ public void clear() { final ReentrantLock lock = this.lock; lock.lock(); try { q.clear(); } finally { lock.unlock(); } } /** * Always returns <tt>Integer.MAX_VALUE</tt> because * a <tt>DelayQueue</tt> is not capacity constrained. * * @return <tt>Integer.MAX_VALUE</tt> */ public int remainingCapacity() { return Integer.MAX_VALUE; } /** * Returns an array containing all of the elements in this queue. * The returned array elements are in no particular order. * * <p>The returned array will be "safe" in that no references to it are * maintained by this queue. (In other words, this method must allocate * a new array). The caller is thus free to modify the returned array. * * <p>This method acts as bridge between array-based and collection-based * APIs. * * @return an array containing all of the elements in this queue */ public Object[] toArray() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.toArray(); } finally { lock.unlock(); } } /** * Returns an array containing all of the elements in this queue; the * runtime type of the returned array is that of the specified array. * The returned array elements are in no particular order. * If the queue fits in the specified array, it is returned therein. * Otherwise, a new array is allocated with the runtime type of the * specified array and the size of this queue. * * <p>If this queue fits in the specified array with room to spare * (i.e., the array has more elements than this queue), the element in * the array immediately following the end of the queue is set to * <tt>null</tt>. * * <p>Like the {@link #toArray()} method, this method acts as bridge between * array-based and collection-based APIs. Further, this method allows * precise control over the runtime type of the output array, and may, * under certain circumstances, be used to save allocation costs. 
* * <p>The following code can be used to dump a delay queue into a newly * allocated array of <tt>Delayed</tt>: * * <pre> * Delayed[] a = q.toArray(new Delayed[0]);</pre> * * Note that <tt>toArray(new Object[0])</tt> is identical in function to * <tt>toArray()</tt>. * * @param a the array into which the elements of the queue are to * be stored, if it is big enough; otherwise, a new array of the * same runtime type is allocated for this purpose * @return an array containing all of the elements in this queue * @throws ArrayStoreException if the runtime type of the specified array * is not a supertype of the runtime type of every element in * this queue * @throws NullPointerException if the specified array is null */ public <T> T[] toArray(T[] a) { final ReentrantLock lock = this.lock; lock.lock(); try { return q.toArray(a); } finally { lock.unlock(); } } /** * Removes a single instance of the specified element from this * queue, if it is present, whether or not it has expired. */ public boolean remove(Object o) { final ReentrantLock lock = this.lock; lock.lock(); try { return q.remove(o); } finally { lock.unlock(); } } /** * Returns an iterator over all the elements (both expired and * unexpired) in this queue. The iterator does not return the * elements in any particular order. * * <p>The returned iterator is a "weakly consistent" iterator that * will never throw {@link java.util.ConcurrentModificationException * ConcurrentModificationException}, and guarantees to traverse * elements as they existed upon construction of the iterator, and * may (but is not guaranteed to) reflect any modifications * subsequent to construction. * * @return an iterator over the elements in this queue */ public Iterator<E> iterator() { return new Itr(toArray()); } /** * Snapshot iterator that works off copy of underlying q array. 
*/ private class Itr implements Iterator<E> { final Object[] array; // Array of all elements int cursor; // index of next element to return; int lastRet; // index of last element, or -1 if no such Itr(Object[] array) { lastRet = -1; this.array = array; } public boolean hasNext() { return cursor < array.length; } @SuppressWarnings("unchecked") public E next() { if (cursor >= array.length) throw new NoSuchElementException(); lastRet = cursor; return (E)array[cursor++]; } public void remove() { if (lastRet < 0) throw new IllegalStateException(); Object x = array[lastRet]; lastRet = -1; // Traverse underlying queue to find == element, // not just a .equals element. lock.lock(); try { for (Iterator it = q.iterator(); it.hasNext(); ) { if (it.next() == x) { it.remove(); return; } } } finally { lock.unlock(); } } } }
0. DelayQueue简介
无界的阻塞队列,线程安全,只能存放实现了Delayed接口的对象,存入的对象只能在到期后才能取出。
1. 接口分析
DelayQueue继承于AbstractQueue抽象类
并实现了BlockingQueue<E>(阻塞队列语义)接口
2. DelayQueue原理概述
DelayQueue内部维护了一个PriorityQueue,存入PriorityQueue的对象必须实现Delayed接口,Delayed接口又继承于Comparable接口,对象必须正确实现Delayed接口的语义。这样PriorityQueue就能根据对象的到期时间进行排序,每次poll/take的时候,会检查PriorityQueue中的第一个元素,如果它没有到期,说明整个队列中的所有对象都未到期,不能取出。这样就实现了存入的对象只能在到期后才能取出的语义。
DelayQueue内部还维护了一个ReentrantLock变量lock,每次poll/offer操作都会加锁,这样就实现了线程安全的语义。
DelayQueue内部还维护了lock的Condition变量available,用它来控制队列为空或队首元素尚未到期时,take/带超时的poll会被阻塞(无参的poll不会阻塞,而是直接返回null);如果有新元素成为队首,或者leader线程让出了位置,则会唤醒一个阻塞线程,这样就实现了阻塞队列的语义。
3. Delayed接口
/**
 * Contract for objects that become eligible only after some delay.
 * Implementations are also {@link Comparable} against other
 * {@code Delayed} instances.
 */
public interface Delayed extends Comparable<Delayed> {

    /**
     * Reports how much of this object's delay is still outstanding,
     * expressed in the requested time unit.
     *
     * @param unit the time unit in which the result is expressed
     * @return the remaining delay; a value of zero or less means the
     *         delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}
很简单的代码,可以看出,任何实现了Delayed接口的类,都必须要实现getDelay与compareTo方法,这两个方法必须被正确实现,否则DelayQueue将无法正常工作
long getDelay(TimeUnit unit) -> 根据传入的时间单位,返回当前对象距离到期还有多久时间(不能无视单位随意返回值,因为DelayQueue会根据这个返回时间设置等待时间,乱设可能会导致多余的自旋占用CPU)
public int compareTo(T o); -> 与传入的另外一个实现了Delayed接口的对象比较,两者谁先到期
4. leader线程
DelayQueue有一个很有趣的设计,内部维护了一个Thread类型的leader变量,其注释如下:
Thread designated to wait for the element at the head of the queue. This variant of the Leader-Follower pattern (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to minimize unnecessary timed waiting. When a thread becomes the leader, it waits only for the next delay to elapse, but other threads await indefinitely. The leader thread must signal some other thread before returning from take() or poll(...), unless some other thread becomes leader in the interim. Whenever the head of the queue is replaced with an element with an earlier expiration time, the leader field is invalidated by being reset to null, and some waiting thread, but not necessarily the current leader, is signalled. So waiting threads must be prepared to acquire and lose leadership while waiting.
大概意思是说如果有多个线程在等待对象到期,只有一个线程会被设置为leader线程,这个leader线程会根据最近到期的元素来设置等待时间,其他线程都是永久等待。leader线程等待超时后会去取元素,并在返回前唤醒另一个等待线程(signal只唤醒一个线程,由它重新竞争leader)。
关键点在于,只有一个线程会超时等待,其他线程永久等待(超时等待要维护计时器,开销肯定相对较大),这样就减少了开销。再一次体现了Doug Lea大神对性能的追求,膜之。
5. DelayQueue.offer方法解析
/** * Inserts the specified element into this delay queue. * * @param e the element to add * @return <tt>true</tt> * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock();//加锁保证线程安全 try { q.offer(e);//向内部维护的PriorityQueue插入元素,由于插入的对象实现了Comparable接口,距离超时时间最近的对象会排在队首 if (q.peek() == e) {//如果新插入的对象就被排在队首了,那么leader线程的等待时间就不正确了,需要将leader线程唤醒 leader = null; available.signal();//唤醒leader线程 } return true; } finally { lock.unlock();//解锁 } }
6. DelayQueue.take方法解析
/** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue. * * @return the head of this queue * @throws InterruptedException {@inheritDoc} */ public E take() throws InterruptedException {//阻塞可中断 final ReentrantLock lock = this.lock; lock.lockInterruptibly();//加全局可中断锁 try { for (;;) { E first = q.peek();//获取队首元素,这个元素的等待时间是最短的 if (first == null) available.await();//如果队列为空,那也没法根据队首元素设置线程等待时间了,就直接无限等待 else { long delay = first.getDelay(TimeUnit.NANOSECONDS);//用ns为单位,获取队首元素的等待时间 if (delay <= 0)//如果这个元素已经过期了,直接出队 return q.poll(); else if (leader != null)//如果队首元素还没有过期,而且leader线程存在,当前线程选择永久等待,leader线程会将其唤醒 available.await(); else {//将自己设置为leader线程 Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay);//根据队首元素设置自己的等待超时时间 } finally { if (leader == thisThread)//如果leader线程还是自己(可能在offer方法中被修改) leader = null;//让出leader位置 } } } } } finally { if (leader == null && q.peek() != null)//如果没有leader线程,并且优先队列不为空,那么唤醒一个等待线程 available.signal(); lock.unlock();//解锁 } }
这段代码写得很好,重点关注对available条件与leader变量的操作
7. DelayQueue.poll方法解析
/** * Retrieves and removes the head of this queue, or returns <tt>null</tt> * if this queue has no elements with an expired delay. * * @return the head of this queue, or <tt>null</tt> if this * queue has no elements with an expired delay */ public E poll() {//非阻塞方法 final ReentrantLock lock = this.lock; lock.lock();//加锁 try { E first = q.peek();//获取优先队列的队首元素,如果队首元素已经超时,则poll队首元素并返回,否则返回null if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) return null; else return q.poll(); } finally { lock.unlock(); } } /** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue, * or the specified wait time expires. * * @return the head of this queue, or <tt>null</tt> if the * specified waiting time elapses before an element with * an expired delay becomes available * @throws InterruptedException {@inheritDoc} */ public E poll(long timeout, TimeUnit unit) throws InterruptedException {//超时可中断方法 long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly();//全局可中断锁 try { for (;;) { E first = q.peek();//查找队首元素 if (first == null) {//如果队列为空,则根据传入的超时时间设置线程等待时间 if (nanos <= 0) return null; else nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(TimeUnit.NANOSECONDS);//获取队首元素的等待时间 if (delay <= 0) return q.poll(); if (nanos <= 0) return null; if (nanos < delay || leader != null)//如果leader的等待时间比当前线程的等待时间长,当前线程设置等待时间并等待 nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
8. DelayQueue的迭代器
/** * Returns an iterator over all the elements (both expired and * unexpired) in this queue. The iterator does not return the * elements in any particular order. * * <p>The returned iterator is a "weakly consistent" iterator that * will never throw {@link java.util.ConcurrentModificationException * ConcurrentModificationException}, and guarantees to traverse * elements as they existed upon construction of the iterator, and * may (but is not guaranteed to) reflect any modifications * subsequent to construction. * * @return an iterator over the elements in this queue */ public Iterator<E> iterator() { return new Itr(toArray());//将PriorityQueue中的所有元素复制一份 } /** * Snapshot iterator that works off copy of underlying q array. */ private class Itr implements Iterator<E> { final Object[] array; // Array of all elements int cursor; // index of next element to return; int lastRet; // index of last element, or -1 if no such Itr(Object[] array) { lastRet = -1; this.array = array; } public boolean hasNext() { return cursor < array.length; } @SuppressWarnings("unchecked") public E next() {//遍历数组 if (cursor >= array.length) throw new NoSuchElementException(); lastRet = cursor; return (E)array[cursor++]; } public void remove() { if (lastRet < 0) throw new IllegalStateException(); Object x = array[lastRet];//获取上一次返回的元素,也就是将要被删除的元素 lastRet = -1; // Traverse underlying queue to find == element, // not just a .equals element. lock.lock();//加全局锁 try { for (Iterator it = q.iterator(); it.hasNext(); ) {//遍历底层的PriorityQueue,删除对应的元素(不一定真的能找到这个元素) if (it.next() == x) { it.remove(); return; } } } finally { lock.unlock(); } } }
迭代器是弱一致的,初始化迭代器时,会创建底层PriorityQueue中所有元素的一个拷贝,遍历操作会在这个拷贝上进行(弱一致,原PriorityQueue的修改无法在迭代器中体现)
用迭代器删除元素时,会记下试图删除的元素,然后去原PriorityQueue里寻找,如果找到相同的元素,则将其删除(不一定真的有删除动作,因为原PriorityQueue可能已经将这个元素出队了)
由于迭代器操作的实际上是PriorityQueue的一个快照,所以无论如何不会抛出ConcurrentModificationException
迭代器的遍历次序没有保证
J.U.C并发框架源码阅读(十一)DelayQueue