首页 > 代码库 > Java - 网络I/O的阻塞

Java - 网络I/O的阻塞

最近学习时碰到事件驱动和非阻塞的相关知识,
随之想到了Java中的Reactor、io与nio的一些东西;
在前辈的博客上翻了翻、复习复习,在此记录一番。


实在找不到比较大点的东西,于是随便弄了个压缩包,大小在1G左右;
写个程序模拟一下下载,开两个客户端线程请求下载;
结果会是:一个请求会一直阻塞,直到一个文件下载完成后另一个文件才开始下载。
先看看服务端的代码:

class DownLoadServer implements Runnable {    @Override    public void run() {        try {            @SuppressWarnings("resource")            final ServerSocket ss = new ServerSocket(8989);            while (true) {                Socket server = ss.accept();                byte[] bfile = new byte[1024];                try {                    FileInputStream fis = new FileInputStream("D:/doc_backup.rar");                    OutputStream os = server.getOutputStream();                    while (fis.read(bfile) > -1) {                        os.write(bfile);                    }                    fis.close();                    server.close();                } catch (IOException e) {                    System.out.println("server线程输出流我的天");                }            }        } catch (Exception e) {            System.out.println("server线程 我的天~");        }    }}

 


很简单,就是accept后开个inputStream和outputStream,边读边写。

接着再看看客户端的代码:

class DownlLoadClient implements Runnable {    @SuppressWarnings("resource")    @Override    public void run() {        try {            Socket client = new Socket("127.0.0.1", 8989);            InputStream is = client.getInputStream();            FileOutputStream fos = new FileOutputStream(                    "E:/testfolder/langchao" + Thread.currentThread().getId()                            + ".txt");            byte[] fromServer = new byte[1024];            while (is.read(fromServer) > -1) {                fos.write(fromServer);            }            client.close();        } catch (IOException e) {            System.out.println("client线程我的天~");        }    }}

 


输出的文件名是随便取的,也没什么特别,只是把读过来的输出去。
结果当然是这个样子的:


服务端只有一对inputStream和outputStream对象在受理请求,前面的没写完后面的别想写。
那如果有很多inputStream和outputStream对象受理请求呢?
想法不错,也就是说把服务端代码改成这样子:

class DownLoadServer implements Runnable {    @Override    public void run() {        try {            @SuppressWarnings("resource")            final ServerSocket ss = new ServerSocket(8989);            while (true) {                final Socket server = ss.accept();                Thread t = new Thread() {                    @Override                    public void run() {                        super.run();                        byte[] bfile = new byte[1024];                        try {                            FileInputStream fis = new FileInputStream("D:/doc_backup.rar");                            OutputStream os = server.getOutputStream();                            while (fis.read(bfile) > -1) {                                os.write(bfile);                            }                            fis.close();                            server.close();                        } catch (IOException e) {                            System.out.println("server线程输出流我的天");                        }                    }                };                t.start();            }        } catch (Exception e) {            System.out.println("server线程 我的天~");        }    }}

 


大概就是这个意思,每accept到就为客户端提供"一对一特殊服务";
嗯,或者也可以算一下获取了多少下载请求,每N次请求开1次"特殊服务"。
但无论如何都无法回避一个问题——"特殊服务"的成本很高,线程的切换和线程的资源都是开销。
如果继续按照这个方法做下去,也只能是弄个Thread Pool。
但如果请求数量超过了pool的maxActive数量,那问题又饶了一圈回来了。


我们追求低成本高效率,于是早在JDK1.4就有了java.nio;
nio怎么讲?有说是new io的、也有叫native io,或许叫non-block io...
概念上也就是channel、buffer、selector、selectionKey...
先看一下server代码:

