首页 > 代码库 > 说说nio

说说nio

既然说到了nio,就得谈以下几个问题
为什么会出现新io,"旧io"有什么问题吗?
ok,一步一步来,先给大家看几个例子:

1单线程的服务器程序

import java.net.*;
import java.io.*;

public class SocketServiceTest
{
    public static void main(String[] args) throws Exception
    {
        ServerSocket serverSocket = new ServerSocket(10002);
        Socket socket = null;
        try
        {
            while (true)
            {
                socket = serverSocket.accept();
                System.out.println("socket连接:" + socket.getRemoteSocketAddress().toString());
                BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                while(true)
                {
                    String readLine = in.readLine();
                    System.out.println("收到消息" + readLine);
                    if("end".equals(readLine))
                        break;
                }
                
            }
        }
        catch (SocketException se)
        {
            System.out.println("客户端断开连接");
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
        finally
        {
            System.out.println("socket关闭:" + socket.getRemoteSocketAddress().toString());
            socket.close();
        }
    }
}


我们运行Socket Test这个软件来测试一下
结果如下:

(为什么jfsdkof 没有显示出来? 自己想)

(有个小问题,程序没办法退出,有几种方法 可以在break那里主动抛出一个异常,或者把break改成return,又有个小问题先执行return还是finally?这个大家自己百度,我就不多说了)
除了上面的问题,还有一个,如果我同时开两个test软件,结果会是这样:在我第一个程序敲打end之前,第二个程序发的所有消息都没有反应,但第一个程序一旦发送end,第二个程序之前发的命令会都显示出来(break与return在这样是不一样的,哪里有区别,大家自己试)
换句话说,上面的代码,只能同时运行一个客户端!

2多线程的服务程序

import java.net.*;
import java.io.*;
import java.util.Scanner;

 public class MultithreadJIoSocketTest
{
    public static void main (String[] args) throws Exception
    {
        ServerSocket serverSocket = new ServerSocket(10002);
        Thread thread = new Thread(new Accptor(serverSocket));
        thread.start();
    }
}

 import java.io.*;
import java.net.*;

 public class Accptor implements Runnable
    {
        private ServerSocket serverSocket;
        
        public Accptor(ServerSocket serverSocket)
        {
            this.serverSocket = serverSocket;
        }

        public void run()
        {
            while (true)
            {
                Socket socket = null;
                try
                {
                    socket = serverSocket.accept();
                    if(socket != null)
                    {
                        System.out.println("收到了socket:" + socket.getRemoteSocketAddress().toString());
                        Thread thread = new Thread(new Processor(socket));
                        thread.start();
                    }
                }
                catch (IOException e)
                {
                    e.printStackTrace();
                }
            }
        }
    }


import java.io.*;
import java.net.*;

 public class Processor implements Runnable
    {
        private Socket socket;
        
