首页 > 代码库 > Netty In Action中文版 - 第四章:Transports(传输)
Netty In Action中文版 - 第四章:Transports(传输)
本章内容
- Transports(传输)
- NIO(non-blocking IO,New IO), OIO(Old IO,blocking IO), Local(本地), Embedded(嵌入式)
- Use-case(用例)
- APIs(接口)
数据传输的过程不一样取决是使用哪种交通工具,可是传输的方式是一样的:都是以字节码传输。Java开发网络程序数据传输的过程和方式是被抽象了的。我们不须要关注底层接口。仅仅须要使用Java API或其它网络框架如Netty就能达到数据传输的目的。发送数据和接收数据都是字节码。Nothing more,nothing less。
假设你以前使用Java提供的网络接口工作过,你可能已经遇到过想从堵塞传输切换到非堵塞传输的情况,这样的切换是比較困难的。由于堵塞IO和非堵塞IO使用的API有非常大的差异。Netty提供了上层的传输实现接口使得这样的情况变得简单。
我们能够让所写的代码尽可能通用。而不会依赖一些实现相关的APIs。当我们想切换传输方式的时候不须要花非常大的精力和时间来重构代码。
本章将介绍统一的API以及怎样使用它们,会拿Netty的API和Java的API做比較来告诉你为什么Netty能够更easy的使用。本章也提供了一些优质的用例代码,以便最佳使用Netty。
使用Netty不须要其它的网络框架或网络编程经验。若有则仅仅是对理解netty有帮助,但不是必要的。
以下让我们来看看真是世界里的传输工作。
4.1 案例研究:切换传输方式
4.1.1 使用Java的I/O和NIO
package netty.in.action; import java.io.IOException; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; import java.nio.charset.Charset; /** * Blocking networking without Netty * @author c.k * */ public class PlainOioServer { public void server(int port) throws Exception { //bind server to port final ServerSocket socket = new ServerSocket(port); try { while(true){ //accept connection final Socket clientSocket = socket.accept(); System.out.println("Accepted connection from " + clientSocket); //create new thread to handle connection new Thread(new Runnable() { @Override public void run() { OutputStream out; try{ out = clientSocket.getOutputStream(); //write message to connected client out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8"))); out.flush(); //close connection once message written and flushed clientSocket.close(); }catch(IOException e){ try { clientSocket.close(); } catch (IOException e1) { e1.printStackTrace(); } } } }).start();//start thread to begin handling } }catch(Exception e){ e.printStackTrace(); socket.close(); } } }上面的方式非常简洁。可是这样的堵塞模式在大连接数的情况就会有非常严重的问题。如客户端连接超时,服务器响应严重延迟。为了解决这样的情况,我们能够使用异步网络处理全部的并发连接,但问题在于NIO和OIO的API是全然不同的,所以一个用OIO开发的网络应用程序想要使用NIO重构代码差点儿是又一次开发。
以下代码是使用Java NIO实现的样例:
package netty.in.action; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; /** * Asynchronous networking without Netty * @author c.k * */ public class PlainNioServer { public void server(int port) throws Exception { System.out.println("Listening for connections on port " + port); //open Selector that handles channels Selector selector = Selector.open(); //open ServerSocketChannel ServerSocketChannel serverChannel = ServerSocketChannel.open(); //get ServerSocket ServerSocket serverSocket = serverChannel.socket(); //bind server to port serverSocket.bind(new InetSocketAddress(port)); //set to non-blocking serverChannel.configureBlocking(false); //register ServerSocket to selector and specify that it is interested in new accepted clients serverChannel.register(selector, SelectionKey.OP_ACCEPT); final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes()); while (true) { //Wait for new events that are ready for process. This will block until something happens int n = selector.select(); if (n > 0) { //Obtain all SelectionKey instances that received events Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); try { //Check if event was because new client ready to get accepted if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel client = server.accept(); System.out.println("Accepted connection from " + client); client.configureBlocking(false); //Accept client and register it to selector client.register(selector, SelectionKey.OP_WRITE, msg.duplicate()); } //Check if event was because socket is ready to write data if (key.isWritable()) { SocketChannel client = (SocketChannel) key.channel(); ByteBuffer buff = (ByteBuffer) key.attachment(); //write data to connected client while (buff.hasRemaining()) { if (client.write(buff) == 0) { break; } } client.close();//close client } } catch (Exception e) { key.cancel(); key.channel().close(); } } } } } }如你所见,即使它们实现的功能是一样,可是代码全然不同。
以下我们将用Netty来实现同样的功能。
4.1.2 Netty中使用I/O和NIO
package netty.in.action; import java.net.InetSocketAddress; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.oio.OioServerSocketChannel; import io.netty.util.CharsetUtil; public class NettyOioServer { public void server(int port) throws Exception { final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!\r\n", CharsetUtil.UTF_8)); //事件循环组 EventLoopGroup group = new NioEventLoopGroup(); try { //用来引导server配置 ServerBootstrap b = new ServerBootstrap(); //使用OIO堵塞模式 b.group(group).channel(OioServerSocketChannel.class).localAddress(new InetSocketAddress(port)) //指定ChannelInitializer初始化handlers .childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { //加入一个“入站”handler到ChannelPipeline ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //连接后。写消息到client。写完后便关闭连接 ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE); } }); } }); //绑定server接受连接 ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); } catch (Exception e) { //释放全部资源 group.shutdownGracefully(); } } }上面代码实现功能一样,但结构清晰明了,这仅仅是Netty的优势之中的一个。
4.1.3 Netty中实现异步支持
package netty.in.action; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.CharsetUtil; import java.net.InetSocketAddress; public class NettyNioServer { public void server(int port) throws Exception { final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!\r\n", CharsetUtil.UTF_8)); // 事件循环组 EventLoopGroup group = new NioEventLoopGroup(); try { // 用来引导server配置 ServerBootstrap b = new ServerBootstrap(); // 使用NIO异步模式 b.group(group).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port)) // 指定ChannelInitializer初始化handlers .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 加入一个“入站”handler到ChannelPipeline ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 连接后。写消息到client,写完后便关闭连接 ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE); } }); } }); // 绑定server接受连接 ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); } catch (Exception e) { // 释放全部资源 group.shutdownGracefully(); } } }由于Netty使用同样的API来实现每一个传输,它并不关心你使用什么来实现。
Netty通过操作Channel接口和ChannelPipeline、ChannelHandler来实现传输。
4.2 Transport API
ChannelPipeline容纳了使用的ChannelHandler实例,这些ChannelHandler将处理通道传递的“入站”和“出站”数据。ChannelHandler的实现同意你改变数据状态和数据传输。本书有章节具体解说ChannelHandler,ChannelHandler是Netty的重点概念。
- 数据传输时。将数据从一种格式转换到还有一种格式
- 异常通知
- Channel变为有效或无效时获得通知
- Channel被注冊或从EventLoop中注销时获得通知
- 通知用户特定事件
你还能够在执行时依据须要加入ChannelHandler实例到ChannelPipeline或从ChannelPipeline中删除,这能帮助我们构建高度灵活的Netty程序。
此外,訪问指定的ChannelPipeline和ChannelConfig,你能在Channel自身上进行操作。Channel提供了非常多方法。例如以下列表:
- eventLoop(),返回分配给Channel的EventLoop
- pipeline()。返回分配给Channel的ChannelPipeline
- isActive(),返回Channel是否激活,已激活说明与远程连接对等
- localAddress(),返回已绑定的本地SocketAddress
- remoteAddress(),返回已绑定的远程SocketAddress
- write()。写数据到远程client,数据通过ChannelPipeline传输过去
Channel channel = ... //Create ByteBuf that holds data to write ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8); //Write data ChannelFuture cf = channel.write(buf); //Add ChannelFutureListener to get notified after write completes cf.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { //Write operation completes without error if (future.isSuccess()) { System.out.println(.Write successful.); } else { //Write operation completed but because of error System.err.println(.Write error.); future.cause().printStacktrace(); } } });Channel是线程安全(thread-safe)的,它能够被多个不同的线程安全的操作,在多线程环境下。全部的方法都是安全的。正由于Channel是安全的,我们存储对Channel的引用,并在学习的时候使用它写入数据到远程已连接的client。使用多线程也是如此。以下的代码是一个简单的多线程样例:
final Channel channel = ... //Create ByteBuf that holds data to write final ByteBuf buf = Unpooled.copiedBuffer("your data",CharsetUtil.UTF_8); //Create Runnable which writes data to channel Runnable writer = new Runnable() { @Override public void run() { channel.write(buf.duplicate()); } }; //Obtain reference to the Executor which uses threads to execute tasks Executor executor = Executors.newChachedThreadPool(); // write in one thread //Hand over write task to executor for execution in thread executor.execute(writer); // write in another thread //Hand over another write task to executor for execution in thread executor.execute(writer);此外。这样的方法保证了写入的消息以同样的顺序通过写入它们的方法。想了解全部方法的使用能够參考Netty API文档。
4.3 Netty包括的传输实现
- NIO,io.netty.channel.socket.nio,基于java.nio.channels的工具包,使用选择器作为基础的方法。
- OIO,io.netty.channel.socket.oio,基于java.net的工具包,使用堵塞流。
- Local,io.netty.channel.local,用来在虚拟机之间本地通信。
- Embedded,io.netty.channel.embedded,嵌入传输。它同意在没有真正网络的运输中使用ChannelHandler。能够很实用的来測试ChannelHandler的实现。
4.3.1 NIO - Nonblocking I/O
NIO中,我们能够注冊一个通道或获得某个通道的改变的状态,通道状态有以下几种改变:
- 一个新的Channel被接受并已准备好
- Channel连接完毕
- Channel中有数据并已准备好读取
- Channel发送数据出去
选择器所支持的操作在SelectionKey中定义,详细例如以下:
- OP_ACCEPT,有新连接时得到通知
- OP_CONNECT。连接完毕后得到通知
- OP_READ,准备好读取数据时得到通知
- OP_WRITE。写入数据到通道时得到通知
接下来我们将讨论OIO传输,它是堵塞的。
4.3.2 OIO - Old blocking I/O
以下是OIO的处理流程图,若想具体了解。能够參阅其它相关资料。
4.3.3 Local - In VM transport
Local未绑定不论什么Socket。值提供JVM进程之间的通信。
4.3.4 Embedded transport
4.4 每种传输方式在什么时候使用?
- OIO,在低连接数、须要低延迟时、堵塞时使用
- NIO,在高连接数时使用
- Local,在同一个JVM内通信时使用
- Embedded,測试ChannelHandler时使用
Netty In Action中文版 - 第四章:Transports(传输)