首页 > 代码库 > Transport & Buffer

Transport & Buffer

Transport

  传输API的核心是Channel接口,用于所有的出站操作。

  每个Channel都会分配一个ChannelPipeline和ChannelConfig。ChannelConfig负责设置并存储设置,并允许在运行期间更新它们。ChannelPipeline容纳了使用的ChannelHandler实例,这些ChannelHandler将处理通道传递的“入站”和“出站”数据、ChannelHandler允许你改变数据状态和传输数据。

  ChannelHandler处理以下事情

    传输数据时,将数据从一种格式转换到另一种格式

    异常通知

    Channel变为有效或无效时通知

    Channel被注册或从EventLoop中注销时获得通知

    通知用户特定事件

  ChannelHandler实例添加到ChannelPipeline中,在ChannelPipeline中按顺序逐条执行。它类似于一个链条。

  ChannelPipeline实现了拦截过滤器模式,这意味着我们连接不通的ChannelHandler来拦截并处理经过ChannelPipeline的数据或事件。

  Channel提供如下方法

    eventLoop():返回分配给Channel的EventLoop

    pipeline():返回分配给Channel的ChannelPipeline

    isActive():返回Channel是否已经激活,已激活说明与远程连接对接

    localAddress():返回已绑定的本地SocketAddress

    remoteAddress():返回已绑定的远程的SocketAddress

    write():写数据到远程客户端,数据通过ChannelPipeline传输过去

  Channel是线程安全的,他可以被多个不同的线程安全的操作。

  Netty中的传输方式有如下4种:

    NIO, io.netty.channel.socket.nio —— 基于java.nio.channels工具包。使用选择器作为基础的方法。在高连接数时使用

    OIO, io.netty.channel.socket.oio —— 基于java.net工具包,使用阻塞流。在低连接数,需要低延迟时,阻塞时使用

    Local,io.netty,channel.local —— 用于在虚拟机间进行本地通信。在同一个JVM内通信使用

    Embedded,io.netty.channel.embedded,嵌入传输 —— 允许在没有真正网络的运输中使用ChannelHandler,可以用来测试ChannelHandler的实现。测试ChannelHandler时使用

  

  NIO

    NIO通过使用选择器提供了完全异步地方式操作所有的IO。

    通道状态一般有4种:一个新的Channel被接受并已准备好;Channel连接完成;Channel中有数据并已准备好读取;Channel发送数据出去。

    我们可以注册一个通道或获得某个通道的改变的状态,处理完改变的状态后需要重新设置他们的状态,用一个线程来检查是否有已准备好的Channel,若有则执行相关状态。

    选择器所支持的操作在SelectionKey中定义:

      OP_ACCEPT:有新连接时得到通知

      OP_CONNECT:连接完成后得到通知

      OP_READ:准备好读取数据时得到通知

      OP_WRITE:写入数据到通道时得到通知

    NIO在处理过程中有一定的延迟,若连接数不大,延迟一般在毫秒级。Netty中的NIO传输是“zero-file-copy”。

    Netty本地传输,这个传输实现使用相同的API用于虚拟机之间的通信,传输是完全异步地。每个Channel使用唯一的SocketAddress,客户端通过使用SocketAddress进行连接,在吴福气会被注册为长期运行,一旦通道关闭,它会自动注销。本地传输只能在本地的服务器和客户端上使用它们。Local未绑定任何的Socket,值提供JVM进程间的通信。

    Netty嵌入传输,这个传输允许使用不同的ChannelHandler之间交互。

 

Buffer

  Netty的缓冲API有两个接口:ByteBuf和ByteBufHolder。

  Netty缓冲提供了以下优势:可以自定义缓冲类型;通过一个内置的复合缓冲类型实现零拷贝;扩张性好;不需要调用flip()来切换读/写模式;读取和写入索引分开;方法链;引用计数和Pooling(池)

  ByteBuf

    写入数据到ByteBuf后,写入索引是增加的字节数量。开始读字节后,读取索引增加,知道写入索引和读取索引处理相同的位置,若继续读,则抛出IndexOutOfBoundsException。调用ByteBuf的任何方法开始读/写都会单独维护读索引和写索引。ByteBuf的默认最大容量限制是Integer.MAX_VALUE,写入时若超出这个值将会导致一个异常。

    Netty有3种类型的ByteBuf:Heap Buffer,Direct Buffer和Composite Buffer。

    最常用的类型是ByteBuf将数据存储在JVM的堆空间,这是通过将数据存储在数组实现的。对缓冲区可以快速分配,当不使用时也可以快速释放。还提供了直接访问数据的方法,通过ByteBuf.array()来获取byte[]数据。访问非堆缓冲区ByteBuf的数组会导致UnsupportedOperationException,可以使用gByteBuf.hasArray()来检查是否支持访问数组。

     直接缓冲区在堆之外直接分配内存。直接缓冲区不会占用堆空间容量,使用时应该考虑到应用程序要使用的最大内存容量以及如何限制它。直接缓冲区在使用Socket传递数据时性能很好,若使间接缓冲区,JVM会先将数据复制到直接缓冲区再进行传递。但直接缓冲区的缺点是在分配内存空间和释放内存时比堆缓冲区复杂。Netty使用内存池解决这个问题。直接缓冲区不支持数组访问数据。