System.out.println("server start...");ServerSocketChannel serverChannel = ServerSocketChannel.open();serverChannel.bind(new InetSocketAddress(8989));serverChannel.configureBlocking(false);Selector sel = Selector.open();serverChannel.register(sel, SelectionKey.OP_ACCEPT);File file = new File("D:/doc_backup.rar");ByteBuffer buffer = ByteBuffer.allocate(100*1024);CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();while(true){    sel.select();    Iterator<SelectionKey> selKeyItr = sel.selectedKeys().iterator();    while(selKeyItr.hasNext()){        SelectionKey key = selKeyItr.next();        selKeyItr.remove();                                                               String outputFilePath=StringUtils.EMPTY;        if(key.isAcceptable()){            System.out.println("server acceptable");            SocketChannel channel = ((ServerSocketChannel)key.channel()).accept();            channel.configureBlocking(false);                                                                        channel.register(sel, SelectionKey.OP_READ);        }else if(key.isReadable()){            System.out.println("server readable");            SocketChannel channel = (SocketChannel) key.channel();            channel.configureBlocking(false);            channel.read(buffer);            buffer.flip();            CharBuffer clientBuffer = decoder.decode(buffer);            outputFilePath = clientBuffer.toString();            buffer.clear();            SelectionKey writeKey = channel.register(sel, SelectionKey.OP_WRITE);        }else if(key.isWritable()){                                                                   System.out.println("server writable");            SocketChannel channel =(SocketChannel) key.channel();            FileChannel fileChannel = new FileInputStream(file).getChannel();            ByteBuffer fileByte = ByteBuffer.allocate(1024*100);            while(fileChannel.read(fileByte)!=-1){                fileByte.flip();                channel.write(fileByte);                fileByte.clear();            }            channel.register(sel, SelectionKey.OP_READ);        }    }}

 

代码贴出来有点乱,但也就开一个线程,监听与注册事件。
select()方法必须,不然client的send根本recv不到。
socketChannel将blocking设置为false,不然会在事件注册时出现java.nio.channels.IllegalBlockingModeException
Unchecked exception thrown when a blocking-mode-specific operation is invoked upon a channel in the incorrect blocking mode.
同样地,在write事件中把blocking设置为true或者使用阻塞的面向流的IO也会出现同样的异常。


client继承Thread,run method如下:

public void run() {    try {        System.out.println("client...");        SocketAddress addr = new InetSocketAddress(8989);        SocketChannel client = SocketChannel.open();        client.configureBlocking(false);        Selector sel = Selector.open();        client.register(sel, SelectionKey.OP_CONNECT);        CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();                                                                                                                                    client.connect(addr);        while (true) {            sel.select();            Iterator<SelectionKey> selKeyItr = sel.selectedKeys().iterator();            while (selKeyItr.hasNext()) {                SelectionKey key = selKeyItr.next();                selKeyItr.remove();                if (key.isConnectable()) {                    System.out.println("client connectble");                    SocketChannel channel = (SocketChannel) key.channel();                    String filePath = "E:/testfolder/channelTest"+Thread.currentThread().getId()+".rar";                    channel.finishConnect();                    channel.write(encoder.encode(CharBuffer.wrap(filePath)));                    channel.register(sel, SelectionKey.OP_READ).attach(filePath);                                                                                                                                                  } else if (key.isReadable()) {                    System.out.println("client readble...");                                      SocketChannel channel = (SocketChannel) key.channel();                                              if(key.attachment()!=null){                        @SuppressWarnings("resource")                        FileChannel fc = new FileOutputStream(key.attachment().toString()).getChannel();                        ByteBuffer fileByte = ByteBuffer.allocate(1024*100);                        while(channel.read(fileByte)!=-1){                            fileByte.flip();                            fc.write(fileByte);                            fileByte.clear();                        }                    }                                                                                                                                                               channel.register(sel, SelectionKey.OP_CONNECT);                }            }        }    } catch (IOException e) {        e.printStackTrace();    }}

 

在上面代码中的attach()并没有发挥太大用处,attach()可以为selectionKey对象添加任何一个object。
但仅限一个,若没添加,attachment()会取出null。


运行后发现事件都获取到了,但文件仍然是一个接一个的下载。
原因是server触发write事件后创建fileChannel并一次写完。
事件响应的执行体太大,影响后面的执行。
非阻塞嘛,要得就是立即返回。
解决方法是分多次事件去读写,每次事件继续读写上一次事件的缓冲。
我可以好好使用一下这个attach()了。
首先我加了一个resolver类,我打算把他的实例加到attachment中去:

class ChannelResolver{    private FileChannel channel;    private ByteBuffer buffer;    private FileInputStream fis;                                                                                                                                                                                                                                                                                                    public ChannelResolver(String filePath){        try {            this.fis = new FileInputStream(filePath);            this.channel = this.fis.getChannel();            buffer = ByteBuffer.allocate(1024*100);        } catch (FileNotFoundException e) {            e.printStackTrace();        }    }                                                                                                                                                                                                                                                                                                    ByteBuffer readInto(){        try {            buffer.clear();            int i = channel.read(buffer);            buffer.flip();            if(i<0){                return null;            }        } catch (IOException e) {            e.printStackTrace();        }        return buffer;    }}

 

