首页 > 代码库 > Java_并发线程_Condition

Java_并发线程_Condition

1.概述

使用Condition应在Lock的前提下,请先参见Java_并发线程_Lock、ReadWriteLock一文。在synchronized同步代码块中使用了obj的锁对象,然后通过obj.notify()和obj.wait()来配合处理多线程的问题。然而,同样lock和condition配合使用同样可以完成同样的功能,condition只有配合lock使用才有意义,只不过lock更加的灵活,使用的格式如下。

//lock 与 Condition
private static ReentrantLock lock = new ReentrantLock();
private static Condition condition1 = lock.newCondition();
{
	lock.lock();
	try{
		//
		condition1.await();//condition1应在与其对应的lock区间被调用,等待其它线程调用 condition1.signal();
		//
	}finally{
		lock.unlock();
	}
}


2.await()与signal()原理分析

static final class Node {
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    Node nextWaiter;
}
public class ConditionObject implements Condition, java.io.Serializable {
	/** First node of condition queue. */
	private transient Node firstWaiter;
	/** Last node of condition queue. */
	private transient Node lastWaiter;
	
	//...
}
Condition内部维护了一个Node的双向链表,调用condition.await(),则会新创建一个Node节点,lastWaiter指向这个新创建的节点对象;每次调用signal(),则会通过firstWaiter从链表的前面拿到Node对象,并将firstWaiter指向当前Node.nextWaiter对象。然后对这个Node对象进行操作判断。

(1).await()

作用:当前线程休眠停止调度;是否锁;置于队列等待signal()

    /**
     * 如果当前线程中断,则抛出InterruptedException
     * 阻塞直到 调用了signal和线程被中断
     */
    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        
        //新创建node对象,放入链表尾
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        //判断是否在同步队列
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

(2).signal()

    /**
     * Moves the longest-waiting thread, if one exists, from the
     * wait queue for this condition to the wait queue for the
     * owning lock.
     *
     * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
     *         returns {@code false}
     */
    public final void signal() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }


3.自定义阻塞队列

class BoundedBuffer {
	final Lock lock = new ReentrantLock();
	final Condition notFull = lock.newCondition();
	final Condition notEmpty = lock.newCondition();

	final Object[] items = new Object[100];
	int putptr, takeptr, count;

	public void put(Object x) throws InterruptedException {
		lock.lock();
		try {
			while (count == items.length)
				//等待没有慢
				notFull.await();
			items[putptr] = x;
			if (++putptr == items.length)
				putptr = 0;
			++count;
			//已经不为空
			notEmpty.signal();
		} finally {
			lock.unlock();
		}
	}

	public Object take() throws InterruptedException {
		lock.lock();
		try {
			while (count == 0)
				//等待不为空
				notEmpty.await();
			Object x = items[takeptr];
			if (++takeptr == items.length)
				takeptr = 0;
			--count;
			//已经不为满
			notFull.signal();
			return x;
		} finally {
			lock.unlock();
		}
	}
}


4.应用实例

	private static ReentrantLock lock = new ReentrantLock();
	private static Condition condition1 = lock.newCondition();
	
	public static void main(String[] args) {
		new Thread() {
			@Override
			public void run() {
				lock.lock();
				try {
					//
					System.out.println(Thread.currentThread().getName() + ", locked");
					try {
						condition1.await();
						System.out.println(Thread.currentThread().getName() + ", awaited");
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					System.out.println(Thread.currentThread().getName() + ", will finally");
				} finally {
					System.out.println(Thread.currentThread().getName() + ", finally");
					lock.unlock();
					System.out.println(Thread.currentThread().getName() + ", unlocked");
				}
			}
		}.start();
		new Thread() {
			@Override
			public void run() {
				lock.lock();
				try {
					System.out.println(Thread.currentThread().getName() + ", locked");
					condition1.signal();//但是没有释放锁,等待lock.unlock()后,condition1对应的线程才被唤醒,和synchronized一样
					System.out.println(Thread.currentThread().getName() + ", will finally");
				} finally {
					System.out.println(Thread.currentThread().getName() + ", finally");

					try {
						Thread.sleep(10);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					System.out.println(Thread.currentThread().getName() + ", unlocked");
					lock.unlock();
				}
			}
		}.start();
	}
	/*
	 *	Thread-1, locked
		Thread-2, locked
		Thread-2, will finally
		Thread-2, finally
		Thread-2, unlocked
		Thread-1, awaited
		Thread-1, will finally
		Thread-1, finally
		Thread-1, unlocked
	 */

Java_并发线程_Condition