首页 > 代码库 > Java并发——线程间通信与同步技术

Java并发——线程间通信与同步技术

      传统的线程间通信与同步技术为Object上的wait()、notify()、notifyAll()等方法,Java在显示锁上增加了Condition对象,该对象也可以实现线程间通信与同步。本文会介绍有界缓存的概念与实现,在一步步实现有界缓存的过程中引入线程间通信与同步技术的必要性。首先先介绍一个有界缓存的抽象基类,所有具体实现都将继承自这个抽象基类:

public abstract class BaseBoundedBuffer<V> {	private final V[] buf;	private int tail;	private int head;	private int count;	protected BaseBoundedBuffer(int capacity) {		this.buf = (V[]) new Object[capacity];	}	protected synchronized final void doPut(V v) {		buf[tail] = v;		if (++tail == buf.length)			tail = 0;		++count;	}	protected synchronized final V doTake() {		V v = buf[head];		buf[head] = null;		if (++head == buf.length)			head = 0;		--count;		return v;	}	public synchronized final boolean isFull() {		return count == buf.length;	}	public synchronized final boolean isEmpty() {		return count == 0;	}}

      在向有界缓存中插入或者提取元素时有个问题,那就是如果缓存已满还需要插入吗?如果缓存为空,提取的元素又是什么?以下几种具体实现将分别回答这个问题。

1、将异常传递给调用者

      最简单的实现方式是:如果缓存已满,向缓存中添加元素,我们就抛出异常:

public class GrumpyBoundedBuffer<V> extends BaseBoundedBuffer<V> {	public GrumpyBoundedBuffer() {		this(100);	}	public GrumpyBoundedBuffer(int size) {		super(size);	}	public synchronized void put(V v) throws BufferFullException {		if (isFull())			throw new BufferFullException();		doPut(v);	}	public synchronized V take() throws BufferEmptyException {		if (isEmpty())			throw new BufferEmptyException();		return doTake();	}}

  这种方法实现简单,但是使用起来却不简单,因为每次put()与take()时都必须准备好捕捉异常,这或许满足某些需求,但是有些人还是希望插入时检测到已满的话,可以阻塞在那里,等队列不满时插入对象。

2、通过轮询与休眠实现简单的阻塞

      当队列已满插入数据时,我们可以不抛出异常,而是让线程休眠一段时间,然后重试,此时可能队列已经不是已满状态:

public class SleepyBoundedBuffer<V> extends BaseBoundedBuffer<V> {	int SLEEP_GRANULARITY = 60;	public SleepyBoundedBuffer() {		this(100);	}	public SleepyBoundedBuffer(int size) {		super(size);	}	public void put(V v) throws InterruptedException {		while (true) {			synchronized (this) {				if (!isFull()) {					doPut(v);					return;				}			}			Thread.sleep(SLEEP_GRANULARITY);		}	}	public V take() throws InterruptedException {		while (true) {			synchronized (this) {				if (!isEmpty())					return doTake();			}			Thread.sleep(SLEEP_GRANULARITY);		}	}}

  这种实现方式最大的问题是,我们很难确定合适的休眠间隔,如果休眠间隔过长,那么程序的响应性会变差,如果休眠间隔过短,那么会浪费大量CPU时间。

3、使用条件队列实现有界缓存

      使用休眠的方式会有响应性问题,因为我们无法保证当队列为非满状态时线程就会立刻sleep结束并且检测到,所以,我们希望能有另一种实现方式,当缓存非满时,会主动唤醒线程,而不是需要线程去轮询缓存状态,Object对象上的wait()与notifyAll()能够实现这个需求。当调用wait()方法时,线程会自动释放锁,并请求请求操作系统挂起当前线程;当其他线程检测到条件满足时,会调用notifyAll()方法唤醒挂起线程,实现线程间通信与同步:

public class BoundedBuffer<V> extends BaseBoundedBuffer<V> {	public BoundedBuffer() {		this(100);	}	public BoundedBuffer(int size) {		super(size);	}	public synchronized void put(V v) throws InterruptedException {		while (isFull())			wait();		doPut(v);		notifyAll();	}	public synchronized V take() throws InterruptedException {		while (isEmpty())			wait();		V v = doTake();		notifyAll();		return v;	}	public synchronized void alternatePut(V v) throws InterruptedException {		while (isFull())			wait();		boolean wasEmpty = isEmpty();		doPut(v);		if (wasEmpty)			notifyAll();	}}

  注意,上面的例子中我们使用了notifyAll()唤醒线程而不是notify()唤醒线程,如果我们改用notify()唤醒线程的话,将导致错误的,notify()会在等待队列中随机选择一个线程唤醒,而notifyAll()会唤醒所有等待线程。对于上面的例子,如果现在是非满状态,我们使用notify()唤醒线程,由于只能唤醒一个线程,那么我们唤醒的可能是在等待非空状态的线程,将导致信号丢失。只有同时满足以下两个条件时,才能用单一的notify而不是notifyAll:

  1. 所有等待线程的类型都相同。只有一个条件谓词与条件队列相关,并且每个线程在从wait返回后将执行相同的操作。
  2. 单进单出。在条件变量上的每次通知,最多只能唤醒一个线程来执行。

4、使用显示的Condition实现有界缓存     

      内置条件队列存在一些缺陷,每个内置锁都只能有一个相关联的条件队列,因而像上个例子,多个线程都要在同一个条件队列上等待不同的条件谓词,如果想编写一个带有多个条件谓词的并发对象,就可以使用显示的锁和Condition,与内置锁不同的是,每个显示锁可以有任意数量的Condition对象。以下代码给出了有界缓存的另一种实现,即使用两个Condition,分别为notFull和notEmpty,用于表示"非满"与"非空"两个条件谓词。

public class ConditionBoundedBuffer<T> {	protected final Lock lock = new ReentrantLock();	private final Condition notFull = lock.newCondition();	private final Condition notEmpty = lock.newCondition();	private static final int BUFFER_SIZE = 100;	private final T[] items = (T[]) new Object[BUFFER_SIZE];	private int tail, head, count;	public void put(T x) throws InterruptedException {		lock.lock();		try {			while (count == items.length)				notFull.await();			items[tail] = x;			if (++tail == items.length)				tail = 0;			++count;			notEmpty.signal();		} finally {			lock.unlock();		}	}	public T take() throws InterruptedException {		lock.lock();		try {			while (count == 0)				notEmpty.await();			T x = items[head];			items[head] = null;			if (++head == items.length)				head = 0;			--count;			notFull.signal();			return x;		} finally {			lock.unlock();		}	}}

      注意,在上面的例子中,由于使用了两个Condition对象,我们的唤醒方法调用的是signal()方法,而不是signalAll()方法。

      使用条件队列时,需要特别注意锁、条件谓词和条件变量之间的三元关系:在条件谓词中包含的变量必须由锁保护,在检查条件谓词以及调用wait和notify(或者await和signal)时,必须持有锁对象。

 

Java并发——线程间通信与同步技术