首页 > 代码库 > Java 并发编程(四)常用同步工具类

Java 并发编程(四)常用同步工具类

同步工具类可以使任何一种对象,只要该对象可以根据自身的状态来协调控制线程的控制流。阻塞队列可以作为同步工具类,其他类型的同步工具类还包括:信号量(Semaphore)、栅栏(Barrier)以及闭锁(Latch)。


闭锁


首先我们来介绍闭锁。

闭锁作用相当于一扇门:在闭锁到达某一状态之前,这扇门一直是关闭的,所有的线程都会在这扇门前等待(阻塞)。只有门打开后,所有的线程才会同时继续运行。

闭锁可以用来确保某些活动直到其它活动都完成后才继续执行,例如:

1、确保某个计算在其所有资源都被初始化之后才继续执行。二元闭锁(只有两个状态)可以用来表示“资源R已经被初始化”,而所有需要R操作都必须先在这个闭锁上等待。

2、确保某个服务在所有其他服务都已经启动之后才启动。这时就需要多个闭锁。让S在每个闭锁上等待,只有所有的闭锁都打开后才会继续运行。

3、等待直到某个操作的参与者(例如,多玩家游戏中的玩家)都就绪再继续执行。在这种情况下,当所有玩家都准备就绪时,闭锁将到达结束状态。

CountDownLatch 是一种灵活的闭锁实现,可以用在上述各种情况中使用。闭锁状态包含一个计数器,初始化为一个正数,表示要等待的事件数量。countDown() 方法会递减计数器,表示等待的事件中发生了一件。await() 方法则阻塞,直到计数器值变为0。

下面,我们使用闭锁来实现在主线程中计算多个子线程运行时间的功能。具体逻辑是使用两个闭锁,“起始门”用来控制子线程同时运行,“结束门”用来标识子线程是否都结束。

package org.bupt.xiaoye;

import java.util.concurrent.CountDownLatch;

public class Test {
	public static int nThread = 5;
	public static void main(String[] args) throws InterruptedException{
		final CountDownLatch startGate = new CountDownLatch(1);
		final CountDownLatch endGate = new CountDownLatch(nThread);
		for(int i =0;i<nThread;i++){
			new Thread(){
				@Override
				public void run(){
					try{
					startGate.await();
					Thread.sleep(300);}
					catch(InterruptedException e){
					}
					finally{
						System.out.println(Thread.currentThread().getName()+"ended");
						endGate.countDown();
					}
				}
			}.start();
		}
		long start = System.nanoTime();
		startGate.countDown();
		endGate.await();
		long end = System.nanoTime();
		System.out.println("Total time :"+(end - start));
	}
}

FutureTask


        FutureTask也可以用作闭锁。它表示一种抽象的可生成结果的计算。是通过 Callable 来实现的,相当于一种可生成结果的 Runnable ,并且可处于以下三种状态:等待运行,正在运行,运行完成。当FutureTask进入完成状态后,它会停留在这个状态上。

        Future.get 用来获取计算结果,如果FutureTask还未运行完成,则会阻塞。FutureTask 将计算结果从执行计算的线程传递到获取这个结果的线程,而FutureTask 的规范确保了这种传递过程能实现结果的安全发布。

        FutureTask在Executor框架中表示异步任务,还可以用来表示一些时间较长的计算,这些计算可以在使用计算结果之前启动。

下面我们来构造一个简单的异步任务,来简单示范FutureTask的使用方法。

package org.bupt.xiaoye;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class Preloader {
	private final FutureTask<Integer> future = new FutureTask<Integer>(
			new Callable<Integer>() {
				public Integer call() throws Exception {
					Thread.sleep(3000);
					return 969;
				}
			});

	private final Thread thread = new Thread(future);
	public void start() {
		thread.start();
	}

	public Integer get() throws Exception {
		try{
		return future.get();}
		catch(ExecutionException e){
			Throwable cause = e.getCause();
			throw launderThrowable(cause);
		}
	}
	private static Exception launderThrowable(Throwable cause) {
		if(cause instanceof RuntimeException)return (RuntimeException )cause;
		else if(cause instanceof Error) throw (Error) cause;
		else throw new IllegalStateException("Not Checked",cause);
	}
	public static void main(String[] args) throws Exception{
		Preloader p = new Preloader();
		p.start();
		long start = System.currentTimeMillis();
		System.out.println(p.get());
		System.out.println(System.currentTimeMillis()-start);
	}
}

