首页 > 代码库 > NIO入门系列之第8章:连网和异步 I/O
NIO入门系列之第8章:连网和异步 I/O
8.1 概述
连网是学习异步 I/O 的很好基础,而异步 I/O 对于在 Java 语言中执行任何输入/输出过程的人来说,无疑都是必须具备的知识。NIO 中的连网与 NIO 中的其他任何操作没有什么不同——它依赖通道和缓冲区,而您通常使用InputStream 和 OutputStream来获得通道。
本节首先介绍异步 I/O 的基础—它是什么以及它不是什么,然后转向更实用的、程序性的例子。
8.2 异步 I/O
异步 I/O 是一种没有阻塞地读写数据的方法。通常,在代码进行 read() 调用时,代码会阻塞直至有可供读取的数据。同样, write() 调用将会阻塞直至数据能够写入。
另一方面,异步I/O调用不会阻塞。相反,您将注册对特定 I/O 事件的兴趣——可读的数据的到达、新的套接字连接,等等,而在发生这样的事件时,系统将会告诉您。
异步I/O的一个优势在于,它允许您同时根据大量的输入和输出执行 I/O。同步程序常常要求助于轮询,或者创建许许多多的线程以处理大量的连接。使用异步 I/O,您可以监听任何数量的通道上的事件,不用轮询,也不用额外的线程。
我们将通过研究一个名为MultiPortEcho.java 的例子程序来查看异步 I/O 的实际应用。这个程序就像传统的 echo server,它接受网络连接并向它们回响它们可能发送的数据。不过它有一个附加的特性,就是它能同时监听多个端口,并处理来自所有这些端口的连接。并且它只在单个线程中完成所有这些工作。
// MultiPortEcho import java.io.*; import java.net.*; import java.nio.*; import java.nio.channels.*; import java.util.*; public class MultiPortEcho { private int ports[]; private ByteBuffer echoBuffer = ByteBuffer.allocate( 1024 ); public MultiPortEcho( int ports[] ) throws IOException { this.ports = ports; go(); } private void go() throws IOException { // Create a new selector Selector selector = Selector.open(); // Open a listener on each port, and register each one // with the selector for (int i=0; i<ports.length; ++i) { ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking( false ); ServerSocket ss = ssc.socket(); InetSocketAddress address = new InetSocketAddress( ports[i] ); ss.bind( address ); SelectionKey key = ssc.register( selector, SelectionKey.OP_ACCEPT ); System.out.println( "Going to listen on "+ports[i] ); } while (true) { int num = selector.select(); Set selectedKeys = selector.selectedKeys(); Iterator it = selectedKeys.iterator(); while (it.hasNext()) { SelectionKey key = (SelectionKey)it.next(); if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) { // Accept the new connection ServerSocketChannel ssc = (ServerSocketChannel)key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking( false ); // Add the new connection to the selector SelectionKey newKey = sc.register( selector, SelectionKey.OP_READ ); it.remove(); System.out.println( "Got connection from "+sc ); } else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) { // Read the data SocketChannel sc = (SocketChannel)key.channel(); // Echo data int bytesEchoed = 0; while (true) { echoBuffer.clear(); int r = sc.read( echoBuffer ); if (r<=0) { break; } echoBuffer.flip(); sc.write( echoBuffer ); bytesEchoed += r; } System.out.println( "Echoed "+bytesEchoed+" from "+sc ); it.remove(); } } //System.out.println( "going to clear" ); // selectedKeys.clear(); //System.out.println( "cleared" ); } } static public void main( String args[] ) throws Exception { if (args.length<=0) { System.err.println( "Usage: java MultiPortEcho port [port port ...]" ); System.exit( 1 ); } int ports[] = new int[args.length]; for (int i=0; i<args.length; ++i) { ports[i] = Integer.parseInt( args[i] ); } new MultiPortEcho( ports ); } }
8.3 Selectors
本节的阐述对应于MultiPortEcho 的源代码中的 go() 方法的实现,因此应该看一下源代码,以便对所发生的事情有个更全面的了解。
异步 I/O 中的核心对象名为 Selector。Selector 就是您注册对各种 I/O 事件的兴趣的地方,而且当那些事件发生时,就是这个对象告诉您所发生的事件。
所以,我们需要做的第一件事就是创建一个 Selector:
Selector selector = Selector.open();
然后,我们将对不同的通道对象调用 register() 方法,以便注册我们对这些对象中发生的 I/O 事件的兴趣。register()的第一个参数总是这个Selector。
SelectionKey newKey = sc.register( selector, SelectionKey.OP_READ );
8.4 打开一个 ServerSocketChannel
为了接收连接,我们需要一个ServerSocketChannel。事实上,我们要监听的每一个端口都需要有一个 ServerSocketChannel 。对于每一个端口,我们打开一个 ServerSocketChannel,如下所示:
for (int i=0; i<ports.length; ++i) { ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking( false ); ServerSocket ss = ssc.socket(); InetSocketAddress address = new InetSocketAddress( ports[i] ); ss.bind( address ); SelectionKey key = ssc.register( selector, SelectionKey.OP_ACCEPT ); System.out.println( "Going to listen on "+ports[i] ); }
第一行创建一个新的ServerSocketChannel ,最后三行将它绑定到给定的端口。第二行将 ServerSocketChannel 设置为非阻塞的。我们必须对每一个要使用的套接字通道调用这个方法,否则异步 I/O 就不能工作。
8.5 选择键
下一步是将新打开的ServerSocketChannels 注册到 Selector上。为此我们使用 ServerSocketChannel.register() 方法,如下所示:
SelectionKey key = ssc.register( selector, SelectionKey.OP_ACCEPT );
register() 的第一个参数总是这个 Selector。第二个参数是 OP_ACCEPT,这里它指定我们想要监听 accept 事件,也就是在新的连接建立时所发生的事件。这是适用于 ServerSocketChannel 的唯一事件类型。
请注意对register() 的调用的返回值。SelectionKey 代表这个通道在此Selector 上的这个注册。当某个Selector 通知您某个传入事件时,它是通过提供对应于该事件的 SelectionKey 来进行的。SelectionKey 还可以用于取消通道的注册。
8.6 内部循环
现在已经注册了我们对一些 I/O 事件的兴趣,下面将进入主循环。使用Selectors 的几乎每个程序都像下面这样使用内部循环:int num = selector.select(); Set selectedKeys = selector.selectedKeys(); Iterator it = selectedKeys.iterator(); while (it.hasNext()) { SelectionKey key = (SelectionKey)it.next(); // ... deal with I/O event ... }
首先,我们调用Selector 的 select() 方法。这个方法会阻塞,直到至少有一个已注册的事件发生。当一个或者更多的事件发生时, select() 方法将返回所发生的事件的数量。
接下来,我们调用Selector 的selectedKeys() 方法,它返回发生了事件的SelectionKey 对象的一个集合。
我们通过迭代SelectionKeys 并依次处理每个SelectionKey 来处理事件。对于每一个SelectionKey,您必须确定发生的是什么I/O 事件,以及这个事件影响哪些I/O 对象。
8.7 监听新连接
程序执行到这里,我们仅注册了ServerSocketChannel,并且仅注册它们“接收”事件。为确认这一点,我们对 SelectionKey 调用 readyOps() 方法,并检查发生了什么类型的事件:
if ((key.readyOps() & SelectionKey.OP_ACCEPT)== SelectionKey.OP_ACCEPT) { // Accept the new connection // ... }
可以肯定地说,readOps() 方法告诉我们该事件是新的连接。
8.8 接受新的连接
因为我们知道这个服务器套接字上有一个传入连接在等待,所以可以安全地接受它;也就是说,不用担心 accept() 操作会阻塞:
ServerSocketChannel ssc = (ServerSocketChannel)key.channel(); SocketChannel sc = ssc.accept();
下一步是将新连接的SocketChannel 配置为非阻塞的。而且由于接受这个连接的目的是为了读取来自套接字的数据,所以我们还必须将 SocketChannel 注册到 Selector上,如下所示:
sc.configureBlocking( false ); SelectionKey newKey = sc.register( selector, SelectionKey.OP_READ );
注意我们使用register() 的 OP_READ 参数,将 SocketChannel 注册用于读取而不是接受新连接。
8.9 删除处理过的 SelectionKey
在处理SelectionKey 之后,我们几乎可以返回主循环了。但是我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活的键出现,这会导致我们尝试再次处理它。我们调用迭代器的 remove() 方法来删除处理过的 SelectionKey:
it.remove();
现在我们可以返回主循环并接受从一个套接字中传入的数据(或者一个传入的 I/O 事件)了。
8.10 传入的 I/O
当来自一个套接字的数据到达时,它会触发一个 I/O 事件。这会导致在主循环中调用Selector.select(),并返回一个或者多个I/O 事件。这一次, SelectionKey 将被标记为 OP_READ 事件,如下所示:} else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) { // Read the data SocketChannel sc = (SocketChannel)key.channel(); // ... }
与以前一样,我们取得发生I/O 事件的通道并处理它。在本例中,由于这是一个 echo server,我们只希望从套接字中读取数据并马上将它发送回去。关于这个过程的细节,请参见参考资料中的源代码 (MultiPortEcho.java)。
8.11 回到主循环
每次返回主循环,我们都要调用select 的 Selector()方法,并取得一组 SelectionKey。每个键代表一个 I/O 事件。我们处理事件,从选定的键集中删除 SelectionKey,然后返回主循环的顶部。
这个程序有点过于简单,因为它的目的只是展示异步 I/O 所涉及的技术。在现实的应用程序中,您需要通过将通道从 Selector 中删除来处理关闭的通道。而且您可能要使用多个线程。这个程序可以仅使用一个线程,因为它只是一个演示,但是在现实场景中,创建一个线程池来负责 I/O 事件处理中的耗时部分会更有意义。
本文出自 “梦朝思夕” 博客,请务必保留此出处http://qiangmzsx.blog.51cto.com/2052549/1411301