首页 > 代码库 > Unsafe

Unsafe

Unsafe接口实际上是Channel接口的辅助接口,它不应该被用户代码直接调用。实际的I/O读写操作都是由Unsafe接口负责完成的。

Unsafe继承关系类图

技术分享

AbstractUnsafe源码分析

1.register方法

register方法主要用于将当前Unsafe对应的Channel注册到EventLoop的多路复用器上,然后调用DefaultChannelPipeline的fireChannelRegistered方法。如果Channel被激活,则调用DefaultChannelPipeline的fireChannelActive方法。

首先判断当前所在的线程是否是Channel对应的NioEventLoop线程,如果是同一个线程则不存在多线程并发操作问题,直接调用register0进行注册;如果是由用户线程或者其他线程发起的注册操作,则将注册操作封装成Runnable,放到NioEventLoop任务队列中执行。注意:如果直接执行register0方法,会存在多线程并发操作Channel的问题。

register0方法的实现,首先调用ensureOpen方法判断当前Channel是否打开,如果没有打开则无法注册,直接返回。校验通过后调用doRegister方法,它由AbstractNioUnsafe对应的AbstractNioChannel实现,该方法在前面的AbstractNioChannel源码分析中已经介绍过。如果doRegister方法没有抛出异常,则说明Channel注册成功。将ChannelPromise的结果设置为成功,调用ChannelPipeline的fireChannelRegistered方法,判断当前的Channel是否已经被激活,如果已经被激活,则调用ChannelPipeline的fireChannelActive方法。

如果注册过程中发生了异常,则强制关闭连接,将异常堆栈信息设置到ChannelPromise中。

2.bind方法

bind方法主要用于绑定指定的端口,对于服务端,用于绑定监听端口,可以设置backlog参数;对于客户端,主要用于指定客户端Channel的本地绑定Socket地址。

调用doBind方法,对于NioSocketChannel和NioServerSocketChannel有不同的实现,

如果绑定本地端口发生异常,则将异常设置到ChannelPromise中用于通知ChannelFuture,随后调用closeIfClosed方法来关闭Channel。

3.disconnect方法

disconnect用于客户端或者服务端主动关闭连接,

4.close方法

在链路关闭之前需要首先判断是否处于刷新状态,如果处于刷新状态说明还有消息尚未发送出去,需要等到所有消息发送完成再关闭链路,因此,将关闭操作封装成Runnable稍后再执行。

如果链路没有处于刷新状态,需要从closeFuture中判断关闭操作是否完成,如果已经完成,不需要重复关闭链路,设置ChannelPromise的操作结果为成功并返回。

执行关闭操作,将消息发送缓冲数组设置为空,通知JVM进行内存回收。调用抽象方法doClose关闭链路。

如果关闭操作成功,设置ChannelPromise结果为成功。如果操作失败,则设置异常对象到ChannelPromise中。

调用ChannelOutboundBuffer的close方法释放缓冲区的消息,随后构造链路关闭通知Runnable放到NioEventLoop中执行。

最后,调用deregister方法,将Channel从多路复用器上取消注册。

NioEventLoop的cancel方法实际将selectionKey对应的Channel从多路复用器上去注册。

5.write方法

write方法实际上将消息添加到环形发送数组中,并不是真正的写Channel,

如果Channel没有处于激活状态,说明TCP链路还没有真正建立成功,当前Channel存在以下两种状态。

(1)Channel打开,但是TCP链路尚未建立成功:NOT_YET_CONNECTED_EXCEPTION;

(2)Channel已经关闭:CLOSED_CHANNEL_EXCEPTION。

对链路状态进行判断,给ChannelPromise设置对应的异常,然后调用ReferenceCountUtil的release方法释放发送的msg对象。

如果链路状态正常,则将需要发送的msg和promise放入发送缓冲区中(环形数组)。

6.flush方法

flush方法负责将发送缓冲区中待发送的消息全部写入到Channel中,并发送给通信对方。

首先将发送环形数组的unflushed指针修改为tail,标识本次要发送消息的缓冲区范围。然后调用flush0进行发送。

