
[Weaving a Messaging Framework][Netty Source Code Analysis] 7: Responsibilities and Implementation of the Unsafe Implementation Class NioSocketChannelUnsafe

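Unsafe is an inner interface of Channel that deals with the underlying socket. Judging from how it is written and named, it is not meant to be exposed to application developers; even the final implementation, NioSocketChannelUnsafe, is never made public.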

public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
    interface Unsafe {
        RecvByteBufAllocator.Handle recvBufAllocHandle();
        SocketAddress localAddress();
        SocketAddress remoteAddress();
        void register(EventLoop eventLoop, ChannelPromise promise);
        void bind(SocketAddress localAddress, ChannelPromise promise);
        void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
        void disconnect(ChannelPromise promise);
        void close(ChannelPromise promise);
        void closeForcibly();
        void deregister(ChannelPromise promise);
        void beginRead();
        void write(Object msg, ChannelPromise promise);
        void flush();
        ChannelPromise voidPromise();
        ChannelOutboundBuffer outboundBuffer();
    }

    // Note: in the Netty source, NioUnsafe is declared inside AbstractNioChannel;
    // it is listed here next to Unsafe only for comparison.
    public interface NioUnsafe extends Unsafe {
        SelectableChannel ch();
        void finishConnect();
        void read();
        void forceFlush();
    }
}

 

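The inheritance chain of NioSocketChannelUnsafe is: NioSocketChannelUnsafe -> NioByteUnsafe -> AbstractNioUnsafe -> AbstractUnsafe

AbstractUnsafe: handles binding, accepting, and closing the socket link, as well as flushing data.

Each operation is handled in roughly four stages: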

        @Override
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            assertEventLoop();
            // Stage 1: pre-execution checks
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean wasActive = isActive();
            // Stage 2: invoke the channel's implementation
            try {
                doBind(localAddress);
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                closeIfClosed();
                return;
            }
            // Stage 3: notify business logic through the pipeline
            if (!wasActive && isActive()) {
                invokeLater(() -> pipeline.fireChannelActive());
            }
            // Stage 4: completion, mark the promise as successful
            safeSetSuccess(promise);
        }
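
The four stages can be seen in isolation in a small stand-alone sketch. This is not Netty code: SketchUnsafe, LoopbackUnsafe, the events list (standing in for the pipeline) and the use of CompletableFuture in place of ChannelPromise are all assumptions made for illustration, and the real method also asserts that it runs on the event loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical miniature of the four-stage pattern; none of these names are Netty APIs.
abstract class SketchUnsafe {
    final List<String> events = new ArrayList<>(); // stands in for pipeline notifications
    boolean open = true;
    boolean active = false;

    // Transport-specific step supplied by the concrete channel (the role doBind plays above).
    protected abstract void doBind(String address) throws Exception;

    public final void bind(String address, CompletableFuture<Void> promise) {
        // Stage 1: pre-checks. Skip cancelled requests and fail fast on a closed channel.
        if (promise.isCancelled()) {
            return;
        }
        if (!open) {
            promise.completeExceptionally(new IllegalStateException("channel closed"));
            return;
        }
        boolean wasActive = active;
        // Stage 2: delegate the real work to the implementation.
        try {
            doBind(address);
        } catch (Throwable t) {
            promise.completeExceptionally(t);
            return;
        }
        // Stage 3: notify listeners only on an inactive -> active transition.
        if (!wasActive && active) {
            events.add("channelActive");
        }
        // Stage 4: complete the promise.
        promise.complete(null);
    }
}

// Hypothetical implementation: "binding" succeeds for any non-empty address.
class LoopbackUnsafe extends SketchUnsafe {
    @Override
    protected void doBind(String address) throws Exception {
        if (address.isEmpty()) {
            throw new java.io.IOException("empty address");
        }
        active = true;
    }
}
```

Because bind is final and only doBind is abstract, every subclass inherits the same checks, notification and completion logic and supplies only the transport-specific step.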

 

        @Override
        public final void flush() {
            assertEventLoop();
            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                return;
            }
            outboundBuffer.addFlush();
            flush0();
        }

        @SuppressWarnings("deprecation")
        protected void flush0() {
            // A flush is already in progress; avoid re-entrance
            if (inFlush0) {
                return;
            }
            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null || outboundBuffer.isEmpty()) {
                return;
            }
            inFlush0 = true;
            // Check the link before sending data
            if (!isActive()) {
                try {
                    if (isOpen()) {
                        // true: notify handlers via channelWritabilityChanged
                        outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
                    } else {
                        outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                    }
                } finally {
                    inFlush0 = false;
                }
                return;
            }
            try {
                // Invoke the channel's implementation
                doWrite(outboundBuffer);
            } catch (Throwable t) {
                if (t instanceof IOException && config().isAutoClose()) {
                    close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                } else {
                    outboundBuffer.failFlushed(t, true);
                }
            } finally {
                inFlush0 = false;
            }
        }
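
The inFlush0 flag is a re-entrancy guard: if a callback triggered during a write calls flush again, the nested call returns immediately and the outer call keeps draining the buffer. The sketch below shows that idea on its own. GuardedFlusher, ChattyFlusher and the onWritten hook are hypothetical names, and a plain queue stands in for ChannelOutboundBuffer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of a re-entrancy guard modeled on the inFlush0 flag; not Netty code.
class GuardedFlusher {
    private final Deque<String> pending = new ArrayDeque<>();
    final List<String> written = new ArrayList<>();
    private boolean inFlush;
    int skippedReentrantCalls;

    void write(String msg) {
        pending.add(msg);
    }

    void flush() {
        // A flush is already running further up the call stack: do nothing.
        if (inFlush) {
            skippedReentrantCalls++;
            return;
        }
        if (pending.isEmpty()) {
            return;
        }
        inFlush = true;
        try {
            String msg;
            while ((msg = pending.poll()) != null) {
                written.add(msg);
                onWritten(msg); // may call write() and flush() again
            }
        } finally {
            // Always reset the flag, even if a callback throws.
            inFlush = false;
        }
    }

    // Hook invoked while the flush is still in progress.
    protected void onWritten(String msg) {
    }
}

// Hypothetical subclass that answers "ping" with "pong" from inside the callback.
class ChattyFlusher extends GuardedFlusher {
    @Override
    protected void onWritten(String msg) {
        if (msg.equals("ping")) {
            write("pong");
            flush(); // ignored by the guard; the outer loop picks up "pong"
        }
    }
}
```

Resetting the flag in a finally block matters as much as setting it: without it, one failed write would leave the guard set and block every later flush.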

 

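AbstractNioUnsafe: the template class for the NioUnsafe interface; a thin wrapper.

NioByteUnsafe: mainly implements the read operation of the NioUnsafe interface.

NioSocketChannelUnsafe: just a thin wrapper; this is the class that is finally exposed for internal use.

The NioByteUnsafe read method: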

        public final void read() {
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);
            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    // Fill byteBuf by invoking the channel's implementation
                    int size = doReadBytes(byteBuf);
                    // Record the length of the last read
                    allocHandle.lastBytesRead(size);
                    // Nothing was read: release byteBuf; a negative size means the link was closed
                    if (allocHandle.lastBytesRead() <= 0) {
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        break;
                    }
                    // Increment the count of messages read
                    allocHandle.incMessagesRead(1);
                    // byteBuf is filled; hand it to the business pipeline
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                } while (allocHandle.continueReading());
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();
                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                // If no read is pending and auto-read is off, remove the read op once done
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
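
The shape of this loop (allocate, read, record, deliver, then ask the handle whether to continue) can be tried out without a socket. The sketch below is an assumption-laden miniature: ReadBudget only loosely mirrors RecvByteBufAllocator.Handle, its rule for continuing is a simplification, a ByteArrayInputStream stands in for the socket, and a list stands in for the pipeline.

```java
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for RecvByteBufAllocator.Handle; the continue rule is simplified.
class ReadBudget {
    private final int maxMessagesPerRead;
    private final int bufferSize;
    private int messagesRead;
    private int lastBytesRead;

    ReadBudget(int maxMessagesPerRead, int bufferSize) {
        this.maxMessagesPerRead = maxMessagesPerRead;
        this.bufferSize = bufferSize;
    }

    void reset() { messagesRead = 0; lastBytesRead = 0; }
    byte[] allocate() { return new byte[bufferSize]; }
    void lastBytesRead(int n) { lastBytesRead = n; }
    int lastBytesRead() { return lastBytesRead; }
    void incMessagesRead() { messagesRead++; }

    // Continue only while under budget and the last read filled the whole buffer,
    // which hints that more data may be waiting.
    boolean continueReading() {
        return messagesRead < maxMessagesPerRead && lastBytesRead == bufferSize;
    }
}

// Hypothetical read loop; "delivered" plays the role of pipeline.fireChannelRead.
class ReadLoopSketch {
    final List<byte[]> delivered = new ArrayList<>();
    boolean closed;

    void read(ByteArrayInputStream in, ReadBudget budget) {
        budget.reset();
        boolean close = false;
        do {
            byte[] buf = budget.allocate();
            int n = in.read(buf, 0, buf.length); // the "doReadBytes" step
            budget.lastBytesRead(n);
            if (budget.lastBytesRead() <= 0) {
                close = budget.lastBytesRead() < 0; // -1 means end of stream
                break;
            }
            budget.incMessagesRead();
            delivered.add(Arrays.copyOf(buf, n));
        } while (budget.continueReading());
        if (close) {
            closed = true; // the "closeOnRead" step
        }
    }
}
```

Capping the number of messages per call keeps one busy connection from monopolizing the thread that serves many channels; leftover bytes are simply picked up on the next call.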

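Summary: as the code shows, Unsafe contains no computation of its own. It only implements boundary checks and flow control, and hands the concrete work to the channel implementation (doBind, doWrite, doReadBytes).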
