首页 > 代码库 > Java NIO 读数据处理过程

Java NIO 读数据处理过程

这两天仿hadoop 写java RPC框架,使用PB作为序列号工具,在写读数据的时候遇到一个小坑。之前写过NIO代码,恰好是错误的代码产生正确的逻辑,误以为自己写对了。现在简单整理一下。

 

使用NIO,select()到读事件时,要处理4种情况:

1. channel还有数据,继续读。

2. channel中暂时没数据,但channel还没断开,这是读取到的数据个数为0,结束读,继续到select()处阻塞等待数据。

3. 另一端channel.close()关闭连接,这时候读channel返回的读取数是-1,表示已经到末尾,跟读文件到末尾时是一样的。既然已经结束了,就把对应的SelectionKey给cancel掉,表示selector不再监听这个channel上的读事件。并且关闭连接,本端channel.close()。

4. 另一端被强制关闭,也就是channel没有close()就被强制断开了,这时候本端会抛出一个IOException异常,要处理这个异常。

 

之前对 另一端channel.close()关闭连接 没有细究,不清楚 读channel返回的读取数-1 是什么意思。然后没有cancel对应的SelectionKey,也没关闭连接,结果就是selector.select()一直返回读事件,但是没有数据。

 

直接贴服务器和客户端代码:

Server:

package socket;import java.io.IOException;import java.net.InetSocketAddress;import java.net.ServerSocket;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;public class NIOServer2 {    private void startServer() throws IOException {        Selector selector = Selector.open();                {            ServerSocketChannel ssc = ServerSocketChannel.open();            ssc.configureBlocking(false);            ServerSocket ss = ssc.socket();            InetSocketAddress address = new InetSocketAddress(9000);            ss.bind(address);                        System.out.println("ssc 0 : " + ssc);            System.out.println("ss 0 : " + ss);                        SelectionKey acceptKey = ssc.register(selector, SelectionKey.OP_ACCEPT);            System.out.println("acceptKey: " + acceptKey);            printKeyInfo(acceptKey);            System.out.println("Going to listen on 9000");        }                while (true) {            System.out.println("===================================\nstart select...");            int num = selector.select();            System.out.println("NIOServer: Number of keys after select operation: " + num);                        Set<SelectionKey> selectionKeys = selector.selectedKeys();            Iterator<SelectionKey> it = selectionKeys.iterator();                        while (it.hasNext()) {                SelectionKey key = it.next();                System.out.println("key: " + key);                printKeyInfo(key);                                it.remove();                                if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {                    System.out.println("select ACCEPT");                    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();                    SocketChannel sc = ssc.accept();                    sc.configureBlocking(false);                                        System.out.println("ssc 1 : " + ssc);                    System.out.println("sc 1 : " + sc);                                        SelectionKey newKey = sc.register(selector, SelectionKey.OP_READ);                    System.out.println("new key:" + newKey);                    printKeyInfo(newKey);                }                else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {//                    System.out.println("select READ");//                    System.out.print("before cancel:");printKeyInfo(key);//                    key.cancel();//                    System.out.println("after cancel:");printKeyInfo(key);                    SocketChannel sc = (SocketChannel) key.channel();                    System.out.println("sc 2 : " + sc);                                        //echo data                    //下面的处理是正确的,count<0则cancel key。count=0则进入下一轮select()阻塞等待数据。//                    try {//                        int count = doRead(key);//                        if (count < 0) {//                            key.cancel();//                            System.out.println("cancel key for < 0");//                            sc.read(ByteBuffer.allocate(2));//                        }//                    } catch(IOException e) {//                        e.printStackTrace();//                        key.cancel();//                        System.out.println("cancel key");//                    }                                        //下面的处理过程是错误的,偶然情况下会出现正确逻辑。在客户端连续写,写完马上关闭连接,这时下面代码能打印出客户端的输出,                    //客户端关闭连接,下面的代码马上爆出异常,是这行代码。java.io.IOException: 您的主机中的软件中止了一个已建立的连接。//                    int nbytes = 0;//                    ByteBuffer echoBuffer = ByteBuffer.allocate(16);//                    while (true) {//                        echoBuffer.clear();//                        int r = sc.read(echoBuffer);//                        System.out.println(new String(echoBuffer.array()));//                        if (r <= 0) break;//                        echoBuffer.flip();//                        sc.write(echoBuffer);//                        nbytes += r;//                    }//                    System.out.println("echoed " + nbytes + " from " + sc);                                        //下面的是处理过程是正确的。正确的做法就是对读取到n,0,-1分别处理,还要对客户端强制关闭的异常做处理                    while (true) {                        ByteBuffer buffer = ByteBuffer.allocate(2);                        buffer.clear();                        int r;                        try {                            r = sc.read(buffer);                            System.out.println("r = " + r);                            System.out.println(new String(buffer.array()));                            if (r < 0) {                                //客户端socket.close()会到这里,读取数r=-1                                key.cancel();                                System.out.println("cancel key for < 0");                                break;                            } else if (r == 0) {                                //客户端socket没有关闭,而channel没有数据,数据数r=0。                                //有时候select()返回了,但channel不一定有数据。可能select()是被其他方法唤醒                                break;                            }                        } catch (IOException e) {                            //客户端强制关闭会来这里报异常                            e.printStackTrace();                            key.cancel();                            System.out.println("cancel key for Exception");                            break;                        }                    }//while                }// if ... else if//                try {//                    Thread.sleep(500);//                } catch (InterruptedException e) {//                    e.printStackTrace();//                }            }//while        }//while    }        private int doRead(SelectionKey key) throws IOException {        SocketChannel channel = (SocketChannel) key.channel();        while (true) {            int count = -1;            ByteBuffer buffer = ByteBuffer.allocate(2);            if (buffer.remaining() > 0) {                count = channel.read(buffer);                System.out.println("count = " + count);                if (count <= 0) return count;            }        }    }        private static void printKeyInfo(SelectionKey sk) {        String s = new String();        s = "Att: " + (sk.attachment() == null ? "no" : "yes");        s += ", Read: " + sk.isReadable();        s += ", Acpt: " + sk.isAcceptable();        s += ", Cnct: " + sk.isConnectable();        s += ", Wrt: " + sk.isWritable();        s += ", Valid: " + sk.isValid();        s += ", interestOps: " + sk.interestOps();        s += ", readyOps: " + sk.readyOps();        System.out.println(s);    }        public static void main(String[] args) {        try {            new NIOServer2().startServer();        } catch (IOException e) {            e.printStackTrace();        }    }}

 

 

Client:

package socket;import java.io.DataOutputStream;import java.io.IOException;import java.net.Socket;import java.net.UnknownHostException;public class SocketClient {    public static void main(String[] args) throws UnknownHostException, IOException, InterruptedException {        Socket socket = new Socket("localhost", 9000);        DataOutputStream out = new DataOutputStream(socket.getOutputStream());        byte[] bytes = "fdfd".getBytes();//        System.out.println("send fdfd");        out.write(bytes);        out.flush();        //        Thread.sleep(15*1000);        //        System.out.println("send loll");        out.write("loull".getBytes());        out.flush();        //        Thread.sleep(1*1000);        socket.close();        System.out.println("client socket close");    }}

 

 

浪费了一些时间,一方面因为自己对网络编程不够熟悉,比如不清楚-1什么意思。另一方面Java NIO的API还是略显难用。

 

Java NIO 读数据处理过程