重点分析 doWrite方法,首先计算需要发送的消息个数(unflushed - flush),如果只有 1 个消息需要发送,则调用父类的写操作,我们分析AbstractNioByteChannel的doWrite()方法,因为只有一条消息需要发送,所以直接从ChannelOutboundBuffer中获取当前需要发送的消息,首先,获取需要发送的消息,如果消息为ByteBuf且它分配的是JDK的非堆内存,则直接返回。对返回的消息进行判断,如果为空,说明该消息已经发送完成并被回收,然后执行清空OP_WRITE操作位的clearOpWrite方法,如果需要发送的ByteBuf已经没有可写的字节了,则说明已经发送完成,将该消息从环形队列中删除,然后继续循环,

分析下ChannelOutboundBuffer的remove方法,首先判断环形队列中是否还有需要发送的消息,如果没有,则直接返回。如果非空,则首先获取Entry,然后对其进行资源释放,同时对需要发送的索引flushed进行更新。所有操作执行完之后,调用decrementPendingOutboundBytes减去已经发送的字节数,该方法跟incrementPendingOutboundBytes类似,会进行发送低水位的判断和事件通知。

继续对消息的发送进行分析,首先将半包标识设置为false,从DefaultSocketChannelConfig中获取循环发送的次数,进行循环发送,对发送方法doWriteBytes展开分析,ByteBuf的readBytes()方法的功能是将当前ByteBuf中的可写字节数组写入到指定的Channel中。方法的第一个参数是Channel,此处就是SocketChannel,第二个参数是写入的字节数组长度,它等于ByteBuf的可读字节数,返回值是写入的字节个数。由于我们将SocketChannel设置为异步非阻塞模式,所以写操作不会阻塞。

从写操作中返回,需要对写入的字节数进行判断,如果为0,说明TCP发送缓冲区已满,不能继续再向里面写入消息,因此,将写半包标识设置为true,然后退出循环,执行后续排队的其他任务或者读操作,等待下一次selector的轮询继续触发写操作。

对写入的字节数进行累加,判断当前的ByteBuf中是否还有没有发送的字节,如果没有可发送的字节,则将done设置为true,退出循环。

从循环发送状态退出后,首先根据实际发送的字节数更新发送进度,实际就是发送的字节数和需要发送的字节数的一个比值。执行完进度更新后,判断本轮循环是否将需要发送的消息全部发送完成,如果发送完成则将该消息从循环队列中删除;否则,设置多路复用器的OP_WRITE操作位,用于通知Reactor线程还有半包消息需要继续发送。

AbstractNioUnsafe源码分析 

AbstractNioUnsafe是AbstractUnsafe类的NIO实现,它主要实现了connect、finishConnect等方法。

1.connect方法

首先获取当前的连接状态进行缓存,然后发起连接操作,需要指出的是,SocketChannel执行connect()操作有三种可能的结果。

(1)连接成功,返回true;

(2)暂时没有连接上,服务端没有返回ACK应答,连接结果不确定,返回false;

(3)连接失败,直接抛出I/O异常。

如果是第(2)种结果,需要将NioSocketChannel中的selectionKey设置为OP_ CONNECT,监听连接应答消息。

异步连接返回之后,需要判断连接结果,如果连接成功,则触发ChannelActive事件,它最终会将NioSocketChannel中的 selectionKey设置为SelectionKey.OP_READ,用于监听网络读操作位。如果没有立即连接上服务端,则执行其他分支的操作,有两个目的。

(1)根据连接超时时间设置定时任务,超时时间到之后触发校验,如果发现连接并没有完成,则关闭连接句柄,释放资源,设置异常堆栈并发起去注册。

(2)设置连接结果监听器,如果接收到连接完成通知则判断连接是否被取消,如果被取消则关闭连接句柄,释放资源,发起取消注册操作。 

2.finishConnect方法

客户端接收到服务端的TCP握手应答消息,通过SocketChannel的finishConnect方法对连接结果进行判断,首先缓存连接状态,当前返回false,然后执行doFinishConnect方法判断连接结果,通过 SocketChannel的finishConnect 方法判断连接结果,执行该方法返回三种可能结果。

(1)连接成功返回true;

(2)连接失败返回 false;

(3)发生链路被关闭、链路中断等异常,连接失败。