将channel注册write事件后在return的selectionKey上attach一个实例。
然后在write事件中获取attachment进行读写:

public void run() {    System.out.println("server start...");    ServerSocketChannel serverChannel;    try {        serverChannel = ServerSocketChannel.open();        serverChannel.bind(new InetSocketAddress(8989));        serverChannel.configureBlocking(false);        Selector sel = Selector.open();        serverChannel.register(sel, SelectionKey.OP_ACCEPT);        ByteBuffer buffer = ByteBuffer.allocate(100*1024);        CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();        while(true){            sel.select();            Iterator<SelectionKey> selKeyItr = sel.selectedKeys().iterator();            while(selKeyItr.hasNext()){                SelectionKey key = selKeyItr.next();                selKeyItr.remove();                if(key.isAcceptable()){                    System.out.println("server acceptable");                    SocketChannel channel = ((ServerSocketChannel)key.channel()).accept();                    channel.configureBlocking(false);                    channel.register(sel, SelectionKey.OP_READ);                }else if(key.isReadable()){                    System.out.println("server readable"+Thread.currentThread().getName());                    SocketChannel channel = (SocketChannel) key.channel();                    if(channel.read(buffer)>0){                        buffer.flip();                        CharBuffer clientBuffer = decoder.decode(buffer);                        System.out.println("from client::"+clientBuffer.toString());                        buffer.clear();                    }                    channel.register(sel, SelectionKey.OP_WRITE).attach(new ChannelResolver("D:/doc_backup.rar"));                }else if(key.isWritable()){                    SocketChannel channel =(SocketChannel) key.channel();                    if(key.attachment()!=null){                        ChannelResolver resolver = (ChannelResolver)key.attachment();                        buffer = resolver.readInto();                        if(buffer!=null){                            channel.write(buffer);                        }                    }                }            }        }    } catch (IOException e) {        e.printStackTrace();    }}

 

在handler的readInto()中已经进行了flip(),在这里就不用再flip()了。
相应地,client的读取也要改一下:

public void run() {    try {        System.out.println("client...");        SocketAddress addr = new InetSocketAddress(8989);        SocketChannel client = SocketChannel.open();        client.configureBlocking(false);        Selector sel = Selector.open();        client.register(sel, SelectionKey.OP_CONNECT);        CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();        ByteBuffer buffer= ByteBuffer.allocate(1024*500);        client.connect(addr);        while (true) {            sel.select();            Iterator<SelectionKey> selKeyItr = sel.selectedKeys().iterator();            while (selKeyItr.hasNext()) {                SelectionKey key = selKeyItr.next();                selKeyItr.remove();                if (key.isConnectable()) {                    System.out.println("client connectble");                    SocketChannel channel = (SocketChannel) key.channel();                    channel.configureBlocking(false);                    channel.finishConnect();                    channel.write(encoder.encode(CharBuffer.wrap("start download")));                    channel.register(sel, SelectionKey.OP_READ);                } else if (key.isReadable()) {                    SocketChannel channel = (SocketChannel) key.channel();                    if(channel.read(buffer)>0){                        buffer.flip();                        fc.write(buffer);                        buffer.clear();                    }else{                        channel.close();                    }                }            }        }    } catch (IOException e) {        e.printStackTrace();    }}

 

 

下面引用书本上的一段话:
[是基于事件驱动思想的,实现上通常采用Reactor模式,从程序角度而言,当发起IO的读写操作时,是非阻塞的;当socket有流可读或可写入socket时,操作系统会相应地通知应用程序进行处理,应用再将流读取到缓冲区或写入操作系统。对于网络IO而言,主要有连接建立、流读取和流写入三种事件。
AIO同样基于事件驱动思想,实现上通常采用Proactor模式。从程序角度而言,和NIO不同,当进行读写操作时,只须直接调用API的read或write方法即可。这两种方法均为异步的,对于读操作而言,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,并通知应用程序;对于写操作而言,当操作系统将write方法传递的流写入完毕时,操作系统主动通知应用程序。
较之NIO而言,AIO一方面简化了程序的编写,流的读取和写入都由操作系统来代替完成;另一方面省去了NIO中程序要遍历事件通知队列(selector)的代价。windows基于iocp、Linux基于epoll。]

Java - 网络I/O的阻塞