首页 > 代码库 > Java多线程:CyclicBarrier
Java多线程:CyclicBarrier
CyclicBarrier一个线程同步辅助类,它允许一组线程互相等待,直到线程数达到了CyclicBarrier初始时规定的数目时,才继续运行await方法后的业务;CyclicBarrier和CountDownLacth不同,CyclicBarrier是当所有await的线程数量到达了设定的数量后,才继续往下执行。而CountDownLacth是当所有线程都执行完成时,去执行。
CyclicBarrier和CountDownLacth的区别说起来比较绕, 下图是在网上找的一个解释:
实例:(同事们一起聚餐时, 等所有人全部到达饭店后,才能开吃)
final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() { public void run() { System.out.println("========全部人员到齐,开吃========"); } }); for (int i = 1; i <= 5; i++) { new Thread("同事" + i){ public void run() { try { Thread.sleep((long)(Math.random()*1000)); System.out.println("第" + (cyclicBarrier.getNumberWaiting() + 1) + "位到达:" + this.getName()); cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("========" + this.getName() + "=============="); }; }.start(); }
结果输出:
第1位到达:同事2
第2位到达:同事1
第3位到达:同事5
第4位到达:同事4
第5位到达:同事3
========全部人员到齐,开吃========
========同事2==============
========同事1==============
========同事5==============
========同事4==============
========同事3==============
CyclicBarrier实现:
构造函数:
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
参数parties设置参与者数量,count用于计数,每次都会递减;而成员变量parties的值一直不会变,用于reset。
barrierAction允许传入一个实现了Runnable的对象,当调用await方法使count的数量递减到0时,首先会执行此Runnable的对象。
int await()
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen; } }调用dowait方法时,首先使用ReentrantLock进行加锁,然后对count成员属性的值执行减1操作,如果减后的值为0,则执行传入的barrierAction对象。执行完成后将ranAction设置为true, 调用nextGeneration方法并返回0,nextGeneration方法主要调用trip的signalAll方法唤醒所有等待的线程(trip 为Condition类的对象);
如果count递去1后的值不等于0,则调用trip的await方法或await 设定时间 挂起当前线程,直到被唤醒、线程被interrupt或超过设定时间,才会从等待状态恢复;如果设定了等待时间,则检查是否超时,如果超过了,则将generation#broken的值设置为true,调用trip的signalAll方法,并抛出TimeoutException异常,执行finally中的释放锁。
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && !g.broken) { breakBarrier(); throw ie; } else { // We‘re about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); }
int getNumberWaiting()
返回在屏障处等待的线程数目。
public int getNumberWaiting() { final ReentrantLock lock = this.lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } }
parties、count为初始化的数目,parties不会改变,而count为每次调用await时递减;
int getParties()
返回参与者数量
public int getParties() { return parties; }
void reset()
将屏障重置为其初始状态(即count=parties),如果屏障中还有线程等待,则会抛出BrokenBarrierException异常.
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } }
private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }
private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); }