只要连接失败,就抛出 Error(),由调用方执行句柄关闭等资源释放操作,如果返回成功,则执行fulfillConnectPromise 方法,它负责将SocketChannel修改为监听读操作位,用来监听网络的读事件,最后对连接超时进行判断:如果连接超时时仍然没有接收到服务端的 ACK 应答消息,则由定时任务关闭客户端连接,将SocketChannel从Reactor线程的多路复用器上摘除,释放资源。

NioByteUnsafe源码分析

重点分析它的read方法,首先,获取NioSocketChannel的SocketChannelConfig,它主要用于设置客户端连接的TCP参数,继续看allocHandle的初始化。如果是首次调用,从SocketChannelConfig的RecvByteBufAllocator中创建Handle。

RecvByteBufAllocator默认有两种实现,分别是AdaptiveRecvByteBufAllocator和FixedRecvByteBufAllocator。由于FixedRecvByteBufAllocator 的实现比较简单,重点分析AdaptiveRecvByteBufAllocator的实现。 

AdaptiveRecvByteBufAllocator指的是缓冲区大小可以动态调整的ByteBuf分配器。它的成员变量分别定义了三个系统默认值:最小缓冲区长度64字节、初始容量1024字节、最大容量65536字节。还定义了两个动态调整容量时的步进参数:扩张的步进索引为4、收缩的步进索引为1。最后,定义了长度的向量表SIZE_TABLE并初始化它,向量数组的每个值都对应一个Buffer容量,当容量小于512的时候,由于缓冲区已经比较小,需要降低步进值,容量每次下调的幅度要小些;当大于512时,说明需要解码的消息码流比较大,这时采用调大步进幅度的方式减少动态扩张的频率,所以它采用512的倍数进行扩张。

重点分析下AdaptiveRecvByteBufAllocator的方法。

方法1:getSizeTableIndex(final int size)

根据容量Size查找容量向量表对应的索引——这是个典型的二分查找法。

下面我们分析下它的内部静态类HandleImpl,首先,还是看下它的成员变量,

它有5个成员变量,分别是:对应向量表的最小索引、最大索引、当前索引、下一次预分配的Buffer大小和是否立即执行容量收缩操作。

重点分析它的record(int actualReadBytes)方法:当NioSocketChannel执行完读操作后,会计算获得本次轮询读取的总字节数,它就是参数actualReadBytes,执行record方法,根据实际读取的字节数对ByteBuf进行动态伸缩和扩张。

首先,对当前索引做步进缩减,然后获取收缩后索引对应的容量,与实际读取的字节数进行比对,如果发现小于收缩后的容量,则重新对当前索引进行赋值,取收缩后的索引和最小索引中的较大者作为最新的索引。然后,为下一次缓冲区容量分配赋值——新的索引对应容量向量表中的容量。相反,如果当前实际读取的字节数大于之前预分配的初始容量,则说明实际分配的容量不足,需要动态扩张。重新计算索引,选取当前索引+扩张步进和最大索引中的较小作为当前索引值,然后对下次缓冲区的容量值进行重新分配,完成缓冲区容量的动态扩张。

AdaptiveRecvByteBufAllocator就是根据本次读取的实际字节数对下次接收缓冲区的容量进行动态调整。使用动态缓冲区分配器的优点如下。

(1)Netty作为一个通用的NIO框架,并不对用户的应用场景进行假设,可以使用它做流媒体传输,也可以用它做聊天工具。不同的应用场景,传输的码流大小千差万别,无论初始化分配的是32K还是1M,都会随着应用场景的变化而变得不适应。因此,Netty根据上次实际读取的码流大小对下次的接收Buffer缓冲区进行预测和调整,能够最大限度的满足不同行业的应用场景。

(2)性能更高,容量过大会导致内存占用开销增加,后续的Buffer处理性能会下降;容量过小时需要频繁地内存扩张来接收大的请求消息,同样会导致性能下降。

(3)更节约内存。假如通常情况下请求消息平均值为1M左右,接收缓冲区大小为1.2M;突然某个客户发送了一个10M的流媒体附件,接收缓冲区扩张为10M以接纳该附件,如果缓冲区不能收缩,每次缓冲区创建都会分配10M的内存,但是后续所有的消息都是1M左右,这样会导致内存的浪费,如果并发客户端过多,可能会发生内存溢出,最终宕机。

 

Unsafe