
A Brief Analysis of the Netty Source Code (Part 1)

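Over the weekend I skimmed the Netty 5 source code. I only got the general picture, so I am writing down what I found to make it easier to recall the next time I read it.

First, the server-side code: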

public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(
                            new ObjectEncoder(),
                            new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                            new ObjectEchoServerHandler(),
                            new MyServerHandler());
                }
             });

            // Bind and start to accept incoming connections.
            b.bind(port).sync().channel().closeFuture().sync();
            
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

Four types related to EventLoopGroup are easy to confuse:

EventExecutor, EventExecutorGroup, EventLoop, EventLoopGroup.

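    EventExecutor extends EventExecutorGroup, and EventExecutorGroup extends ScheduledExecutorService (the JDK's scheduled thread-pool interface). An EventExecutor is a special EventExecutorGroup that adds methods such as inEventLoop(), which checks whether a thread is the one running the event loop.

    EventLoopGroup also extends EventExecutorGroup, and its next() method returns an EventLoop.

    EventLoop extends EventLoopGroup. Its Javadoc says: "Will handle all the I/O-Operations for a Channel once it was registered", i.e. it handles the I/O operations of a channel.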
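
To make the relationship easier to picture, here is a minimal sketch using only the JDK. The types MiniExecutorGroup, MiniExecutor, SingleThreadLoop and RoundRobinGroup are hypothetical names I made up for illustration, not Netty classes. They only mimic two ideas from the text: a group hands out members through next(), and a single executor is itself a group of one that can answer inEventLoop().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class EventLoopHierarchySketch {
    // Hypothetical stand-in for a "group": it can hand out one of its members.
    interface MiniExecutorGroup {
        MiniExecutor next();
        void shutdown();
    }

    // Hypothetical stand-in for a single executor: a group of exactly one member.
    interface MiniExecutor extends MiniExecutorGroup {
        boolean inEventLoop();      // is the calling thread this executor's thread?
        void execute(Runnable task);
    }

    static final class SingleThreadLoop implements MiniExecutor {
        private volatile Thread thread; // set when the worker thread is created
        private final ExecutorService delegate = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "mini-loop");
            t.setDaemon(true);
            thread = t;
            return t;
        });

        @Override public MiniExecutor next() { return this; }
        @Override public boolean inEventLoop() { return Thread.currentThread() == thread; }
        @Override public void execute(Runnable task) { delegate.execute(task); }
        @Override public void shutdown() { delegate.shutdown(); }
    }

    static final class RoundRobinGroup implements MiniExecutorGroup {
        private final MiniExecutor[] children;
        private final AtomicInteger index = new AtomicInteger();

        RoundRobinGroup(int size) {
            children = new MiniExecutor[size];
            for (int i = 0; i < size; i++) {
                children[i] = new SingleThreadLoop();
            }
        }

        @Override public MiniExecutor next() {
            // Round-robin selection is an assumption of this sketch.
            return children[Math.floorMod(index.getAndIncrement(), children.length)];
        }

        @Override public void shutdown() {
            for (MiniExecutor child : children) {
                child.shutdown();
            }
        }
    }

    // Returns { inEventLoop() seen by the caller, inEventLoop() seen inside a task }.
    static boolean[] probe() {
        MiniExecutorGroup group = new RoundRobinGroup(2);
        MiniExecutor loop = group.next();
        boolean fromCaller = loop.inEventLoop();
        CompletableFuture<Boolean> fromTask = new CompletableFuture<>();
        loop.execute(() -> fromTask.complete(loop.inEventLoop()));
        try {
            return new boolean[] { fromCaller, fromTask.get(5, TimeUnit.SECONDS) };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            group.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] result = probe();
        System.out.println("caller in loop: " + result[0] + ", task in loop: " + result[1]);
    }
}
```

The caller thread is not the loop thread, while a task submitted to the loop is, which is the kind of check inEventLoop() exists for.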


ServerBootstrap is mainly used to create and initialize the NioServerSocketChannel. The part to look at is its bind() method, which ends up in doBind(), shown below:

private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        final ChannelPromise promise;
        if (regFuture.isDone()) {
            promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    doBind0(regFuture, channel, localAddress, promise);
                }
            });
        }

        return promise;
    }

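The code above does two things: first it creates and registers the channel, then it binds the channel to the specified port.

The create-and-register method looks like this: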
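
The "bind now if registration is done, otherwise bind from a listener" shape of doBind() can be sketched with the JDK's CompletableFuture. This is only an analogy under my own assumptions: RegisterThenBindSketch, its doBind() and doBind0() are hypothetical, the futures are not Netty's ChannelFuture, and the "bind" just completes a promise with a string.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class RegisterThenBindSketch {
    // Hypothetical analogue of doBind(): the returned promise completes once the "bind" has run.
    static CompletableFuture<String> doBind(CompletableFuture<Void> regFuture, int port, List<String> log) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (regFuture.isDone() && !regFuture.isCompletedExceptionally()) {
            // Registration already finished: bind right away on the calling thread.
            log.add("bind immediately");
            doBind0(port, promise);
        } else {
            // Registration pending (or failed): decide when the future completes.
            regFuture.whenComplete((ignored, cause) -> {
                if (cause != null) {
                    promise.completeExceptionally(cause);
                } else {
                    log.add("bind after registration completed");
                    doBind0(port, promise);
                }
            });
        }
        return promise;
    }

    // Stand-in for the real bind; it only records the port in the result.
    private static void doBind0(int port, CompletableFuture<String> promise) {
        promise.complete("bound to port " + port);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();

        // Case 1: registration is already complete.
        CompletableFuture<String> first = doBind(CompletableFuture.completedFuture(null), 8080, log);

        // Case 2: registration completes later.
        CompletableFuture<Void> pending = new CompletableFuture<>();
        CompletableFuture<String> second = doBind(pending, 8081, log);
        boolean doneBeforeRegistration = second.isDone();
        pending.complete(null);

        System.out.println(first.join());
        System.out.println("second done before registration: " + doneBeforeRegistration);
        System.out.println(second.join());
        System.out.println(log);
    }
}
```

In both cases the caller gets a promise back immediately, and only the timing of the bind differs.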

final ChannelFuture initAndRegister() {
        Channel channel;
        try {
            channel = createChannel();
        } catch (Throwable t) {
            return VoidChannel.INSTANCE.newFailedFuture(t);
        }

        try {
            init(channel);
        } catch (Throwable t) {
            channel.unsafe().closeForcibly();
            return channel.newFailedFuture(t);
        }

        ChannelPromise regFuture = channel.newPromise();
        channel.unsafe().register(regFuture); // the registration call (highlighted in the original post)
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
    }

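In the code above, the registration itself is performed by the call channel.unsafe().register(regFuture), that is, by the unsafe helper object. In the end it registers the underlying Java ServerSocketChannel with the event loop's Selector, much like registration in plain Java NIO. The source is: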

 protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
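
For comparison, this is roughly what the same step looks like in plain Java NIO, without Netty. It is a minimal sketch: the class NioRegisterSketch and the string attachment are my own placeholders (doRegister() attaches the Netty channel itself via "this"), and the sketch does not bind to a port. It registers with interest ops 0, as doRegister() does, and only afterwards switches the key to OP_ACCEPT.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

class NioRegisterSketch {
    // Returns { interest ops right after register(), interest ops after enabling accept }.
    static int[] registerWithZeroOps() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            // A channel must be non-blocking before it can be registered with a selector.
            server.configureBlocking(false);

            // Placeholder attachment; doRegister() passes the Netty channel ("this") here.
            Object attachment = "channel-wrapper";

            // Interest ops 0: the key exists, but no events are selected yet.
            SelectionKey key = server.register(selector, 0, attachment);
            int opsAfterRegister = key.interestOps();
            if (key.attachment() != attachment) {
                throw new IllegalStateException("attachment was not stored on the key");
            }

            // (A real server would call server.bind(...) around here.)
            // Only now ask the selector to report incoming connections.
            key.interestOps(SelectionKey.OP_ACCEPT);
            return new int[] { opsAfterRegister, key.interestOps() };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = registerWithZeroOps();
        System.out.println("ops after register: " + ops[0]);
        System.out.println("accept enabled: " + (ops[1] == SelectionKey.OP_ACCEPT));
    }
}
```

Registering with 0 first separates "the channel is known to the selector" from "the channel wants events", which matches the register-then-bind order seen in doBind().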

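The difference between Channel and Unsafe: Channel is what developers building on Netty use for reading and writing, whereas Netty itself performs the actual NIO I/O operations mainly through Unsafe.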

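Looking at the Channel class hierarchy, you will find that some of the abstract channel classes also contain the corresponding Unsafe implementation. The figure below shows the inheritance structure of NioServerSocketChannel.

[Figure: inheritance structure of NioServerSocketChannel]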


Every class from AbstractNioMessageChannel up to AbstractChannel has an inner Unsafe class that does the actual I/O work.
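
The "abstract channel with an inner Unsafe class" pattern can be sketched like this. Everything here (MiniUnsafe, MiniAbstractChannel, LoggingChannel) is hypothetical and heavily simplified: in real Netty a write travels through the pipeline before it reaches the Unsafe, which this sketch skips. It only shows the split between the user-facing method and the inner class that calls the subclass's do...() hooks.

```java
import java.util.ArrayList;
import java.util.List;

class ChannelUnsafeSketch {
    // Hypothetical internal interface: the low-level operations the framework itself calls.
    interface MiniUnsafe {
        void register();
        void write(String msg);
    }

    // User-facing abstract channel that owns an inner Unsafe implementation.
    abstract static class MiniAbstractChannel {
        final List<String> trace = new ArrayList<>(); // records the call order for the demo
        private boolean registered;
        private final MiniUnsafe unsafe = new InnerUnsafe();

        // What application code calls.
        public void write(String msg) {
            if (!registered) {
                throw new IllegalStateException("channel is not registered");
            }
            unsafe.write(msg);
        }

        // What the framework calls.
        public MiniUnsafe unsafe() {
            return unsafe;
        }

        // Transport-specific hooks implemented by concrete channels.
        protected abstract void doRegister();
        protected abstract void doWrite(String msg);

        // Inner class: it can reach the outer channel's state and hooks directly.
        private final class InnerUnsafe implements MiniUnsafe {
            @Override public void register() {
                trace.add("unsafe.register");
                doRegister();
                registered = true;
            }

            @Override public void write(String msg) {
                trace.add("unsafe.write");
                doWrite(msg);
            }
        }
    }

    // A concrete channel that only logs what the hooks were asked to do.
    static final class LoggingChannel extends MiniAbstractChannel {
        @Override protected void doRegister() { trace.add("doRegister"); }
        @Override protected void doWrite(String msg) { trace.add("doWrite:" + msg); }
    }

    static List<String> demo() {
        LoggingChannel channel = new LoggingChannel();
        channel.unsafe().register(); // framework side
        channel.write("hello");      // application side
        return channel.trace;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the Unsafe is an inner class, each level of the hierarchy can add its own low-level behavior while the public channel API stays small.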