        public Processor(Socket socket)
        {
            this.socket = socket;
        }
        
        
        public void run()
        {
            try
            {
                BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String readLine;
                while(true)
                {
                    readLine = in.readLine();
                    System.out.println("收到消息" + readLine);
                    if("end".equals(readLine))
                    {
                        break;
                    }
                    //客户端断开连接
                    Thread.sleep(5000);
                }
            }
            catch (InterruptedException e)
            {
               e.printStackTrace();
            }
            catch (SocketException se)
            {
                System.out.println("客户端断开连接");
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
            finally {
                try
                {
                    socket.close();
                }
                catch (IOException e)
                {
                    e.printStackTrace();
                }
            }
        }
        
    }


这里启用了线程的概念,都很简单,如果大家哪里还是不清楚,可以看看<<谈谈java中的线程(初级概念) >>就在我写的博客里面,很简单的一些基础知识#
显示如下

这里似乎解决了不能同时访问的问题,但是技术的进步总是这样,你解决了一个问题,马上就会出现新的三个问题
新出现的问题有
1 现在的http请求支持长连接,如果同时又10000个人在线,服务端就启动10000个线程吗? 如果是10万人呢?
2 就算不考虑第一个问题,如果多个线程涉及到对同一个文件的读写,怎么保证一致性?
3 如果我想提升某些用户的优先级,怎么办?
另外还有一个问题,在上面的例子中并没有体现出来,就是效率!我们期待一种新的io方式,来提升速度#
至少上面的三个问题都说明,我们需要一种新的io方式!

nio

nio类图如下



这里面多了几个类,Channel,Selector,Buffer;
我们可以这样理解,Channel就是在装载乘客的交通工具(它的具体形式,FileChannel,ServerSocketChannel就是公共汽车或者火车或者飞机)
Selector就是交通管理系统,它负责管理车辆的当前运行状态,是已经出站,还是在路上等等#
Buffer可以理解为交通工具上的座位#

这里对他们最简单的使用,我举think in java上的一个例子
package io;
//: io/GetChannel.java
// Getting channels from streams
import java.nio.*;
import java.nio.channels.*;
import java.io.*;

public class GetChannel {
  private static final int BSIZE = 1024;
  public static void main(String[] args) throws Exception {
    // Write a file:
    FileChannel fc =
      new FileOutputStream("data.txt").getChannel();
    fc.write(ByteBuffer.wrap("Some text ".getBytes()));
    fc.close();
    // Add to the end of the file:
    fc =
      new RandomAccessFile("data.txt", "rw").getChannel();
    fc.position(fc.size()); // Move to the end
    fc.write(ByteBuffer.wrap("Some more".getBytes()));
    fc.close();
    // Read the file:
    fc = new FileInputStream("data.txt").getChannel();
    ByteBuffer buff = ByteBuffer.allocate(BSIZE);
    fc.read(buff);
    buff.flip();
    while(buff.hasRemaining())
      System.out.print((char)buff.get());
  }
}
/* Output:
Some text Some more
*///:~


另外我想声明一下,在eclipse里面, FileChannel fc =new FileOutputStream("data.txt").getChannel()这个data.txt就应该在项目的根路径下,不再src或bin里面!!
关于nio转换数据,获取基本类型,试图缓冲器,内存映射文件的概念大家可以查阅think in java,这里只说最简单的概念#
在上文我们还提到了关于多个线程,加载同一个问题的问题#现在我们说说nio里面锁的概念
不多说了,先看一段代码
package io;

import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.concurrent.*;
import java.io.*;

public class FileLocking {
  public static void main(String[] args) throws Exception {
    FileOutputStream fos= new FileOutputStream("file.txt");
    FileLock fl = fos.getChannel().tryLock();
    if(fl != null) {
      System.out.println("Locked File");
      FileChannel fc =fos.getChannel();
      fc.write(ByteBuffer.wrap("Some textssss".getBytes()));
      
      TimeUnit.MILLISECONDS.sleep(1000);
      fl.release();
      System.out.println("Released Lock");
    }
    fos.close();
  }
} /* Output:
Locked File
Released Lock
*///:~

这里我想说说关于锁的获取,有两种方式,
调用FileChannel的lock方法或tryLock方法;
tryLock是非阻塞的,如果对于的文件已经被加锁,他就直接返回
lock是阻塞式的,它会一直阻塞直到锁可以获得;
另外锁包含独占锁与共享锁,具体信息可查看其它资料#
现在看一个复杂些的例子
package io;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;


/**
 * 测试NIO中的文件锁:三个线程争抢文件锁,获得锁后向文件中写数据,然后再释放文件锁。
 *
 * @author aofeng <a href="mailto:aofengblog@163.com>aofengblog@163.com</a>
 */
public class LockTest implements Runnable {
    public void run() {
        Thread curr = Thread.currentThread();
        System.out.println("Current executing thread is " + curr.getName());

        URL url = LockTest.class.getClassLoader().getResource("file.txt");
        //路径问题
        //http://www.cnblogs.com/rongxh7/archive/2010/04/22/1718178.html
        //
        RandomAccessFile raf = null;
        FileChannel fc = null;
        FileLock lock = null;
        try {
                        //就是这里的路径问题, 为什么要替换%20 去上面的资料里看
            raf = new RandomAccessFile(url.getPath().replaceAll("%20"," "), "rw");
            fc = raf.getChannel();
            System.out.println(curr.getName() + " ready");

            // 轮流获得文件独占锁。
            while (true) {
                try {
                    lock = fc.lock();
                    break;
                } catch (OverlappingFileLockException e) {
                    Thread.sleep(1 * 1000);
                }
            }

            if (null != lock) {
                System.out.println(curr.getName() + " get filelock success");
                fc.position(fc.size());
                fc.write(ByteBuffer.wrap((curr.getName() + " write data. ")
                        .getBytes()));
            } else {
                System.out.println(curr.getName() + " get filelock fail");
            }
            fc.close();
            raf.close();
            } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 注意:要先释放锁,再关闭通道。
            if (null != lock && lock.isValid()) {
                try {
                    lock.release();
                    System.out.println(curr.getName() + " release filelock");
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

    
            
            
        }
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        Thread t1 = new Thread(new LockTest());
        t1.setName("t1");
        Thread t2 = new Thread(new LockTest());
        t2.setName("t2");
        Thread t3 = new Thread(new LockTest());
        t3.setName("t3");

        t1.start();
        t2.start();
        t3.start();
    }
}


结果如下
Current executing thread is t1
Current executing thread is t2
Current executing thread is t3
t3 ready
t3 get filelock success
t1 ready
t1 get filelock success
t2 ready
t2 get filelock success
注意这是三个线程争用对文档的读写权利,因此读写的顺序,每次运行的结果不一定一样#

ok现在我们看看在网络中,nio是在怎么运作的



