首页 > 代码库 > netty 5 alph1源码分析(服务端创建过程)

netty 5 alph1源码分析(服务端创建过程)

参照《Netty系列之Netty 服务端创建》,研究了netty的服务端创建过程。至于netty的优势,可以参照网络其他文章。《Netty系列之Netty 服务端创建》是 李林锋撰写的netty源码分析的一篇好文,绝对是技术干货。但抛开技术来说,也存在一些瑕疵。

缺点如下

  1. 代码衔接不连贯,上下不连贯。

  2. 代码片段是截图,对阅读代理不便(可能和阅读习惯有关)

本篇主要内容,参照《Netty系列之Netty 服务端创建》,梳理出自己喜欢的阅读风格。

1.整体逻辑图

整体将服务端创建分为2部分:(1)绑定端口,提供服务过程;(2)轮询网络请求

技术分享


1.1 绑定端口序列图

技术分享

1.2 类图

技术分享

类图仅仅涵盖了绑定过程中比较重要的几个组件

1.3 代码分析

step 2 doBind 绑定本地端口,启动服务

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();//1
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    final ChannelPromise promise;
    if (regFuture.isDone()) {
        promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);//2
    } else {
        // Registration future is almost always fulfilled already, but just in case it‘s not.
        promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                doBind0(regFuture, channel, localAddress, promise);//2
            }
        });
    }

    return promise;
}
主要分为2个处理单元

step3 initAndRegister

final ChannelFuture initAndRegister() {
    Channel channel;
    try {
        channel = createChannel();
    } catch (Throwable t) {
        return VoidChannel.INSTANCE.newFailedFuture(t);
    }

    try {
        init(channel);
    } catch (Throwable t) {
        channel.unsafe().closeForcibly();
        return channel.newFailedFuture(t);
    }
//注册NioServerSocketChannel到Reactor线程的多路复用器上
    ChannelPromise regFuture = channel.newPromise();
    channel.unsafe().register(regFuture);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    return regFuture;
}

createChannel由子类ServerBootstrap实现,创建新的NioServerSocketChannel,并完成Channel初始化,以及注册。

4.ServerBootstrap.createChannel

Channel createChannel() {
    EventLoop eventLoop = group().next();
    return channelFactory().newChannel(eventLoop, childGroup);
}

它有两个参数,参数1是从父类的NIO线程池中顺序获取一个NioEventLoop,它就是服务端用于监听和接收客户端连接的Reactor线程。第二个参数就是所谓的workerGroup线程池,它就是处理IO读写的Reactor线程组


5.ServerBootstrap.init

void init(Channel channel) throws Exception {
//设置Socket参数和NioServerSocketChannel的附加属性
    final Map<ChannelOption<?>, Object> options = options();
    synchronized (options) {
        channel.config().setOptions(options);
    }
    final Map<AttributeKey<?>, Object> attrs = attrs();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }
//将AbstractBootstrap的Handler添加到NioServerSocketChannel的ChannelPipeline中
    ChannelPipeline p = channel.pipeline();
    if (handler() != null) {
        p.addLast(handler());
    }

    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }
//将用于服务端注册的Handler ServerBootstrapAcceptor添加到ChannelPipeline中
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildHandler, currentChildOptions,
                    currentChildAttrs));
        }
    });
}

到此处,Netty服务端监听的相关资源已经初始化完毕

6.AbstractChannel.AbstractUnsafe.register

public final void register(final ChannelPromise promise) {
//首先判断是否是NioEventLoop自身发起的操作,如果是,则不存在并发操作,直接执行Channel注册;
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {//如果由其它线程发起,则封装成一个Task放入消息队列中异步执行。
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            promise.setFailure(t);
        }
    }
}

7.register0

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!ensureOpen(promise)) {
            return;
        }
        doRegister();
        registered = true;
        promise.setSuccess();
        pipeline.fireChannelRegistered();
        
        if (isActive()) {//完成绑定时,不会调用该代码段
            pipeline.fireChannelActive();
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        if (!promise.tryFailure(t)) {
            logger.warn(
                    "Tried to fail the registration promise, but it is complete already. " +
                            "Swallowing the cause of the registration failure:", t);
        }
    }
}

触发事件

8.doRegister

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
        //将NioServerSocketChannel注册到NioEventLoop的Selector上
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

大伙儿可能会很诧异,应该注册OP_ACCEPT(16)到多路复用器上,怎么注册0呢?0表示只注册,不监听任何网络操作。这样做的原因如下:

  1. 注册方法是多态的,它既可以被NioServerSocketChannel用来监听客户端的连接接入,也可以用来注册SocketChannel,用来监听网络读或者写操作;

  2. 通过SelectionKey的interestOps(int ops)方法可以方便的修改监听操作位。所以,此处注册需要获取SelectionKey并给AbstractNioChannel的成员变量selectionKey赋值。



本文出自 “简单” 博客,请务必保留此出处http://dba10g.blog.51cto.com/764602/1863497

netty 5 alph1源码分析(服务端创建过程)