ByteBuf directBuf = Unpooled.directBuffer(16);
if(!directBuf.hasArray){
  int len = directBuf.readableBytes();
  byte[] arr = new byte[len];
  directBuf.getBytes(0, arr);
}

    复合缓冲区使我们可以创建多个不同的ByteBuf,然后提供一个ByteBuf组合的视图。复合缓冲区就像一个列表,可以动态添加和删除其中的ByteBuf。

CompositeByteBuf comBuf = Unpooled.compositeBuffer();
ByteBuf heapBuf = Unpooled.buffer(8);
ByteBuf directBuf = Unpooled.directBuffer(16);
comBuf.addComponents(heapBuf, directBuf);
comBuf.removeComponent(0);
Iterator<ByteBuf> iter = comBuf.iterator();
while(iter.hasNext())
  System.out.println(iter.nex().toString());

if(!comBuf.hasArray()){
  int len = comBuf.readableBytes();
  byte[] arr = new byte[len];
  comBuf.getBytes(0, arr);
}

 

    ByteBuf使用zero-based-indexing(从0开始的索引),第一个字节的索引是0,最后一个字节的索引是ByteBuf的capacity-1。

ByteBuf buf = Unpooled.buffer(16);
for(int i = 0; i < buf.capacity(); i ++){
  byf.writeByte(i+1);
}
// 通过索引访问时不会推进读索引和写索引
for(int i = 0; i < buf.capacity(); i++){ System.out.println(buf.getByte(i)); }

    ByteBuf提供两个指针变量支持读和写操作。读操作使用readIndex(),写操作使用writeIndex()。可以使用ByteBuf.discardReadBytes()来回收已经读取过的字节。discardReadBytes()将丢弃从索引0到readerIndex之间的字节。ByteBuf.discardReadBytes()可以用来清空ByteBuf中已读取的数据,从而使ByteBuf有多余的空间容纳新数据。但是discardReadBytes()涉及内存复制,会影响性能,一般在需要马上释放内存的时候使用收益会比较大。

    任何读操作会增加readerIndex,若读取操作的参数也是一个ByteBuf而没有指定目的索引。指定的目的缓冲区的writeIndex会一直增加。新分配,包装,复制的缓冲区的readerIndex的默认值都是0。

    任何写操作会增加writeIndex。若写操作的参数也是一个ByteBuf并没有指定数据源索引,那么指定缓冲区的readerIndex也会一起增加。若没有足够的可写字节会抛出IndexOutOfBoundException。

    调用ByteBuf.clear()可以设置readerIndex和writeIndex为0.clear不会清楚缓冲区的内容,只是将两个索引值设置为0。

    ByteBufProcessor可以实现搜索。

    每个ByteBuf有两个标注索引,一个存储readerIndex,一个存储writerIndex。可以通过调用readerIndex(int readerIndex)和writeIndex(int writerIndex)移动读索引和写索引到指定位置,调用这两个方法设置指定索引位置时可抛出IndexOutOfBoundException。

    调用duplicate(),slice(),slice(int index, int length),order(ByteOrder endianness)会创建一个现有缓冲区的视图。衍生的缓冲区有独立的readerIndex,writeIndex和标注索引。若需要现有缓冲区的副本,可以使用copy()或copy(int index, int length)获得。

 

  ByteBufHolder

    ByteBufHolder是一个辅助类,是一个接口,其实现类是DefaultByteBufHolder。ByteBufHolder的作用是方便访问ByteBuf中的数据,当缓冲区没用了之后,可以用ByteBufHolder释放资源。若想实现一个“消息对象”有效负载在ByteBuf,可以使用ByteBufHolder。

  ByteBufAllocator  

    ByteBufAllocatior负责分配ByteBuf实例。ByteBufAllocator提供了各种分配不同ByteBuf方法。如ByteBufAllocator.heapBuffer();ByteBufAllocator.directBuffer();ByteBufAllocation.compositeBuffer();Netty有两种不同的ByteBufAllocator实现,一个实现ByteBuf实例池将分配和回收成本以及内存使用降低到最低;另一种实现是每次使用都创建一个新的ByteBuf实例。Netty默认使用PooledByteBufAllocator。可以通过ChannelConfig或引动设置一个不同的实现。

ServerBootstrap b  = new ServerBootstrap();
EventLoopGroup group = new EventLoopGroup();
b.group(group).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
  .childHandler(new ChannelInitializer<SocketChannel>()){
    proctectd void initChannel(SocketChannel ch) throws Exception{
      ByteBufAllocator alloc0 = ch.alloc();
      ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
        public void channelActive(ChannelHandlerContext ctx) throws Exception{
          ByteBufAllocator alloc1 = ctx.alloc();
          ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOST);
        }
      });
    }
});

 

  

 

Transport & Buffer