首页 > 代码库 > Java 并发编程(四)常用同步工具类
Java 并发编程(四)常用同步工具类
同步工具类可以使任何一种对象,只要该对象可以根据自身的状态来协调控制线程的控制流。阻塞队列可以作为同步工具类,其他类型的同步工具类还包括:信号量(Semaphore)、栅栏(Barrier)以及闭锁(Latch)。
闭锁
首先我们来介绍闭锁。
闭锁作用相当于一扇门:在闭锁到达某一状态之前,这扇门一直是关闭的,所有的线程都会在这扇门前等待(阻塞)。只有门打开后,所有的线程才会同时继续运行。
闭锁可以用来确保某些活动直到其它活动都完成后才继续执行,例如:
1、确保某个计算在其所有资源都被初始化之后才继续执行。二元闭锁(只有两个状态)可以用来表示“资源R已经被初始化”,而所有需要R操作都必须先在这个闭锁上等待。
2、确保某个服务在所有其他服务都已经启动之后才启动。这时就需要多个闭锁。让S在每个闭锁上等待,只有所有的闭锁都打开后才会继续运行。
3、等待直到某个操作的参与者(例如,多玩家游戏中的玩家)都就绪再继续执行。在这种情况下,当所有玩家都准备就绪时,闭锁将到达结束状态。
CountDownLatch 是一种灵活的闭锁实现,可以用在上述各种情况中使用。闭锁状态包含一个计数器,初始化为一个正数,表示要等待的事件数量。countDown() 方法会递减计数器,表示等待的事件中发生了一件。await() 方法则阻塞,直到计数器值变为0。
下面,我们使用闭锁来实现在主线程中计算多个子线程运行时间的功能。具体逻辑是使用两个闭锁,“起始门”用来控制子线程同时运行,“结束门”用来标识子线程是否都结束。
package org.bupt.xiaoye; import java.util.concurrent.CountDownLatch; public class Test { public static int nThread = 5; public static void main(String[] args) throws InterruptedException{ final CountDownLatch startGate = new CountDownLatch(1); final CountDownLatch endGate = new CountDownLatch(nThread); for(int i =0;i<nThread;i++){ new Thread(){ @Override public void run(){ try{ startGate.await(); Thread.sleep(300);} catch(InterruptedException e){ } finally{ System.out.println(Thread.currentThread().getName()+"ended"); endGate.countDown(); } } }.start(); } long start = System.nanoTime(); startGate.countDown(); endGate.await(); long end = System.nanoTime(); System.out.println("Total time :"+(end - start)); } }
FutureTask
FutureTask也可以用作闭锁。它表示一种抽象的可生成结果的计算。是通过 Callable 来实现的,相当于一种可生成结果的 Runnable ,并且可处于以下三种状态:等待运行,正在运行,运行完成。当FutureTask进入完成状态后,它会停留在这个状态上。
Future.get 用来获取计算结果,如果FutureTask还未运行完成,则会阻塞。FutureTask 将计算结果从执行计算的线程传递到获取这个结果的线程,而FutureTask 的规范确保了这种传递过程能实现结果的安全发布。
FutureTask在Executor框架中表示异步任务,还可以用来表示一些时间较长的计算,这些计算可以在使用计算结果之前启动。
下面我们来构造一个简单的异步任务,来简单示范FutureTask的使用方法。
package org.bupt.xiaoye; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; public class Preloader { private final FutureTask<Integer> future = new FutureTask<Integer>( new Callable<Integer>() { public Integer call() throws Exception { Thread.sleep(3000); return 969; } }); private final Thread thread = new Thread(future); public void start() { thread.start(); } public Integer get() throws Exception { try{ return future.get();} catch(ExecutionException e){ Throwable cause = e.getCause(); throw launderThrowable(cause); } } private static Exception launderThrowable(Throwable cause) { if(cause instanceof RuntimeException)return (RuntimeException )cause; else if(cause instanceof Error) throw (Error) cause; else throw new IllegalStateException("Not Checked",cause); } public static void main(String[] args) throws Exception{ Preloader p = new Preloader(); p.start(); long start = System.currentTimeMillis(); System.out.println(p.get()); System.out.println(System.currentTimeMillis()-start); } }
信号量
之前讲的闭锁控制访问的时间,而信号量则用来控制访问某个特定资源的操作数量,控制空间。而且闭锁只能够减少,一次性使用,而信号量则申请可释放,可增可减。 计数信号量还可以用来实现某种资源池,或者对容器施加边界。
Semaphone 管理这一组许可(permit),可通过构造函数指定。同时提供了阻塞方法acquire,用来获取许可。同时提供了release方法表示释放一个许可。
Semaphone 可以将任何一种容器变为有界阻塞容器,如用于实现资源池。例如数据库连接池。我们可以构造一个固定长度的连接池,使用阻塞方法 acquire和release获取释放连接,而不是获取不到便失败。(当然,一开始设计时就使用BlockingQueue来保存连接池的资源是一种更简单的方法)
例如我们将一个普通set容器变为可阻塞有界。
package org.bupt.xiaoye; import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.concurrent.Semaphore; public class BoundedHashSet<T> { private final Set<T> set; private Semaphore sem; public BoundedHashSet(int bound) { if (bound < 1) throw new IllegalStateException(); set = Collections.synchronizedSet(new HashSet<T>()); sem = new Semaphore(bound); } public boolean add(T e) throws InterruptedException { sem.acquire(); boolean wasAdded = false; try { wasAdded = set.add(e); return wasAdded; } finally { if (!wasAdded) sem.release(); } } public boolean remove(T e) { boolean wasRemoved = set.remove(e); if (wasRemoved) sem.release(); return wasRemoved; } }
栅栏
栅栏(Bariier)类似于闭锁,它能阻塞一组线程知道某个事件发生。栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待等待时间,而栅栏用于等待线程。
CyclicBarrier 可以使一定数量的参与方反复的在栅栏位置汇聚,它在并行迭代算法中非常有用:将一个问题拆成一系列相互独立的子问题。当线程到达栅栏位置时,调用await() 方法,这个方法是阻塞方法,直到所有线程到达了栅栏位置,那么栅栏被打开,此时所有线程被释放,而栅栏将被重置以便下次使用。
另一种形式的栅栏是Exchanger,它是一种两方(Two-Party)栅栏,各方在栅栏位置上交换数据。例如当一个线程想缓冲区写入数据,而另一个线程从缓冲区中读取数据。这些线程可以使用 Exchanger 来汇合,并将慢的缓冲区与空的缓冲区交换。当两个线程通过 Exchanger 交换对象时,这种交换就把这两个对象安全的发布给另一方。
Exchanger 可能被视为 SynchronousQueue 的双向形式。我们也可以用两个SynchronousQueue来实现 Exchanger的功能。
class FillAndEmpty { Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>(); DataBuffer initialEmptyBuffer = ... a made-up type DataBuffer initialFullBuffer = ... class FillingLoop implements Runnable { public void run() { DataBuffer currentBuffer = initialEmptyBuffer; try { while (currentBuffer != null) { addToBuffer(currentBuffer); if (currentBuffer.isFull()) currentBuffer = exchanger.exchange(currentBuffer); } } catch (InterruptedException ex) { ... handle ... } } } class EmptyingLoop implements Runnable { public void run() { DataBuffer currentBuffer = initialFullBuffer; try { while (currentBuffer != null) { takeFromBuffer(currentBuffer); if (currentBuffer.isEmpty()) currentBuffer = exchanger.exchange(currentBuffer); } } catch (InterruptedException ex) { ... handle ...} } } void start() { new Thread(new FillingLoop()).start(); new Thread(new EmptyingLoop()).start(); } }
Java 并发编程(四)常用同步工具类