 非阻塞 IO 的支持可以算是 NIO API 中最重要的功能,非阻塞 IO 允许应用程序同时监控多个 channel 以提高性能,这一功能是通过 Selector , SelectableChannel 和SelectionKey 这 3 个类来实现的。
SelectableChannel 代表了可以支持非阻塞 IO 操作的 channel ,可以将其注册在 Selector 上,这种注册的关系由 SelectionKey 这个类来表现(见 UML 图)。 在Selector中可通过selectedKeys方法获得key集合
Selector 这个类通过 select() 函数,给应用程序提供了一个可以同时监控多个 IO channel 的方法:

 

应用程序通过调用 select() 函数,让 Selector 监控注册在其上的多个 SelectableChannel ,当有 channel 的 IO 操作可以进行时, select() 方法就会返回以让应用程序检查 channel 的状态,并作相应的处理。
public static void acceptConnections( int port) throws Exception {

      // 打开一个 Selector
      Selector acceptSelector = SelectorProvider.provider().openSelector();

      // 创建一个 ServerSocketChannel ,这是一个 SelectableChannel 的子类
      ServerSocketChannel ssc = ServerSocketChannel.open();

      // 将其设为 non-blocking 状态,这样才能进行非阻塞 IO 操作
      ssc.configureBlocking( false );

      // 给 ServerSocketChannel 对应的 socket 绑定 IP 和端口
      InetAddress lh = InetAddress.getLocalHost();
      InetSocketAddress isa = new InetSocketAddress(lh, port);
      ssc.socket().bind(isa);

      // 将 ServerSocketChannel 注册到 Selector 上,返回对应的 SelectionKey
      ssc.register(acceptSelector, SelectionKey.OP_ACCEPT);

      int keysAdded = 0;
      // 用 select() 函数来监控注册在 Selector 上的 SelectableChannel
      // 返回值代表了有多少 channel 可以进行 IO 操作 (ready for IO)

      while ((keysAdded = acceptSelector.select()) > 0) {
          // selectedKeys() 返回一个 SelectionKey 的集合,
          // 其中每个 SelectionKey 代表了一个可以进行 IO 操作的 channel 。
          // 一个 ServerSocketChannel 可以进行 IO 操作意味着有新的 TCP 连接连入了
          Set<SelectionKey> readyKeys = acceptSelector.selectedKeys();
          Iterator<SelectionKey> i = readyKeys.iterator();

          while (i.hasNext()) {
             SelectionKey sk = (SelectionKey) i.next();
             // 需要将处理过的 key 从 selectedKeys 这个集合中删除

             i.remove();
             // 从 SelectionKey 得到对应的 channel
             ServerSocketChannel nextReady =(ServerSocketChannel) sk.channel();
             // 接受新的 TCP 连接

             Socket s = nextReady.accept().socket();
             // 把当前的时间写到这个新的 TCP 连接中

             PrintWriter out =new PrintWriter(s.getOutputStream(), true );

             Date now = new Date();
             out.println(now);
             // 关闭连接
             out.close();
          }

      }

   }



参考资料

http://www.cnblogs.com/zhuYears/archive/2012/09/28/2690194.html
http://aofengblog.blog.163.com/blog/static/631702120089276182626/
Think in java 第四版 第18章i/o系统
http://www.blogjava.net/19851985lili/articles/93524.html
深入分析java web内幕 许令波 第二章 深入分析java/io的工作机制

说说nio