
Netty 4.0.x Source Code Analysis: Bootstrap

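Bootstrap, as the name suggests, is a helper that gets a program started. Whether we are writing a server or a client, we first create a bootstrap object and then call methods on it to add the event loops and handlers, so it is worth analyzing how it works.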

1. Bootstrap structure diagram

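The structure of the bootstrap package is simple and involves only a few classes and interfaces, as shown in the figure below. Bootstrap is the helper class for client programs, and ServerBootstrap is the helper class for server programs.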

[Figure: bootstrap class structure diagram; image not available]

2. ServerBootstrap analysis

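This section analyzes ServerBootstrap specifically. Bootstrap works in much the same way, so it is not analyzed in detail. Below is the typical way we write server code; the whole analysis follows the methods used in this snippet.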

    // Configure the bootstrap.
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .childHandler(new HexDumpProxyInitializer(remoteHost, remotePort))
         .childOption(ChannelOption.AUTO_READ, false)
         .bind(localPort).sync().channel().closeFuture().sync();
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }

Let us look at the key code first (note that some of these methods are defined in AbstractBootstrap).

    private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
    private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();
    private volatile EventLoopGroup childGroup;
    private volatile ChannelHandler childHandler;

    /**
     * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
     * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link SocketChannel} and
     * {@link Channel}'s.
     */
    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup);
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        }
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = childGroup;
        return this;
    }

The fields childGroup and childHandler are used to serve the accepted Channels. The group method simply assigns parentGroup and childGroup: parentGroup handles accept events, and childGroup handles the I/O events of the accepted Channels.

    // The channel method is implemented in the abstract parent class. It wraps the class in a
    // factory that creates the concrete Channel object through newInstance().

    /**
     * The {@link Class} which is used to create {@link Channel} instances from.
     * You either use this or {@link #channelFactory(ChannelFactory)} if your
     * {@link Channel} implementation has no no-args constructor.
     */
    public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new BootstrapChannelFactory<C>(channelClass));
    }

    /**
     * {@link ChannelFactory} which is used to create {@link Channel} instances from
     * when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)}
     * is not working for you because of some more complex needs. If your {@link Channel} implementation
     * has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for
     * simplify your code.
     */
    @SuppressWarnings("unchecked")
    public B channelFactory(ChannelFactory<? extends C> channelFactory) {
        if (channelFactory == null) {
            throw new NullPointerException("channelFactory");
        }
        if (this.channelFactory != null) {
            throw new IllegalStateException("channelFactory set already");
        }
        this.channelFactory = channelFactory;
        return (B) this;
    }

    private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> {
        private final Class<? extends T> clazz;

        BootstrapChannelFactory(Class<? extends T> clazz) {
            this.clazz = clazz;
        }

        @Override
        public T newChannel() {
            try {
                return clazz.newInstance();
            } catch (Throwable t) {
                throw new ChannelException("Unable to create Channel from class " + clazz, t);
            }
        }

        @Override
        public String toString() {
            return clazz.getSimpleName() + ".class";
        }
    }

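The channel method is simple: it wraps the given class in a BootstrapChannelFactory, whose newChannel method creates a concrete Channel object through newInstance, for example the server-side NioServerSocketChannel. Note that no Channel is created when channel is called; the object is created later, when initAndRegister calls channelFactory().newChannel().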
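
To make the deferred creation concrete, here is a minimal sketch in plain Java, independent of Netty. SimpleFactory and ReflectiveFactory are hypothetical names standing in for ChannelFactory and BootstrapChannelFactory, and StringBuilder stands in for a Channel class. The sketch uses getDeclaredConstructor().newInstance() because Class.newInstance() is deprecated in newer JDKs.

```java
// Hypothetical stand-in for Netty's ChannelFactory; not the real interface.
interface SimpleFactory<T> {
    T create();
}

// Stores only the Class; no instance exists until create() is called.
final class ReflectiveFactory<T> implements SimpleFactory<T> {
    private final Class<? extends T> clazz;

    ReflectiveFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    @Override
    public T create() {
        try {
            // Needs an accessible no-args constructor, the same requirement channel(Class) has.
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to create instance of " + clazz, e);
        }
    }

    @Override
    public String toString() {
        return clazz.getSimpleName() + ".class";
    }
}

final class ReflectiveFactoryDemo {
    public static void main(String[] args) {
        // Configuration time: only the class is recorded.
        SimpleFactory<CharSequence> factory = new ReflectiveFactory<CharSequence>(StringBuilder.class);
        // Later (the "bind" moment): each call produces a fresh object.
        CharSequence first = factory.create();
        CharSequence second = factory.create();
        System.out.println(factory + " distinct instances: " + (first != second));
    }
}
```

Keeping a factory rather than an instance is what lets one configured bootstrap produce a fresh Channel on every bind.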

 

 

    /**
     * Set the {@link ChannelHandler} which is used to serve the request for the {@link Channel}'s.
     */
    public ServerBootstrap childHandler(ChannelHandler childHandler) {
        if (childHandler == null) {
            throw new NullPointerException("childHandler");
        }
        this.childHandler = childHandler;
        return this;
    }

The method above simply assigns the childHandler field of ServerBootstrap.

    /**
     * Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they get created
     * (after the acceptor accepted the {@link Channel}). Use a value of {@code null} to remove a previous set
     * {@link ChannelOption}.
     */
    public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
        if (childOption == null) {
            throw new NullPointerException("childOption");
        }
        if (value == null) {
            // ... (the rest of this method is cut off in the source listing)

The method above sets options on the accepted Channels. A Channel has many options, such as SO_TIMEOUT and buffer sizes.
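
Because the childOption listing is cut off, the following is only a sketch of the behaviour its Javadoc describes (a null value removes a previously set option), written in plain Java. ChildOptionsSketch and its String keys are assumptions made for illustration; they are not Netty's ChannelOption API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical miniature of the option bookkeeping; keys are plain Strings here.
final class ChildOptionsSketch {
    private final Map<String, Object> childOptions = new LinkedHashMap<String, Object>();

    // A non-null value stores the option; null removes an earlier setting.
    ChildOptionsSketch childOption(String name, Object value) {
        if (name == null) {
            throw new NullPointerException("name");
        }
        synchronized (childOptions) {
            if (value == null) {
                childOptions.remove(name);
            } else {
                childOptions.put(name, value);
            }
        }
        return this; // fluent style, as in the bootstrap
    }

    // Copy taken under the lock, so callers never see a half-updated map.
    Map<String, Object> snapshot() {
        synchronized (childOptions) {
            return new LinkedHashMap<String, Object>(childOptions);
        }
    }

    public static void main(String[] args) {
        ChildOptionsSketch sketch = new ChildOptionsSketch()
                .childOption("AUTO_READ", false)
                .childOption("SO_KEEPALIVE", true)
                .childOption("AUTO_READ", null); // removes the first setting
        System.out.println(sketch.snapshot()); // prints {SO_KEEPALIVE=true}
    }
}
```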

 

 

    /**
     * Create a new {@link Channel} and bind it.
     */
    public ChannelFuture bind() {
        validate();
        SocketAddress localAddress = this.localAddress;
        if (localAddress == null) {
            throw new IllegalStateException("localAddress not set");
        }
        return doBind(localAddress);
    }

    /**
     * Create a new {@link Channel} and bind it.
     */
    public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }

    /**
     * Create a new {@link Channel} and bind it.
     */
    public ChannelFuture bind(String inetHost, int inetPort) {
        return bind(new InetSocketAddress(inetHost, inetPort));
    }

    /**
     * Create a new {@link Channel} and bind it.
     */
    public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
    }

    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regPromise = initAndRegister();
        final Channel channel = regPromise.channel();
        final ChannelPromise promise = channel.newPromise();
        if (regPromise.isDone()) {
            doBind0(regPromise, channel, localAddress, promise);
        } else {
            regPromise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    doBind0(future, channel, localAddress, promise);
                }
            });
        }
        return promise;
    }

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

After this chain of calls, bind ends up calling the Channel's bind method. Let us see how the Channel handles it; the method is defined in AbstractChannel:

    @Override
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }

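The Channel's bind method just calls the pipeline's bind, and the pipeline's bind in turn calls bind on a ChannelHandlerContext, as discussed in the earlier analysis of write and flush. So we go straight to the context's bind method (in DefaultChannelHandlerContext), defined as follows: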

    @Override
    public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        validatePromise(promise, false);

        final DefaultChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeBind(localAddress, promise);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeBind(localAddress, promise);
                }
            });
        }
        return promise;
    }

    private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler).bind(this, localAddress, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }

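This finally calls the handler's bind method. Recall the outbound events discussed earlier: they come with a default implementation, which here is HeadHandler's bind method, defined as follows: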

        @Override
        public void bind(
                ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
                throws Exception {
            unsafe.bind(localAddress, promise);
        }

Here is unsafe again, the workhorse that ends up doing the real work. Let us look at its bind method:

        @Override
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            if (!ensureOpen(promise)) {
                return;
            }

            // See: https://github.com/netty/netty/issues/576
            if (!PlatformDependent.isWindows() && !PlatformDependent.isRoot() &&
                Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
                localAddress instanceof InetSocketAddress &&
                !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress()) {
                // Warn a user about the fact that a non-root user can't receive a
                // broadcast packet on *nix if the socket is bound on non-wildcard address.
                logger.warn(
                        "A non-root user can't receive a broadcast packet if the socket " +
                        "is not bound to a wildcard address; binding to a non-wildcard " +
                        "address (" + localAddress + ") anyway as requested.");
            }

            boolean wasActive = isActive();
            try {
                doBind(localAddress);
            } catch (Throwable t) {
                closeIfClosed();
                promise.setFailure(t);
                return;
            }
            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireChannelActive();
                    }
                });
            }
            promise.setSuccess();
        }

The code above calls the Channel's doBind method. Our Channel here is NioServerSocketChannel, so it is that class's doBind that runs:

    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }

In the end, it simply calls bind on the socket of the JDK channel.
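
For comparison, the same JDK call can be made directly, without Netty. The sketch below is a plain-Java illustration; JdkBindSketch, the loopback address, port 0 (let the OS pick a free port), and the backlog value are all assumptions chosen for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

final class JdkBindSketch {
    // Opens a server channel, binds it with the given backlog, and returns the port that was assigned.
    static int bindToEphemeralPort(int backlog) {
        InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
        try (ServerSocketChannel channel = ServerSocketChannel.open()) {
            // Same shape as javaChannel().socket().bind(localAddress, config.getBacklog()).
            channel.socket().bind(address, backlog);
            return channel.socket().getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bound to port " + bindToEphemeralPort(128));
    }
}
```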

 

 

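At this point you may find something odd: where is the accept event registered? A server socket normally registers for accept events with a selector so that it can listen for connections. If you spotted this, you understand socket programming well. In fact, we skipped an important method when analyzing bind: initAndRegister. Here is its definition: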

    final ChannelFuture initAndRegister() {
        final Channel channel = channelFactory().newChannel();
        try {
            init(channel);
        } catch (Throwable t) {
            channel.unsafe().closeForcibly();
            return channel.newFailedFuture(t);
        }

        ChannelPromise regPromise = channel.newPromise();
        group().register(channel, regPromise);
        if (regPromise.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.
        return regPromise;
    }

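Here we see the register method mentioned in the earlier discussion of events; it registers the Channel with an event loop. After several layers of calls in the event loop group, the register method of SingleThreadEventLoop is reached, defined as follows: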

    @Override
    public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
        if (channel == null) {
            throw new NullPointerException("channel");
        }
        if (promise == null) {
            throw new NullPointerException("promise");
        }
        channel.unsafe().register(this, promise);
        return promise;
    }

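Once again the call ends up at the unsafe object. Having covered unsafe so many times already, you can guess how this method proceeds, so the code is not listed here.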
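
Since the registration code is not listed, here is what the equivalent step looks like in plain JDK NIO: a non-blocking server channel registered with a Selector for OP_ACCEPT. This is an illustration of the underlying mechanism only, not Netty's implementation; AcceptRegistrationSketch, the loopback address, and the 5-second select timeout are assumptions for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class AcceptRegistrationSketch {
    // Registers a server channel for accept events and returns true once one connection is accepted.
    static boolean acceptOne() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false); // a channel must be non-blocking to use a selector
            server.socket().bind(new InetSocketAddress(loopback, 0), 16);
            server.register(selector, SelectionKey.OP_ACCEPT); // the "missing" accept registration

            int port = server.socket().getLocalPort();
            // A local client connects so that the selector has something to report.
            try (SocketChannel client = SocketChannel.open(new InetSocketAddress(loopback, port))) {
                if (selector.select(5000) == 0) {
                    return false; // nothing became ready within the timeout
                }
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isAcceptable()) {
                        try (SocketChannel accepted = ((ServerSocketChannel) key.channel()).accept()) {
                            return accepted != null;
                        }
                    }
                }
                return false;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted one connection: " + acceptOne());
    }
}
```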

 

 

There is also the init method, which deserves an explanation. Its code is as follows:

    @Override
    void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options();
        synchronized (options) {
            channel.config().setOptions(options);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        ChannelPipeline p = channel.pipeline();
        if (handler() != null) {
            p.addLast(handler());
        }

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(Channel ch) throws Exception {
                ch.pipeline().addLast(new ServerBootstrapAcceptor(
                        currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }

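It applies the configured options and attributes to the Channel, sets up the Channel's pipeline, and adds a ServerBootstrapAcceptor handler. Looking at the definition of this handler makes its purpose clear.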

    private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

        private final EventLoopGroup childGroup;
        private final ChannelHandler childHandler;
        private final Entry<ChannelOption<?>, Object>[] childOptions;
        private final Entry<AttributeKey<?>, Object>[] childAttrs;

        @SuppressWarnings("unchecked")
        ServerBootstrapAcceptor(
                EventLoopGroup childGroup, ChannelHandler childHandler,
                Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
            this.childGroup = childGroup;
            this.childHandler = childHandler;
            this.childOptions = childOptions;
            this.childAttrs = childAttrs;
        }

        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            Channel child = (Channel) msg;

            child.pipeline().addLast(childHandler);

            for (Entry<ChannelOption<?>, Object> e: childOptions) {
                try {
                    if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                        logger.warn("Unknown channel option: " + e);
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to set a channel option: " + child, t);
                }
            }

            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }

            try {
                childGroup.register(child);
            } catch (Throwable t) {
                child.unsafe().closeForcibly();
                logger.warn("Failed to register an accepted channel: " + child, t);
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            final ChannelConfig config = ctx.channel().config();
            if (config.isAutoRead()) {
                // stop accept new connections for 1 second to allow the channel to recover
                // See https://github.com/netty/netty/issues/1328
                config.setAutoRead(false);
                ctx.channel().eventLoop().schedule(new Runnable() {
                    @Override
                    public void run() {
                       config.setAutoRead(true);
                    }
                }, 1, TimeUnit.SECONDS);
            }
            // still let the exceptionCaught event flow through the pipeline to give the user
            // a chance to do something with it
            ctx.fireExceptionCaught(cause);
        }
    }

That is the complete code of the handler. It overrides channelRead in order to register each Channel accepted by the server with an event loop of childGroup. This finally shows what the workerGroup in the server code is for.
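
The division of labour between the two groups can be imitated with ordinary JDK classes. The sketch below is only an analogy: it uses blocking sockets and thread pools rather than event loops, and BossWorkerSketch, roundTrip, and the "worker echo: " prefix are names invented for the example. One "boss" thread accepts the connection and immediately hands it to a "worker" pool, which does all the I/O, much as channelRead hands the accepted Channel to childGroup.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class BossWorkerSketch {
    // Sends one line through a boss/worker server on the loopback interface and returns the reply.
    static String roundTrip(String message) {
        ExecutorService boss = Executors.newSingleThreadExecutor();  // plays the role of bossGroup
        ExecutorService workers = Executors.newFixedThreadPool(2);   // plays the role of workerGroup
        try (ServerSocket server = new ServerSocket(0, 16, InetAddress.getLoopbackAddress())) {
            boss.execute(() -> {
                try {
                    Socket child = server.accept();            // the boss only accepts...
                    workers.execute(() -> echoOneLine(child)); // ...and hands the connection off
                } catch (IOException e) {
                    // Server closed or accept failed; there is nothing to hand off.
                }
            });
            try (Socket client = new Socket(InetAddress.getLoopbackAddress(), server.getLocalPort());
                 PrintWriter out = new PrintWriter(
                         new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8), true);
                 BufferedReader in = new BufferedReader(
                         new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8))) {
                client.setSoTimeout(5000); // fail instead of hanging if no reply arrives
                out.println(message);
                return in.readLine();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            boss.shutdownNow();
            workers.shutdownNow();
        }
    }

    // Runs on a worker thread: all reads and writes for the accepted connection happen here.
    private static void echoOneLine(Socket child) {
        try (Socket socket = child;
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8), true)) {
            out.println("worker echo: " + in.readLine());
        } catch (IOException e) {
            // The sketch ignores I/O failures; the client side times out instead.
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello")); // prints "worker echo: hello"
    }
}
```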

 

3. Summary

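This article walked through the whole ServerBootstrap process. Along the way we saw clearly how the steps of ordinary socket server code map onto it: bind, event registration, and the handling of accepted channels.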

 

http://blog.csdn.net/pingnanlee/article/details/11973769
