
[Weaving a Message Framework][netty source analysis] 5: NioEventLoopGroup, the EventLoopGroup implementation class, its responsibilities and implementation

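Analyzing NioEventLoopGroup comes down to two main questions:

1. How does next hand out a NioEventLoop for each piece of work?

2. How do the boss group and the child group cooperate at runtime?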

 

By the contract of the EventLoopGroup interface, the register method takes a Channel or a ChannelPromise and returns a ChannelFuture.

The next method is what hands out a NioEventLoop.

public interface EventLoopGroup extends EventExecutorGroup {
    @Override
    EventLoop next();

    ChannelFuture register(Channel channel);

    ChannelFuture register(ChannelPromise promise);

    @Deprecated
    ChannelFuture register(Channel channel, ChannelPromise promise);
}

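To save space, the code below has been condensed: methods that live in several different classes are shown together in one listing.

1. When a NioEventLoopGroup is constructed it binds SelectorProvider.provider(), and it creates each individual EventLoop through newChild.

2. next is implemented as a circular (round-robin) walk over the executors.

3. register turns a channel into a ChannelFuture.

Interested readers can set breakpoints in these methods and step through them.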

public class NioEventLoopGroup extends MultithreadEventLoopGroup {
    public NioEventLoopGroup(int nThreads, Executor executor) {
        this(nThreads, executor, SelectorProvider.provider());
    }

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }

    ///////////////// next, as implemented by GenericEventExecutorChooser /////////////////
    @Override
    public EventExecutor next() {
        // Round-robin: the counter keeps growing, the remainder picks the slot
        return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    }

    ///////////////// register, as implemented by SingleThreadEventLoop /////////////////
    @Override
    public ChannelFuture register(Channel channel) {
        // Wrap the channel in a promise and delegate to the promise-based overload
        return register(new DefaultChannelPromise(channel, this));
    }

    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
}
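The round-robin expression used by next can be tried on its own, without Netty on the classpath. The following is only a minimal sketch: RoundRobinChooser and the "loop-N" labels are hypothetical stand-ins for the chooser and its EventExecutor array, and the start parameter exists solely so the counter overflow can be exercised.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the chooser: hands out array slots in round-robin order.
final class RoundRobinChooser {
    private final AtomicInteger idx;
    private final String[] executors;

    RoundRobinChooser(int start, String... executors) {
        this.idx = new AtomicInteger(start);
        this.executors = executors;
    }

    String next() {
        // Same expression shape as in the listing above. Once the counter overflows,
        // the remainder can be negative, so Math.abs folds it back into [0, length).
        return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    }

    public static void main(String[] args) {
        RoundRobinChooser chooser = new RoundRobinChooser(0, "loop-0", "loop-1", "loop-2");
        for (int i = 0; i < 5; i++) {
            System.out.println(chooser.next());
        }
    }
}
```

Starting the counter at Integer.MAX_VALUE shows the overflow case: the index always stays inside the array, although right after the wrap the slots are no longer visited in strictly ascending order.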

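Let us simulate how NioEventLoopGroup is used, written in a plain procedural style.

As readers may recall, a netty server has at least two NioEventLoopGroup instances: one is the boss, the other is the child.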

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.AbstractSelector;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Set;

import org.junit.Test; // assuming JUnit 4

public class TestBossChildGroup {
    static SocketAddress address = new InetSocketAddress("localhost", 8877);

    @Test
    public void server() throws IOException {
        SelectorProvider bossProvider = SelectorProvider.provider();
        SelectorProvider childProvider = SelectorProvider.provider();
        int count = 2;
        AbstractSelector bossSelector = bossProvider.openSelector();
        AbstractSelector[] childSelectors = new AbstractSelector[count];
        for (int i = 0; i < count; i++) {
            childSelectors[i] = childProvider.openSelector();
        }

        // Bind the server to its port and register OP_ACCEPT with the boss Selector
        ServerSocketChannel serverSocketChannel = bossProvider.openServerSocketChannel();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(address);
        serverSocketChannel.register(bossSelector, SelectionKey.OP_ACCEPT);

        int i = 0;
        while (true) {
            int s = bossSelector.select(300);
            if (s > 0) {
                Set<SelectionKey> keys = bossSelector.selectedKeys();
                Iterator<SelectionKey> it = keys.iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    // Plain ifs rather than else-if: the ready set is a bit mask,
                    // so a single key can report several operations at once
                    if (key.isAcceptable()) {
                        System.out.println("isAcceptable");
                        // The neat part: accept the connection, then register OP_READ
                        // with a different Selector so that a child handles it
                        SocketChannel accepted = ((ServerSocketChannel) key.channel()).accept();
                        if (accepted != null) {
                            accepted.configureBlocking(false);
                            accepted.register(childSelectors[i++ % count], SelectionKey.OP_READ);
                        }
                    }
                    // The checks below are the work of a child event loop; on the boss
                    // Selector, which only watches OP_ACCEPT, they never fire
                    if (key.isConnectable()) {
                        System.out.println("isConnectable");
                    }
                    if (key.isWritable()) {
                        System.out.println("isWritable");
                    }
                    if (key.isReadable()) {
                        System.out.println("isReadable");
                    }
                    it.remove();
                }
            }
        }
    }

    @Test
    public void client() throws IOException {
        SocketChannel clientSocketChannel = SelectorProvider.provider().openSocketChannel();
        clientSocketChannel.configureBlocking(true);
        clientSocketChannel.connect(address);
    }
}
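The simulation above hands accepted connections to the child Selectors but never polls them. What the child side might look like can be sketched as follows, under stated assumptions: each child owns one thread, one Selector, and one task queue, and the boss does not call register on the child's Selector directly but queues the registration and wakes the child. MiniChildLoop, BossChildDemo, runOnce, and the bytesRead counter are hypothetical names made up for this illustration, not Netty classes, and the real NioEventLoop does considerably more.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical miniature of a child event loop: one thread, one Selector, one task queue.
final class MiniChildLoop implements Runnable {
    final Selector selector;
    final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
    final AtomicInteger bytesRead = new AtomicInteger();
    volatile boolean running = true;

    MiniChildLoop() throws IOException {
        selector = Selector.open();
    }

    // Called from the boss thread: queue the registration and wake the loop,
    // so that the actual register call runs on the loop's own thread.
    void register(SocketChannel ch) {
        tasks.add(() -> {
            try {
                ch.configureBlocking(false);
                ch.register(selector, SelectionKey.OP_READ);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        selector.wakeup();
    }

    @Override
    public void run() {
        ByteBuffer buf = ByteBuffer.allocate(64);
        try {
            while (running) {
                selector.select(300);
                // Run queued tasks (such as registrations) before looking at ready keys
                for (Runnable task; (task = tasks.poll()) != null; ) {
                    task.run();
                }
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isReadable()) {
                        buf.clear();
                        int n = ((SocketChannel) key.channel()).read(buf);
                        if (n < 0) {
                            key.channel().close(); // peer closed the connection
                        } else {
                            bytesRead.addAndGet(n);
                        }
                    }
                }
            }
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

final class BossChildDemo {
    // The calling thread plays the boss: it accepts one connection and hands it to the child.
    // Returns how many bytes the child loop read from that connection.
    static int runOnce(byte[] payload) throws Exception {
        MiniChildLoop child = new MiniChildLoop();
        Thread thread = new Thread(child, "child-0");
        thread.setDaemon(true);
        thread.start();
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); // any free port
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                child.register(server.accept()); // blocking accept, then hand-off
                client.write(ByteBuffer.wrap(payload));
                long deadline = System.currentTimeMillis() + 5000;
                while (child.bytesRead.get() < payload.length
                        && System.currentTimeMillis() < deadline) {
                    Thread.sleep(10);
                }
            }
        } finally {
            child.running = false; // the loop notices within one select timeout
            thread.join(1000);
        }
        return child.bytesRead.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("child read " + runOnce("hello".getBytes()) + " bytes");
    }
}
```

Queuing the registration instead of calling it from the boss thread keeps every operation on a given Selector confined to a single thread, which is the reason this sketch gives each child its own task queue.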

 
