首页 > 代码库 > [编织消息框架][netty源码分析]4 eventLoop 实现类NioEventLoop职责与实现

[编织消息框架][netty源码分析]4 eventLoop 实现类NioEventLoop职责与实现

NioEventLoop 是jdk nio多路处理实现同修复jdk nio的bug

1.NioEventLoop继承SingleThreadEventLoop 重用单线程处理

2.NioEventLoop是组成 pool EventLoopGroup 基本单元 

总之好多边界判断跟业务经验之类的代码,非常烦碎

 

重要属性

public final class NioEventLoop extends SingleThreadEventLoop {      //绑定 selector     Selector selector;    //优化过的Set集合    private SelectedSelectionKeySet selectedKeys;    //引用全局 SelectorProvider    private final SelectorProvider provider;    ///////////////////////////////////////////    //为true时执行selector.wakeup()    private final AtomicBoolean wakenUp = new AtomicBoolean();    //io任务占时比率     private volatile int ioRatio = 50;    //记录selectionKey撤销次数    private int cancelledKeys;    //处理selector.selectNow() 标志    private boolean needsToSelectAgain;}

 

替换Selector selectedKeySet字段与重构Selector

优化selectedKeySet集合用的是double cache技术,这种技术在图形渲染处理比较多

    //netty 用到反射加 AccessController技术替换掉 Selector selectedKeySet 字段    private Selector openSelector() {        final Selector selector = provider.openSelector();        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {            @Override            public Object run() {                try {                    return Class.forName(                            "sun.nio.ch.SelectorImpl",                            false,                            PlatformDependent.getSystemClassLoader());                } catch (Throwable cause) {                    return cause;                }            }        });        final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {            @Override            public Object run() {                //用到反射技术更改 SelectorImpl 字段                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");                selectedKeysField.setAccessible(true);                publicSelectedKeysField.setAccessible(true);                selectedKeysField.set(selector, selectedKeySet);                publicSelectedKeysField.set(selector, selectedKeySet);                return null;            }        });        return selector;    }//重新构建Selector    private void rebuildSelector0() {        final Selector oldSelector = selector;        final Selector newSelector;        if (oldSelector == null) {            return;        }       newSelector = openSelector();        //迁移处理        int nChannels = 0;        for (SelectionKey key: oldSelector.keys()) {            Object a = key.attachment();            try {                //过滤key是否合法 已处理                if (!key.isValid() || key.channel().keyFor(newSelector) != null) {                    continue;                }                int interestOps = key.interestOps();                key.cancel();                SelectionKey newKey = key.channel().register(newSelector, interestOps, a);                if (a instanceof AbstractNioChannel) {                    // channel重新绑定SelectionKey                    ((AbstractNioChannel) a).selectionKey = newKey;                }                nChannels ++;            } catch (Exception e) {                //出错处理 netty认为 socket已关闭                if (a instanceof AbstractNioChannel) {                    AbstractNioChannel ch = (AbstractNioChannel) a;                    ch.unsafe().close(ch.unsafe().voidPromise());                } else {                    @SuppressWarnings("unchecked")                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;                    invokeChannelUnregistered(task, key, e);                }            }        }        selector = newSelector;        oldSelector.close();     }

 

double cache 实现

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {        private SelectionKey[] keysA;        private int keysASize;        private SelectionKey[] keysB;        private int keysBSize;        private boolean isA = true;        SelectedSelectionKeySet() {            keysA = new SelectionKey[1024];            keysB = keysA.clone();        }        @Override        public boolean add(SelectionKey o) {            if (o == null) {                return false;            }            //是A开关即处理A            if (isA) {                int size = keysASize;                keysA[size ++] = o;                keysASize = size;                //双倍扩展容量                if (size == keysA.length) {                    doubleCapacityA();                }            } else {                int size = keysBSize;                keysB[size ++] = o;                keysBSize = size;                if (size == keysB.length) {                    doubleCapacityB();                }            }            return true;        }        private void doubleCapacityA() {            SelectionKey[] newKeysA = new SelectionKey[keysA.length << 1];            System.arraycopy(keysA, 0, newKeysA, 0, keysASize);            keysA = newKeysA;        }        private void doubleCapacityB() {            SelectionKey[] newKeysB = new SelectionKey[keysB.length << 1];            System.arraycopy(keysB, 0, newKeysB, 0, keysBSize);            keysB = newKeysB;        }        //获取keys并切换        SelectionKey[] flip() {            if (isA) {                isA = false;                keysA[keysASize] = null;                keysBSize = 0;                return keysA;            } else {                isA = true;                keysB[keysBSize] = null;                keysASize = 0;                return keysB;            }        }        @Override        public int size() {            return isA?keysASize : keysBSize;        }    }

重载Selector select 逻辑,修复jdk 会产生的 bug

private void select(boolean oldWakenUp) throws IOException {        Selector selector = this.selector;               int selectCnt = 0;            long currentTimeNanos = System.nanoTime();            //通过delayNanos计算出 select结束时间            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);            for (;;) {                //计算出超时并转换成毫秒,再加上延时固定0.5毫秒                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;                if (timeoutMillis <= 0) {                    if (selectCnt == 0) {                        selector.selectNow();                        selectCnt = 1;                    }                    break;                }                //如果有非IO任务,优先等侍selector操作                if (hasTasks() && wakenUp.compareAndSet(false, true)) {                    selector.selectNow();                    selectCnt = 1;                    break;                }                //阻塞当前线程                int selectedKeys = selector.select(timeoutMillis);                selectCnt ++;                //有IO,非IO,计划任务,wakenUp状态认为已完成 select 处理                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {                    break;                }                //如果当前线程中断,netty认为关闭了服务,退出处理                if (Thread.interrupted()) {                    selectCnt = 1;                    break;                }                //相当于下面等价,意思是当前时间大于或等于 (selectDeadLineNanos + 0.5毫秒) selectCnt 重置                //currentTimeNanos + (System.nanoTime() -  selectDeadLineNanos - 500000L )   >= currentTimeNanos                //System.nanoTime() -  selectDeadLineNanos - 500000L >= 0                //System.nanoTime() >= selectDeadLineNanos + 500000L                 long time = System.nanoTime();                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {                    selectCnt = 1;                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {                                        // selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD 默认值512,重构selector                    rebuildSelector();                    selector = this.selector;                    selector.selectNow();                    selectCnt = 1;                    break;                }                //刷新当前时间                currentTimeNanos = time;            }       }

 

分发io与非io任务逻辑实现

//这部分做了代码整理    @Override    protected void run() {        for (;;) {            try {                //检查是否有非IO任务同WAKEUP_TASK任务                if(!hasTasks()){                    continue;                }                //有任务就触发重写的 select                 select(wakenUp.getAndSet(false));                if (wakenUp.get()) {                    selector.wakeup();                }                cancelledKeys = 0;                needsToSelectAgain = false;                final int ioRatio = this.ioRatio;//默认值50                               try {                    final long ioStartTime = System.nanoTime();                    //processSelectedKeys();                    //一般会selectedKeys不会为null做了优化处理                    if (selectedKeys != null) {                        processSelectedKeysOptimized(selectedKeys.flip());                    } else {                        processSelectedKeysPlain(selector.selectedKeys());                    }                } finally {                    //当ioRatio等于100时,百分百执行非IO全部任务                    if (ioRatio == 100) {                        runAllTasks();                    }else{                        final long ioTime = System.nanoTime() - ioStartTime;                        //计算时非IO任务超时时间,公式 = 100 - ioRatio 算出非IO比率再跟IO相比 执行过的IO时间 * (非IO:IO)                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);                    }                }             } catch (Throwable t) {                //防止过多失败                Thread.sleep(1000);            }                        //处理完任务判断是否结束             try {                if (isShuttingDown()) {                    closeAll();                    if (confirmShutdown()) {                        return;                    }                }            } catch (Throwable t) {                  Thread.sleep(1000);            }        }    }    private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {        for (int i = 0;; i ++) {            final SelectionKey k = selectedKeys[i];            if (k == null) {                break;            }            //依赖外部逻辑清理            selectedKeys[i] = null;            final Object a = k.attachment();            //处理SelectedKey            if (a instanceof AbstractNioChannel) {                processSelectedKey(k, (AbstractNioChannel) a);            } else {                @SuppressWarnings("unchecked")                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;                processSelectedKey(k, task);            }            //这里用到比较奇怪的处理,应该是个补丁来的。。。             //从资料来源上说:当触发needsToSelectAgain时 channel全是关闭,所以忽略selectedKeys剩余的key,然后再重获取获取selectedKeys            // null out entries in the array to allow to have it GC‘ed once the Channel close            // See https://github.com/netty/netty/issues/2363            if (needsToSelectAgain) {                for (;;) {                    i++;                    if (selectedKeys[i] == null) {                        break;                    }                    selectedKeys[i] = null;                }                selectAgain();                selectedKeys = this.selectedKeys.flip();                i = -1;            }        }    }    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();        if (!k.isValid()) {            final EventLoop eventLoop;            try {                eventLoop = ch.eventLoop();            } catch (Throwable ignored) {                return;            }            //这里忽略情况是 在执行 registerd deregistration 时不能关闭,至于前后顺序无需要太多关心,读者可以进去看看            //每个人出现情况不一样,再加上eventLoop不可能为null的,这段代码明显没有经过测试            // Only close ch if ch is still registerd to this EventLoop. ch could have deregistered from the event loop            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is            // still healthy and should not be closed.            // See https://github.com/netty/netty/issues/5125            if (eventLoop != this || eventLoop == null) {                return;            }            unsafe.close(unsafe.voidPromise());            return;        }        try {            int readyOps = k.readyOps();            // 如果出现OP_CONNECT 状态必须先完成Connect 才能触发 read or wirte 操作            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {                //清除SelectionKey.OP_CONNECT状态                int ops = k.interestOps();                ops &= ~SelectionKey.OP_CONNECT;                k.interestOps(ops);                unsafe.finishConnect();            }                        //ByteBuffer 发送出去            if ((readyOps & SelectionKey.OP_WRITE) != 0) {                 ch.unsafe().forceFlush();            }            //netty将OP_READ,OP_ACCEPT 状态统一执行read操作,那netty如何区分 read accept的呢,后面才分析            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {                unsafe.read();            }        } catch (CancelledKeyException ignored) {            unsafe.close(unsafe.voidPromise());        }    }        //处理任务,失败策略执行注销处理    private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {        try {            task.channelReady(k.channel(), k);            if (!k.isValid()) {                 task.channelUnregistered(k.channel(), null);            }        } catch (Exception e) {            k.cancel();            task.channelUnregistered(k.channel(), null);        }      }

 

总结:

1.防cpu假死,超过一定时间重建Selector迁移SelectionKey

2.用反射技术替换Selector selectedKeySet字段,Set集合用到double cache技术

3.优先处理io任务,剩下时间处理非IO任务,通过ioRatio占比分配执行时间

4.在分发IO任务时做了大量的优化处理,如线程中断,读写IO、链路建立处理优先级,Selector 重建情况等

5.逻辑有时看起来好怪,再加上解决问题是修修补补的没经过优化代码,甚至作者没有经过测试就合并了,这是开源框架的通病

 

[编织消息框架][netty源码分析]4 eventLoop 实现类NioEventLoop职责与实现