首页 > 代码库 > Java NIO

Java NIO

Java NIO (New IO)是Java 1.4版本开始引入的新的IO API。和IO的区别在于NIO是一个异步、非阻塞的IO,可以用一个线程管理多个连接。比如1个服务器,5个客户端,如果用IO,在服务器上就需要5个线程,而使用NIO的话,只需要1个线程就可以同时管理5个连接,节约了资源。

NIO的核心部分包括:Channel、Buffer和Selector。Channel把数据读写到Buffer,或者说通过Buffer实现数据的读写。Selector可以同时管理多个Channel,Channel注册到Selector,当Channel中有事件就绪时,Selector执行获得一个SelectionKey集合,并进行处理,从而实现了异步、非阻塞,一个线程管理多个连接。

下面的例子,通过NIO实现了1个服务器线程管理5个客户端线程,直接上代码。

NIOServer.java:

package server;import java.io.IOException;import java.net.InetSocketAddress;import java.net.ServerSocket;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;public class NIOServer {        // 端口号    final static int startPort = 10000;    int[] port = {startPort, startPort+1, startPort+2, startPort+3, startPort+4};        // Channel编号    int[] num = {0, 1, 2, 3, 4};        // Channel发送消息的次数    int[] times = {0, 0, 0, 0, 0};         // 缓冲区大小    private int BLOCK = 4096;      // 接收数据缓冲区     private ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK);      // 发送数据缓冲区     private ByteBuffer receivebuffer = ByteBuffer.allocate(BLOCK);      private Selector selector;         // 构造函数    public NIOServer(String host) throws IOException{        // 打开ServerSocketChannel        ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();         ServerSocketChannel serverSocketChannel2 = ServerSocketChannel.open();         ServerSocketChannel serverSocketChannel3 = ServerSocketChannel.open();         ServerSocketChannel serverSocketChannel4 = ServerSocketChannel.open();         ServerSocketChannel serverSocketChannel5 = ServerSocketChannel.open();         // 设置为非阻塞        serverSocketChannel1.configureBlocking(false);        serverSocketChannel2.configureBlocking(false);        serverSocketChannel3.configureBlocking(false);        serverSocketChannel4.configureBlocking(false);        serverSocketChannel5.configureBlocking(false);        // 获得ServerSocket        ServerSocket serverSocket1 = serverSocketChannel1.socket();        ServerSocket serverSocket2 = serverSocketChannel2.socket();        ServerSocket serverSocket3 = serverSocketChannel3.socket();        ServerSocket serverSocket4 = serverSocketChannel4.socket();        ServerSocket serverSocket5 = serverSocketChannel5.socket();        // 绑定主机和端口        serverSocket1.bind(new InetSocketAddress(host, port[0]));        serverSocket2.bind(new InetSocketAddress(host, port[1]));        serverSocket3.bind(new InetSocketAddress(host, port[2]));        serverSocket4.bind(new InetSocketAddress(host, port[3]));        serverSocket5.bind(new InetSocketAddress(host, port[4]));        // 获得Selector          selector = Selector.open();          // 注册到selector,等待连接          serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT, num[0]);          serverSocketChannel2.register(selector, SelectionKey.OP_ACCEPT, num[1]);        serverSocketChannel3.register(selector, SelectionKey.OP_ACCEPT, num[2]);        serverSocketChannel4.register(selector, SelectionKey.OP_ACCEPT, num[3]);        serverSocketChannel5.register(selector, SelectionKey.OP_ACCEPT, num[4]);        // 成功        System.out.println("Server start (Host:" + host + ", Port:" + port[0] + ") successful!");         System.out.println("Server start (Host:" + host + ", Port:" + port[1] + ") successful!");        System.out.println("Server start (Host:" + host + ", Port:" + port[2] + ") successful!");        System.out.println("Server start (Host:" + host + ", Port:" + port[3] + ") successful!");        System.out.println("Server start (Host:" + host + ", Port:" + port[4] + ") successful!");    }        // 监听    public void listen() throws IOException{          while (true) {              // 选择一组键,并且相应的Channel已经打开              selector.select();              // 返回此选择器的SelectionKey集            Set<SelectionKey> selectionKeys = selector.selectedKeys();              Iterator<SelectionKey> iterator = selectionKeys.iterator();              while (iterator.hasNext()) {                          SelectionKey selectionKey = iterator.next();                 // 先删除该SelectionKey,再处理                iterator.remove();                  handleKey(selectionKey);              }          }      }          // 处理请求      private void handleKey(SelectionKey selectionKey) throws IOException {          // 接受请求          ServerSocketChannel server = null;          SocketChannel client = null;          String receiveText;          String sendText;          int count = 0;         int num = 0;        num = (int) selectionKey.attachment();        // 此SelectionKey是否可以Accept          if (selectionKey.isAcceptable()) {              // 获得此SelectionKey的Channel            server = (ServerSocketChannel) selectionKey.channel();             // 接受连接,返回Channel            client = server.accept();              // 配置为非阻塞              client.configureBlocking(false);              // 注册到selector,等待读取             client.register(selector, SelectionKey.OP_READ, num);          }         // 此SelectionKey是否可以Read        else if (selectionKey.isReadable()) {              // 获得此SelectionKey的Channel            client = (SocketChannel) selectionKey.channel();              // 清空缓冲区            receivebuffer.clear();              // 读取客户端发送来的数据到缓冲区中             count = client.read(receivebuffer);               if (count > 0) {                 // Byte转换成String                receiveText = new String(receivebuffer.array(), 0, count);                  System.out.println("Server Channel " + num + " received from client:" + receiveText);                // 注册到selector,等待写入                client.register(selector, SelectionKey.OP_WRITE, num);              }          }         // 此SelectionKey是否可以Write        else if (selectionKey.isWritable()) {              // 清空缓冲区            sendbuffer.clear();              // 获得此SelectionKey的Channel              client = (SocketChannel) selectionKey.channel();              int tim = ++times[num];            sendText = "Server(Number:" + num + " Times:" + tim +")";              // 向缓冲区中输入数据             sendbuffer.put(sendText.getBytes());              // 读写模式切换              sendbuffer.flip();              // 输出到Channel              client.write(sendbuffer);               client.register(selector, SelectionKey.OP_READ, num);          }      }        public static void main(String[] args) throws IOException {        // 启动服务器        NIOServer nioserver = new NIOServer("localhost");        nioserver.listen();    }}

NIOClient.java:

package client;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;public class NIOClient extends Thread{        // Client编号    int num;    // 端口号    int port;         // 缓冲区大小    private  int BLOCK = 4096;      // 接收数据缓冲区     private ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK);      // 发送数据缓冲区     private ByteBuffer receivebuffer = ByteBuffer.allocate(BLOCK);      private Selector selector;         Set<SelectionKey> selectionKeys;      Iterator<SelectionKey> iterator;      SelectionKey selectionKey;      SocketChannel client;      String receiveText;      String sendText;      int count = 0;     // 发送消息的次数    int flag = 0;        public NIOClient(int num) throws IOException{        this.num = num;        this.port = 10000 + num;        InetSocketAddress SERVER_ADDRESS = new InetSocketAddress("localhost", port);        // 打开SocketChannel        SocketChannel socketChannel = SocketChannel.open();        // 设置为非阻塞        socketChannel.configureBlocking(false);         // 获得Selector         selector = Selector.open();         // 注册连接服务端socket动作         socketChannel.register(selector, SelectionKey.OP_CONNECT);         // 连接        socketChannel.connect(SERVER_ADDRESS);    }        public void run(){        while (true){            try {                // 选择                selector.select();                selectionKeys = selector.selectedKeys();                iterator = selectionKeys.iterator();                while (iterator.hasNext()) {                      selectionKey = iterator.next();                      if (selectionKey.isConnectable()) {                                                   client = (SocketChannel) selectionKey.channel();                          // 判断此Channel上是否正在进行连接操作                        if (client.isConnectionPending()) {                             // 完成连接                            client.finishConnect();                              System.out.println("Client " + num + " connected!");                               sendbuffer.clear();                              flag++;                            String sendString = "Client " + num + " send: " + flag + "!";                            sendbuffer.put(sendString.getBytes());                              sendbuffer.flip();                              client.write(sendbuffer);                          }                          client.register(selector, SelectionKey.OP_READ);                      }                     else if (selectionKey.isReadable()) {                          client = (SocketChannel) selectionKey.channel();                          //将缓冲区清空以备下次读取                          receivebuffer.clear();                          //读取服务器发送来的数据到缓冲区中                          count=client.read(receivebuffer);                          if(count>0){                              receiveText = new String( receivebuffer.array(),0,count);                              System.out.println("Client " + num + " received from server: " + receiveText);                            client.register(selector, SelectionKey.OP_WRITE);                          }                           }                     else if (selectionKey.isWritable()) {                          sendbuffer.clear();                          client = (SocketChannel) selectionKey.channel();                          flag++;                        String sendString = "Client " + num + " send: " + flag + "!";                         sendbuffer.put(sendString.getBytes());                           // 将缓冲区各标志复位,因为向里面put了数据标志被改变要想从中读取数据发向服务器,就要复位                          sendbuffer.flip();                          client.write(sendbuffer);                          client.register(selector, SelectionKey.OP_READ);                      }                  }                  selectionKeys.clear();              } catch (IOException e) {                // 出错                e.printStackTrace();            }             try {                Thread.sleep(1000);            } catch (InterruptedException e) {                // 出错                e.printStackTrace();            }        }    }        public static void main(String[] args) throws IOException {        // 启动客户端        NIOClient nioclient0 = new NIOClient(0);        NIOClient nioclient1 = new NIOClient(1);        NIOClient nioclient2 = new NIOClient(2);        NIOClient nioclient3 = new NIOClient(3);        NIOClient nioclient4 = new NIOClient(4);        nioclient0.start();        nioclient1.start();        nioclient2.start();        nioclient3.start();        nioclient4.start();    }}

 

Java NIO