首页 > 代码库 > 3--Java NIO基础1

3--Java NIO基础1

一、NIO概述

1. BIO带来的挑战

BIO即堵塞式I/O,数据在写入或读取时都有可能堵塞,一旦有堵塞,线程将失去CPU的使用权,性能较差。

2. NIO工作机制

Java NIO由Channel、Buffer、Selector三个核心组成,NIO框架类结构图如下:

技术分享

其中,Buffer主要负责存取数据,Channel用于数据传输,获取数据,然后流入Buffer;或从Buffer取数据,发送出去。

Selector允许单线程处理多个Channel,如果打开了多个连接(Channel),但每个连接的数据流量很小,使用Selector则很方便。

技术分享

二、Channel

Channel类主要位于java.nio.channels包下,类结构图如下:

技术分享

Channel跟流相似,但流是单向的,而Channel是双向的。Channel总是从Buffer获取数据(写文件)或将数据写入Buffer(读文件时)。常用的Channel如下:

  • FileChannel,从文件中读写数据。
  • DatagramChannel,能通过UDP读写网络中的数据。
  • SocketChannel,能通过TCP读写网络中的数据。
  • ServerSocketChannel,可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。

  

技术分享
package com.yyn.nio;import java.io.FileNotFoundException;import java.io.IOException;import java.io.RandomAccessFile;import java.io.UnsupportedEncodingException;import java.nio.ByteBuffer;import java.nio.channels.FileChannel;public class ByteBufferTest {         public static void main(String [] args) throws IOException{        // testRead(); //从文件读取数据,chanel向buffer中写数据         testWrite();     }              //写文件     public static void testWrite() throws IOException{         RandomAccessFile raFile = new RandomAccessFile("byte_buffer_write.txt", "rw");         FileChannel fChannel = raFile.getChannel();         String data = "天王盖地虎\n小鸡炖蘑菇\n要从此路过\n就得跳支舞";         byte[] dataByte = data.getBytes("UTF-8");         System.out.println(dataByte.length);         ByteBuffer buf = ByteBuffer.allocate(dataByte.length);          buf.put(dataByte, 0, dataByte.length);         buf.flip();  //切换buffer到读模式         fChannel.write(buf);  //从buffer读取数据到channel         fChannel.force(true); //强制将数据刷新到磁盘,不一定有用         buf.clear();         buf.put(dataByte, 0, 10);         buf.mark();         buf.put(dataByte,10,10);         buf.reset();         buf.put(dataByte, 0, 10);         buf.flip();  //切换buffer到读模式         fChannel.write(buf);  //从buffer读取数据到channel         fChannel.close();         System.out.println("write over!!");                       }         //读文件     public static void testRead() throws IOException {         RandomAccessFile raFile = new RandomAccessFile("test.txt", "rw");         FileChannel fChannel = raFile.getChannel();         ByteBuffer buf = ByteBuffer.allocate(10);         int byteRead = fChannel.read(buf);         StringBuffer sBuffer = new StringBuffer();         while(byteRead != -1){             buf.flip(); //change to read mode             byte [] bs = null;             int limite = buf.limit();             if(buf.hasArray()){                 bs = buf.array();             }             if(bs != null){                 System.out.println("bs length: "+limite);                 sBuffer.append(new String(bs,0,limite ,"UTF-8"));             }                     buf.clear();  // make buffer ready for write,clear all buffer             //buf.compact();  // make buffer ready for write,clear data readed in buffer              byteRead = fChannel.read(buf);         }         fChannel.close();         System.out.println("####:"+sBuffer.toString());     }       }
View Code

 技术分享

技术分享

2.NIO优化方法

2.1 FileChannel.transformXXX方法

技术分享

技术分享

2.2 FileChannel.map方法

技术分享

三、Buffer

Buffer是一片缓冲区,可读可写,非线程安全的,NIO包中针对常用的类型设置了Buffer,类结构图如下:

技术分享

要使用Buffer,需记住3个方法和4个特性

  • flip()方法,切换Buffer为读状态,此时Buffer可读。limit设置为position,position设置为0
  • clear()方法,切换Buffer为写状态,会清空Buffer里所有数据。position为0,limit置为capacity
  • compact()方法,切换Buffer为写状态,清空Buffer里所有已读数据,将未读数据剪切到Buffer前端。position设置为limit,limit设置为capacity

要理解其4个特性,

  • capacity,Buffer的总长度,该值总是保持不变。A buffer‘s capacity is the number of elements it contains. The capacity of a buffer is never negative and never changes
  • position,下一个要操作的数据元素的位置,该值总是小于等于capacity和limit。Buffer为读状态时,表示下一个要读的位置,Buffer为写状态时,表示下一个要写的位置。

                  A buffer‘s position is the index of the next element to be read or written. A buffer‘s position is never negative and is never greater than its limit.

  • limit,Buffer中第一个不可操作元素的位置,limit<=capacity。A buffer‘s limit is the index of the first element that should not be read or written. A buffer‘s limit is never negative and is                 never greater than its capacity.
  • mark,用于记录当前position的前一个位置

Buffer状态转换过程描述

技术分享

从Buffer中读数据方式:buffer.get()方法和channel.write()方法。

向Buffer中写数据方式:buffer.put()方法和channel.read()方法。

技术分享
package com.yyn.nio;import java.io.FileNotFoundException;import java.io.IOException;import java.io.RandomAccessFile;import java.io.UnsupportedEncodingException;import java.nio.ByteBuffer;import java.nio.channels.FileChannel;public class ByteBufferTest {         public static void main(String [] args) throws IOException{        // testRead(); //从文件读取数据,chanel向buffer中写数据         testWrite();     }              //写文件     public static void testWrite() throws IOException{         RandomAccessFile raFile = new RandomAccessFile("byte_buffer_write.txt", "rw");         FileChannel fChannel = raFile.getChannel();         String data = "天王盖地虎\n小鸡炖蘑菇\n要从此路过\n就得跳支舞";         byte[] dataByte = data.getBytes("UTF-8");         System.out.println(dataByte.length);         ByteBuffer buf = ByteBuffer.allocate(dataByte.length);          buf.put(dataByte, 0, dataByte.length);         buf.flip();  //切换buffer到读模式         fChannel.write(buf);  //从buffer读取数据到channel         fChannel.force(true); //强制将数据刷新到磁盘,不一定有用         buf.clear();         buf.put(dataByte, 0, 10);         buf.mark();         buf.put(dataByte,10,10);         buf.reset();         buf.put(dataByte, 0, 10);         buf.flip();  //切换buffer到读模式         fChannel.write(buf);  //从buffer读取数据到channel         fChannel.close();         System.out.println("write over!!");                       }         //读文件     public static void testRead() throws IOException {         RandomAccessFile raFile = new RandomAccessFile("test.txt", "rw");         FileChannel fChannel = raFile.getChannel();         ByteBuffer buf = ByteBuffer.allocate(10);         int byteRead = fChannel.read(buf);         StringBuffer sBuffer = new StringBuffer();         while(byteRead != -1){             buf.flip(); //change to read mode             byte [] bs = null;             int limite = buf.limit();             if(buf.hasArray()){                 bs = buf.array();             }             if(bs != null){                 System.out.println("bs length: "+limite);                 sBuffer.append(new String(bs,0,limite ,"UTF-8"));             }                     buf.clear();  // make buffer ready for write,clear all buffer             //buf.compact();  // make buffer ready for write,clear dat a readed in buffer              byteRead = fChannel.read(buf);         }         fChannel.close();         System.out.println("####:"+sBuffer.toString());     }       }
View Code

 

2. Buffer其他方法介绍

2.1 rewind()方法

Buffer.rewind()将position设回0,所以你可以重读Buffer中的所有数据。limit保持不变,仍然表示能从Buffer中读取多少。

2.2 equals()方法

当满足下列条件时,表示两个Buffer相等:

有相同的类型(byte、char、int等)。
Buffer中剩余的byte、char等的个数相等。
Buffer中所有剩余的byte、char等都相同。
如你所见,equals只是比较Buffer的一部分,不是每一个在它里面的元素都比较。实际上,它只比较Buffer中的剩余元素。

2.3 compareTo()方法

compareTo()方法比较两个Buffer的剩余元素(byte、char等), 如果满足下列条件,则认为一个Buffer“小于”另一个Buffer:
第一个不相等的元素小于另一个Buffer中对应的元素 。
所有元素都相等,但第一个Buffer比另一个先耗尽(第一个Buffer的元素个数比另一个少)。

3. Buffer的Scatter/Gather

 scatter(分散)是指从Channel读取数据后,写入到多个Buffer中。

gather(聚集)是指写操作时,从多个Buffer读取数据并写入到一个Channel中。

四、Selector 

Selector在NIO编程中充当一个调度器的角色,轮训在其注册的channel是否ready,若ready则开始执行操作。

仅用单个线程来处理多个Channels的好处是,只需要更少的线程来处理通道。事实上,可以只用一个线程处理所有的通道。对于操作系统来说,线程之间上下文切换的开销很大,而且每个线程都要占用系统的一些资源(如内存)。因此,使用的线程越少越好。

但是,需要记住,现代的操作系统和CPU在多任务方面表现的越来越好,所以多线程的开销随着时间的推移,变得越来越小了。实际上,如果一个CPU有多个内核,不使用多任务可能是在浪费CPU能力。不管怎么说,关于那种设计的讨论应该放在另一篇不同的文章中。在这里,只要知道使用Selector能够处理多个通道就足够了。

1. Selector介绍

Selector包含3个Set对象来管理SelectionKey对象,分别是以下三种:

技术分享

 

技术分享

使用Selector前,需要确保以下操作已经执行完成:

  • Selector selector = Selector.open(); //调用open方法,获取一个Selector实例。
  • channel.configureBlocking(false); // 设置Channel为非堵塞模式
  • channel.register(selector , SelectionKey.OP_ACCEPT); //将Channel注册到selector中,并设置需监听的事件

 可以监听四种不同类型的事件:

  1. SelectionKey.OP_CONNECT
  2. SelectionKey.OP_ACCEPT
  3. SelectionKey.OP_READ
  4. SelectionKey.OP_WRITE

如果你对不止一种事件感兴趣,那么可以用“位或”操作符将常量连接起来,如下:

int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

2. Selector常用方法

2.1 selectXXX()方法

  • int select(),返回就绪channel的个数,会堵塞。
  • int select(long timeout),返回就绪channel个数,堵塞timeout
  • int selectNow(),返回就绪channel个数,不堵塞

注意:每次调用selectXXX方法时,会返回此次就绪数量,例如,有一个channel就绪,则返回1,但未对这个channel的数据进行处理。接下来又有一个channel就绪,调用selectXXX方法还是返回1,但实际上此时有2个channel就绪但未被处理。

2.2 selectedKeys()

一旦调用了select()方法,并且返回值表明有一个或更多个通道就绪了,然后可以通过调用selector的selectedKeys()方法,访问“已选择键集(selected key set)”中的就绪通道。
当像Selector注册Channel时,Channel.register()方法会返回一个SelectionKey 对象。这个对象代表了注册到该Selector的通道。可以通过SelectionKey的selectedKeySet()方法访问这些对象。

注意:Selector不会自己从已选择键集中移除SelectionKey实例。必须在处理完通道时自己移除。下次该通道变成就绪时,Selector会再次将其放入已选择键集中。

2.3 wakeUp()

某个线程调用select()方法后阻塞了,即使没有通道已经就绪,也有办法让其从select()方法返回。只要让其它线程在第一个线程调用select()方法的那个对象上调用Selector.wakeup()方法即可。阻塞在select()方法上的线程会立马返回。
如果有其它线程调用了wakeup()方法,但当前没有线程阻塞在select()方法上,下个调用select()方法的线程会立即“醒来(wake up)”。

2.4 close()

用完Selector后调用其close()方法会关闭该Selector,且使注册到该Selector上的所有SelectionKey实例无效。通道本身并不会关闭。

3. 基于NIO的网络Demo

3.1 单线程版,主线程负责处理accept和read

技术分享
package com.yyn.nio.net;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.nio.charset.Charset;import java.util.Iterator;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/** * 本例子服务端只处理accept和read事件,单线程版 *  * @author Michael * */public class NIOSingleServer {    private Selector selector = null;    //private ExecutorService pool;    public static Charset charset = Charset.forName("UTF-8");    public NIOSingleServer init(int port) throws IOException {        //pool = Executors.newFixedThreadPool(5);        ServerSocketChannel ssc = ServerSocketChannel.open();        ssc.configureBlocking(false); // 设置为非堵塞模式        ssc.socket().bind(new InetSocketAddress(port));        selector = Selector.open(); // 获取一个selector        ssc.register(selector, SelectionKey.OP_ACCEPT);        return this;    }    public void listen() throws IOException {        System.out.println("Server started.....");        while (true) {            int n = 0;            n = selector.select(); // 获取就绪操作的个数            if(n == 0){                continue;            }            Iterator<SelectionKey> it = selector.selectedKeys().iterator();            while (it.hasNext()) {                SelectionKey key = it.next();                it.remove(); // 每次使用后需要手工移除                SocketChannel channel = null;                if (key.isAcceptable()) {                    try {                        // init函数中注册的是ServerSocketChannel                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();                        // 获取实际的SocketChannel,类似于Socket IO中的Socket                        channel = serverSocketChannel.accept();                        System.out.println("客户端:" + channel.getRemoteAddress() + "已连接");                        channel.configureBlocking(false);                        SelectionKey k = channel.register(selector, SelectionKey.OP_READ); // 注册read监听,监听客户端发过来的数据                        //Worker worker = new Worker(k);                        //k.attach(worker);                    } catch (Exception e) {                        if (channel != null) {                            channel.close();                        }                    }                } else {                    if (key.isReadable()) {                        System.out.println("begin to process read!!!!");                        channel = (SocketChannel) key.channel();                        ByteBuffer buffer = ByteBuffer.allocate(1024);                        buffer.clear(); //切换buffer为写模式                        int len = 0;                        try{                            while ((len = channel.read(buffer)) > 0) {                                buffer.flip(); //切换buffer为read模式                                System.out.println("客户端数据:"+charset.decode(buffer).toString());                                buffer.clear();                            }                            if(len == -1){  // The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream                                 System.out.println("客户端断开");                                channel.close();                                continue;                            }                        }catch(Exception e){                            System.out.println("客户端异常啦");                        }                    }                    if (key.isWritable()) {                    }                }            }        }    }    public static void main(String[] args) throws IOException {        // TODO Auto-generated method stub        NIOSingleServer server = new NIOSingleServer();        server.init(12003).listen();    }}
View Code

 

技术分享
package com.yyn.nio.net;import java.io.BufferedInputStream;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.io.OutputStream;import java.io.OutputStreamWriter;import java.io.PrintWriter;import java.net.Socket;import java.net.UnknownHostException;public class NIOSingleClient {    public static void main(String[] args) throws UnknownHostException, IOException {        // TODO Auto-generated method stub        Socket socket = new Socket("127.0.0.1", 12003);        OutputStreamWriter osw = new OutputStreamWriter(socket.getOutputStream(), "UTF-8");        PrintWriter out = new PrintWriter(osw);                InputStreamReader isr = new InputStreamReader(new BufferedInputStream(System.in), "UTF-8");        BufferedReader in = new BufferedReader(isr);        String data = "";        while(true){            data = in.readLine();            data = data.trim().toUpperCase();            if(data.equals("EIXT")){                out.close();                socket.close();                System.exit(0);            }            System.out.println("read data from comsole:" + data);            out.println(data);            out.flush();            System.out.println("sending data to server:" + data);        }            }}
View Code

 

3.2 多线程版,主线程负责accept,子线程负责read

技术分享
package com.yyn.nio.net;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.nio.charset.Charset;import java.util.Iterator;import java.util.logging.LoggingMXBean;/** * 本例子服务端只处理accept和read事件,多线程版 * @author Michael * */public class NIOMultiServer {    private Selector acceptSelector = null;    private Selector readSelector = null;    public static Charset charset = Charset.forName("UTF-8");        public NIOMultiServer init(int port) throws IOException {        //pool = Executors.newFixedThreadPool(5);        ServerSocketChannel ssc = ServerSocketChannel.open();        ssc.configureBlocking(false); // 设置为非堵塞模式        ssc.socket().bind(new InetSocketAddress(port));        acceptSelector = Selector.open(); // 获取一个selector        readSelector = Selector.open();        ssc.register(acceptSelector, SelectionKey.OP_ACCEPT);        return this;    }        public void listen() throws IOException {        System.out.println("Server started.....");        new Worker(this.readSelector).start();         while (true) {            int n = 0;            n = acceptSelector.select();            if(n == 0)                continue;            Iterator<SelectionKey> it = acceptSelector.selectedKeys().iterator();            while (it.hasNext()) {                SelectionKey key = it.next();                it.remove();                // init函数中注册的是ServerSocketChannel                ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();                // 获取实际的SocketChannel,类似于Socket IO中的Socket,必须accept后才有SocketChannel                SocketChannel channel = serverSocketChannel.accept();                channel.configureBlocking(false);                System.out.println("客户端:" + channel.getRemoteAddress() + "已连接");                if(key.isAcceptable()){                    channel.register(this.readSelector, SelectionKey.OP_READ);                }            }        }    }        private static class Worker extends Thread{                private Selector readSelector = null;                public Worker(Selector selector){            this.readSelector = selector;        }                public void run(){            System.out.println("Read thread started....");            while (true) {                int n = 0;                SocketChannel channel = null;                try {                    n= this.readSelector.select(10);                    if(n == 0)                        continue;                    System.out.println("read thread, n is: " + n);                    Iterator<SelectionKey> it = readSelector.selectedKeys().iterator();                    while (it.hasNext()) {                        SelectionKey key = it.next();                        it.remove();                        if(key.isReadable()){                            channel = (SocketChannel) key.channel();                            System.out.println("begin to process read at: " + channel.getRemoteAddress());                            ByteBuffer buffer = ByteBuffer.allocate(1024);                            buffer.clear(); //将buffer切换为写模式                            long len = 0;                            while((len = channel.read(buffer)) > 0){                                buffer.flip();  //将buffer切换为读模式                                System.out.println("客户端数据:"+charset.decode(buffer).toString());                                buffer.clear();                            }                            if(len == -1){                                System.out.println("客户端断开");                                channel.close();                                continue;                            }                        }                    }                                    } catch (IOException e) {                    System.out.println("客户端异常啦");                    try {                        channel.close();                    } catch (IOException e1) {                        // TODO Auto-generated catch block                        System.out.println("关闭channel发生异常");                    }                }                            }        }    }                public static void main(String[] args) throws IOException {        NIOMultiServer server = new NIOMultiServer();        server.init(12003);        server.listen();    }}
View Code

 

技术分享
package com.yyn.nio.net;import java.io.BufferedInputStream;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.io.OutputStream;import java.io.OutputStreamWriter;import java.io.PrintWriter;import java.net.Socket;import java.net.UnknownHostException;public class NIOSingleClient {    public static void main(String[] args) throws UnknownHostException, IOException {        // TODO Auto-generated method stub        Socket socket = new Socket("127.0.0.1", 12003);        OutputStreamWriter osw = new OutputStreamWriter(socket.getOutputStream(), "UTF-8");        PrintWriter out = new PrintWriter(osw);                InputStreamReader isr = new InputStreamReader(new BufferedInputStream(System.in), "UTF-8");        BufferedReader in = new BufferedReader(isr);        String data = "";        while(true){            data = in.readLine();            data = data.trim().toUpperCase();            if(data.equals("EIXT")){                out.close();                socket.close();                System.exit(0);            }            System.out.println("read data from comsole:" + data);            out.println(data);            out.flush();            System.out.println("sending data to server:" + data);        }            }}
View Code

 

 

dd

 

3--Java NIO基础1