信号量


        之前讲的闭锁控制访问的时间,而信号量则用来控制访问某个特定资源的操作数量,控制空间。而且闭锁只能够减少,一次性使用,而信号量则申请可释放,可增可减。 计数信号量还可以用来实现某种资源池,或者对容器施加边界。

        Semaphone 管理这一组许可(permit),可通过构造函数指定。同时提供了阻塞方法acquire,用来获取许可。同时提供了release方法表示释放一个许可。

        Semaphone 可以将任何一种容器变为有界阻塞容器,如用于实现资源池。例如数据库连接池。我们可以构造一个固定长度的连接池,使用阻塞方法 acquire和release获取释放连接,而不是获取不到便失败。(当然,一开始设计时就使用BlockingQueue来保存连接池的资源是一种更简单的方法)

例如我们将一个普通set容器变为可阻塞有界。

package org.bupt.xiaoye;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Semaphore;

public class BoundedHashSet<T> {
	private final Set<T> set;
	private Semaphore sem;

	public BoundedHashSet(int bound) {
		if (bound < 1)
			throw new IllegalStateException();
		set = Collections.synchronizedSet(new HashSet<T>());
		sem = new Semaphore(bound);
	}

	public boolean add(T e) throws InterruptedException {
		sem.acquire();
		boolean wasAdded = false;
		try {
			wasAdded = set.add(e);
			return wasAdded;
		} finally {
			if (!wasAdded)
				sem.release();
		}
	}

	public boolean remove(T e) {
		boolean wasRemoved = set.remove(e);
		if (wasRemoved)
			sem.release();
		return wasRemoved;
	}

}


栅栏


        栅栏(Bariier)类似于闭锁,它能阻塞一组线程知道某个事件发生。栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待等待时间,而栅栏用于等待线程。

        CyclicBarrier 可以使一定数量的参与方反复的在栅栏位置汇聚,它在并行迭代算法中非常有用:将一个问题拆成一系列相互独立的子问题。当线程到达栅栏位置时,调用await() 方法,这个方法是阻塞方法,直到所有线程到达了栅栏位置,那么栅栏被打开,此时所有线程被释放,而栅栏将被重置以便下次使用。

        另一种形式的栅栏是Exchanger,它是一种两方(Two-Party)栅栏,各方在栅栏位置上交换数据。例如当一个线程想缓冲区写入数据,而另一个线程从缓冲区中读取数据。这些线程可以使用 Exchanger 来汇合,并将慢的缓冲区与空的缓冲区交换。当两个线程通过 Exchanger 交换对象时,这种交换就把这两个对象安全的发布给另一方。

Exchanger 可能被视为 SynchronousQueue 的双向形式。我们也可以用两个SynchronousQueue来实现 Exchanger的功能。

class FillAndEmpty {
   Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
   DataBuffer initialEmptyBuffer = ... a made-up type
   DataBuffer initialFullBuffer = ...

   class FillingLoop implements Runnable {
     public void run() {
       DataBuffer currentBuffer = initialEmptyBuffer;
       try {
         while (currentBuffer != null) {
           addToBuffer(currentBuffer);
           if (currentBuffer.isFull())
             currentBuffer = exchanger.exchange(currentBuffer);
         }
       } catch (InterruptedException ex) { ... handle ... }
     }
   }

   class EmptyingLoop implements Runnable {
     public void run() {
       DataBuffer currentBuffer = initialFullBuffer;
       try {
         while (currentBuffer != null) {
           takeFromBuffer(currentBuffer);
           if (currentBuffer.isEmpty())
             currentBuffer = exchanger.exchange(currentBuffer);
         }
       } catch (InterruptedException ex) { ... handle ...}
     }
   }

   void start() {
     new Thread(new FillingLoop()).start();
     new Thread(new EmptyingLoop()).start();
   }
  }




Java 并发编程(四)常用同步工具类