首页 > 代码库 > Java多线程:CyclicBarrier

Java多线程:CyclicBarrier

CyclicBarrier一个线程同步辅助类,它允许一组线程互相等待,直到线程数达到了CyclicBarrier初始时规定的数目时,才继续运行await方法后的业务;CyclicBarrier和CountDownLacth不同,CyclicBarrier是当所有await的线程数量到达了设定的数量后,才继续往下执行。而CountDownLacth是当所有线程都执行完成时,去执行。


CyclicBarrier和CountDownLacth的区别说起来比较绕, 下图是在网上找的一个解释:


实例:(同事们一起聚餐时, 等所有人全部到达饭店后,才能开吃)

    final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
			public void run() {
				System.out.println("========全部人员到齐,开吃========");
			}
		});
		
		for (int i = 1; i <= 5; i++) {
			new Thread("同事" + i){
				public void run() {
					try {
						Thread.sleep((long)(Math.random()*1000));
						System.out.println("第" + (cyclicBarrier.getNumberWaiting() + 1) + "位到达:" + this.getName());
						cyclicBarrier.await();
					} catch (Exception e) {
						e.printStackTrace();
					} 
					System.out.println("========" + this.getName() + "==============");
				};
			}.start();
		}

结果输出:

第1位到达:同事2
第2位到达:同事1
第3位到达:同事5
第4位到达:同事4
第5位到达:同事3
========全部人员到齐,开吃========
========同事2==============
========同事1==============
========同事5==============
========同事4==============
========同事3==============


CyclicBarrier实现:

构造函数:

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

参数parties设置参与者数量,count用于计数,每次都会递减;而成员变量parties的值一直不会变,用于reset。

barrierAction允许传入一个实现了Runnable的对象,当调用await方法使count的数量递减到0时,首先会执行此Runnable的对象。


int await()

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen;
        }
    }
调用dowait方法时,首先使用ReentrantLock进行加锁,然后对count成员属性的值执行减1操作,如果减后的值为0,则执行传入的barrierAction对象。执行完成后将ranAction设置为true, 调用nextGeneration方法并返回0,nextGeneration方法主要调用trip的signalAll方法唤醒所有等待的线程(trip 为Condition类的对象);

如果count递去1后的值不等于0,则调用trip的await方法或await 设定时间 挂起当前线程,直到被唤醒、线程被interrupt或超过设定时间,才会从等待状态恢复;如果设定了等待时间,则检查是否超时,如果超过了,则将generation#broken的值设置为true,调用trip的signalAll方法,并抛出TimeoutException异常,执行finally中的释放锁。

	private int dowait(boolean timed, long nanos) throws InterruptedException,
			BrokenBarrierException, TimeoutException {
		final ReentrantLock lock = this.lock;
		lock.lock();
		try {
			final Generation g = generation;

			if (g.broken)
				throw new BrokenBarrierException();

			if (Thread.interrupted()) {
				breakBarrier();
				throw new InterruptedException();
			}

			int index = --count;
			if (index == 0) { // tripped
				boolean ranAction = false;
				try {
					final Runnable command = barrierCommand;
					if (command != null)
						command.run();
					ranAction = true;
					nextGeneration();
					return 0;
				} finally {
					if (!ranAction)
						breakBarrier();
				}
			}

			// loop until tripped, broken, interrupted, or timed out
			for (;;) {
				try {
					if (!timed)
						trip.await();
					else if (nanos > 0L)
						nanos = trip.awaitNanos(nanos);
				} catch (InterruptedException ie) {
					if (g == generation && !g.broken) {
						breakBarrier();
						throw ie;
					} else {
						// We‘re about to finish waiting even if we had not
						// been interrupted, so this interrupt is deemed to
						// "belong" to subsequent execution.
						Thread.currentThread().interrupt();
					}
				}

				if (g.broken)
					throw new BrokenBarrierException();

				if (g != generation)
					return index;

				if (timed && nanos <= 0L) {
					breakBarrier();
					throw new TimeoutException();
				}
			}
		} finally {
			lock.unlock();
		}
	}
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

int getNumberWaiting()

返回在屏障处等待的线程数目。

    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }

parties、count为初始化的数目,parties不会改变,而count为每次调用await时递减;


int getParties()

返回参与者数量

public int getParties() {
        return parties;
    }

void reset()

将屏障重置为其初始状态(即count=parties),如果屏障中还有线程等待,则会抛出BrokenBarrierException异常.

public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }
private void breakBarrier() {
        generation.broken = true;
	count = parties;
        trip.signalAll();
    }
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }