首页 > 代码库 > 《java.util.concurrent 包源码阅读》27 Phaser 第一部分

《java.util.concurrent 包源码阅读》27 Phaser 第一部分

Phaser是JDK7新添加的线程同步辅助类,作用同CyclicBarrier,CountDownLatch类似,但是使用起来更加灵活:

1. Parties是动态的。

2. Phaser支持树状结构,即Phaser可以有一个父Phaser。

 

Phaser的构造函数涉及到两个参数:父Phaser和初始的parties,因此提供了4个构造函数:

public Phaser();public Phaser(int parties);public Phaser(Phaser parent);public Phaser(Phaser parent, int parties);

 

因为Phaser的特色在在于动态的parties,因此首先来看如何动态更新parties是如何实现的。

Phaser提供了两个方法:register和bulkRegister,前者会添加一个需要同步的线程,后者会添加parties个需要同步的线。

    public int register() {        return doRegister(1);    }    // 增加了参数的检查    public int bulkRegister(int parties) {        if (parties < 0)            throw new IllegalArgumentException();        if (parties == 0)            return getPhase();        return doRegister(parties);    }

两个方法都调用了doRegister方法,因此接下来就来看看doRegister方法

在分析doRegister之前先来说说Phaser的成员变量:state,它存储了Phaser的状态信息:

private volatile long state;

1. state的最高位是一个标志位,1表示Phaser的状态为终止,0表示运行状态

2. state的低32位中,低16位表示没有到达的线程数量,高16位表示Parties值

3. state的高32位除了最高位之外的其他31位表示的Phaser的phase,可以理解为第多少次同步(从0开始计算)。

 

介绍完了state,来看方法doRegister

    private int doRegister(int registrations) {        // 把registrations值同时加到parties值和还未达到的线程数量中去        long adj = ((long)registrations << PARTIES_SHIFT) | registrations;        final Phaser parent = this.parent;        int phase;        for (;;) {            long s = state;            int counts = (int)s;            int parties = counts >>> PARTIES_SHIFT;            int unarrived = counts & UNARRIVED_MASK;            // 超过了允许的最大parties            if (registrations > MAX_PARTIES - parties)                throw new IllegalStateException(badRegister(s));            // 最高位为1,表示Phaser的线程同步已经结束            else if ((phase = (int)(s >>> PHASE_SHIFT)) < 0)                break;            // Phaser中的parties不是0            else if (counts != EMPTY) {                // 如果当前Phaser没有父Phaser,或者如果有父Phaser,                // 刷新自己的state值,如果新后的state没有变化。                // 这里新子Phaser的原因在于,会出现父Phaser已经进入下一个phase                // 而子Phaser却没有及时进入下一个phase的延迟现象                if (parent == null || reconcileState() == s) {                    // 如果所有线程都到达了,等待Phaser进入下一次同步开始                    if (unarrived == 0)                        root.internalAwaitAdvance(phase, null);                    // 更新state成功,跳出循环完成注册                    else if (UNSAFE.compareAndSwapLong(this, stateOffset,                                                       s, s + adj))                        break;                }            }            // 第一次注册,且不是子Phaser            else if (parent == null) {                // 更新当前Phaser的state值成功则完成注册                long next = ((long)phase << PHASE_SHIFT) | adj;                if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))                    break;            }            // 第一次注册到子Phaser            else {                // 锁定当前Phaser对象                synchronized (this) {                    // 再次检查state值,确保没有被更新                    if (state == s) {                        // 注册到父Phaser中去                        parent.doRegister(1);                        do { // 获取当前phase值                            phase = (int)(root.state >>> PHASE_SHIFT);                        } while (!UNSAFE.compareAndSwapLong                                 (this, stateOffset, state,                                  ((long)phase << PHASE_SHIFT) | adj));// 更新当前Phaser的state值                        break;                    }                }            }        }        return phase;    }

看完了注册,那么来看同步操作的arrive,这里也涉及到两个方法:arrive和arriveAndDeregister,前者会等待其他线程的到达,后者则会立刻返回:

    public int arrive() {        return doArrive(false);    }    public int arriveAndDeregister() {        return doArrive(true);    }

两个方法都调用了doArrive方法,区别在于参数一个是false,一个是true。那么来看doArrive:

    private int doArrive(boolean deregister) {        // arrive需要把未到达的线程数减去1,        // deregister为true,需要把parties值也减去1        int adj = deregister ? ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL;        final Phaser root = this.root;        for (;;) {            // 如果是有父Phaser,首先刷新自己的state            long s = (root == this) ? state : reconcileState();            int phase = (int)(s >>> PHASE_SHIFT);            int counts = (int)s;            int unarrived = (counts & UNARRIVED_MASK) - 1;            // 最高位为1,表示同步已经结束,返回phase值            if (phase < 0)                return phase;            // 如果parties为0或者在此次arrive之前所有线程到达            else if (counts == EMPTY || unarrived < 0) {                // 对于非子Phaser来说,上述情况的arrive肯定是非法的                // 对于子Phaser首先刷新一下状态再做检查                if (root == this || reconcileState() == s)                    throw new IllegalStateException(badArrive(s));            }            // 正常情况下,首先更新state            else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) {                // 所有线程都已经到达                if (unarrived == 0) {                    // 计算parties作为下一个phase的未到达的parties                    long n = s & PARTIES_MASK;                    int nextUnarrived = (int)n >>> PARTIES_SHIFT;                    // 调用父Phaser的doArrive                    if (root != this)                        // 如果下一个phase的未到达的parties为0,则需要向                        // 父Phaser取消注册                        return parent.doArrive(nextUnarrived == 0);                    // 正在进入下一个Phase,默认的实现是nextUnarrived为0                    // 表示正在进入下一个Phase,因为下一个phase的parties                    // 为0,需要等待parties不为0                    if (onAdvance(phase, nextUnarrived))                        // 正在等待下一个phase,设置状态为终止                        n |= TERMINATION_BIT;                    else if (nextUnarrived == 0)                        // 下一个phase的parties为0,更新未到达的parties的值                        n |= EMPTY;                    else                        // 更新下一个phase的未到达的parties的值                        n |= nextUnarrived;                    // phase值加1                    n |= (long)((phase + 1) & MAX_PHASE) << PHASE_SHIFT;                    // 更新state值                    UNSAFE.compareAndSwapLong(this, stateOffset, s, n);                    // 唤醒等待的线程                    releaseWaiters(phase);                }                return phase;            }        }    }

关于arrive还有一个方法:arriveAndAwaitAdvance。这个方法会等到下一个phase开始再返回,相等于doArrive方法添加了awaitAdvance方法的功能。基本逻辑和上面说的doArrive方法类似:

    public int arriveAndAwaitAdvance() {        final Phaser root = this.root;        for (;;) {            long s = (root == this) ? state : reconcileState();            int phase = (int)(s >>> PHASE_SHIFT);            int counts = (int)s;            int unarrived = (counts & UNARRIVED_MASK) - 1;            if (phase < 0)                return phase;            else if (counts == EMPTY || unarrived < 0) {                // 对于非子Phaser来说,因为可以等待下一个phase,                // 所以不是非法arrive                if (reconcileState() == s)                    throw new IllegalStateException(badArrive(s));            }            else if (UNSAFE.compareAndSwapLong(this, stateOffset, s,                                               s -= ONE_ARRIVAL)) {                // 还有其他线程没有达到,就会等待直到下一个phase开始                if (unarrived != 0)                    return root.internalAwaitAdvance(phase, null);                if (root != this)                    return parent.arriveAndAwaitAdvance();                long n = s & PARTIES_MASK;  // base of next state                int nextUnarrived = (int)n >>> PARTIES_SHIFT;                if (onAdvance(phase, nextUnarrived))                    n |= TERMINATION_BIT;                else if (nextUnarrived == 0)                    n |= EMPTY;                else                    n |= nextUnarrived;                int nextPhase = (phase + 1) & MAX_PHASE;                n |= (long)nextPhase << PHASE_SHIFT;                if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))                    return (int)(state >>> PHASE_SHIFT);                releaseWaiters(phase);                return nextPhase;            }        }    }

 

这一部分主要讲了Phaser的动态更新parties以及线程的arrive,下一部分将会分析线程等待的实现。

 

《java.util.concurrent 包源码阅读》27 Phaser 第一部分