首页 > 代码库 > JAVA基础知识之网络编程——-基于NIO的非阻塞Socket通信

JAVA基础知识之网络编程——-基于NIO的非阻塞Socket通信

阻塞IO与非阻塞IO

通常情况下的Socket都是阻塞式的, 程序的输入输出都会让当前线程进入阻塞状态, 因此服务器需要为每一个客户端都创建一个线程。

从JAVA1.4开始引入了NIO API, NIO可以实现非阻塞IO, 这样就可以使用一个线程处理所有的客户请求。

基于NIO的非阻塞Socket通信

服务器将用来监听客户端请求的channel注册到selector上,启动一个线程,使用selector的select()获取求情的客户端的channel数量,

当监听到有客户端请求时,就通过SelectionKey返回对应的客户端channel进行通信。

下面是一个非常简单的例子,

服务器端

 1 package niochat;
 2 
 3 import java.io.IOException;
 4 import java.net.InetSocketAddress;
 5 import java.nio.ByteBuffer;
 6 import java.nio.channels.Channel;
 7 import java.nio.channels.SelectionKey;
 8 import java.nio.channels.Selector;
 9 import java.nio.channels.ServerSocketChannel;
10 import java.nio.channels.SocketChannel;
11 import java.nio.charset.Charset;
12 
13 public class NServer {
14     //用于检查所有channel状态的selector
15     private Selector selector = null;
16     static final int PORT = 3001;
17     private Charset charset = Charset.forName("utf-8");
18     public void init() throws IOException, InterruptedException {
19         selector = Selector.open();
20         //通过open方法打开一个未绑定的ServerSocketChannel实例
21         ServerSocketChannel server = ServerSocketChannel.open();
22         InetSocketAddress isa = new InetSocketAddress("127.0.0.1", PORT);
23         server.bind(isa);
24         //设置非阻塞
25         server.configureBlocking(false);
26         //注册ServerSocketChannel到selector
27         server.register(selector, SelectionKey.OP_ACCEPT);
28         //select()返回一个key集合的元素个数,集合元素对应的channel正请求IO操作
29         int num = 0;
30         while ((num = selector.select()) > 0) {
31 
32             //每次都返回1
33             System.out.println("当前selector.select()="+num);
34             Thread.sleep(1000);
35             //被选中的key集合selectedKeys表示需要进行IO处理的channel集合
36             for (SelectionKey sk : selector.selectedKeys()) {
37                 //从selector上的已选择的key集合中删除正在处理的key
38                 selector.selectedKeys().remove(sk);
39                 //如果sk对应channel包含客户端连接请求
40                 if (sk.isAcceptable()) {
41                     //调用accept方法接受请求,产生一个服务器端的SocketChannel
42                     //在非阻塞模式下,如果没有连接则直接返回null
43                     SocketChannel sc = server.accept();
44                     //设置非阻塞模式
45                     sc.configureBlocking(false);
46                     //将SocketChannel也注册到selector
47                     sc.register(selector, SelectionKey.OP_READ);
48                     //再将sk对应的channel设置为请求准备接受其他请求
49                     sk.interestOps(SelectionKey.OP_ACCEPT);
50                 }
51 
52                 //如果sk对应的channel有数据需要读取
53                 if (sk.isReadable()) {
54                     //获取sk对应的channel
55                     SocketChannel sc = (SocketChannel)sk.channel();
56                     //channel中的数据必须先写入buffer中,然后才能写入进content中
57                     ByteBuffer buff = ByteBuffer.allocate(1024);
58                     String content = "";
59                     try {
60                         while(sc.read(buff) > 0) {
61                             //buffer 复位
62                             buff.flip();
63                             content += charset.decode(buff);
64                         }
65                         System.out.println("读取的数据:"+ content);
66                         //sk复位
67                         sk.interestOps(SelectionKey.OP_READ);
68                     } //遇到channel有异常说明客户端有异常,取消注册此sk
69                     catch (IOException ex) {
70                         //从已选择key集合中取消sk,下一次select()时此channel将自动被删除 
71                         ex.printStackTrace();
72                         
73                         sk.cancel();
74                         if(sk.channel() != null) {
75                             sk.channel().close();
76                         }
77                         
78                     }
79                     
80                     if (content.length() > 0) {
81                         //广播
82                         for(SelectionKey key : selector.keys()) {
83                             Channel targetChannel = key.channel();
84                             if (targetChannel instanceof SocketChannel) {
85                                 SocketChannel dest = (SocketChannel)targetChannel;
86                                 dest.write(charset.encode(content));
87                             }
88                         }
89                     }
90                 } 
91             
92             }
93         }
94     }
95     
96     public static void main(String[] args) throws IOException, InterruptedException {
97         new NServer().init();
98     }
99 }

客户端

 1 package niochat;
 2 
 3 import java.io.IOException;
 4 import java.net.InetSocketAddress;
 5 import java.nio.ByteBuffer;
 6 import java.nio.channels.Channel;
 7 import java.nio.channels.SelectionKey;
 8 import java.nio.channels.Selector;
 9 import java.nio.channels.SocketChannel;
10 import java.nio.charset.Charset;
11 import java.util.Scanner;
12 
13 public class NClient {
14     //定义检测SocketChannel的selector
15     private Selector selector = null;
16     private static final int PORT = 3001;
17     private Charset charset = Charset.forName("utf-8");
18     //客户端的SocketChannel
19     private SocketChannel sc = null;
20     public void init() throws IOException {
21          selector = Selector.open();
22          InetSocketAddress isa = new InetSocketAddress("127.0.0.1", PORT);
23          //调用open静态方法创建连接到指定主机的SocketChannel
24          sc = SocketChannel.open(isa);
25          sc.configureBlocking(false);
26          sc.register(selector, SelectionKey.OP_READ);
27          //创建子线程读取服务器返回的数据
28          new ClientThread().start();
29          //键盘输入流
30          Scanner scan = new Scanner(System.in);
31          while (scan.hasNextLine())    {
32             String line = scan.nextLine();
33             //放进SocketChannel
34             sc.write(charset.encode(line));
35          }
36     }
37     
38     private class ClientThread extends Thread {
39         public void run() {
40             try {
41                 while (selector.select() > 0) {
42                     //被选中的key集合selectedKeys表示需要进行IO处理的channel集合
43                     for (SelectionKey sk : selector.selectedKeys()) {
44                         //删除正在处理的key
45                         selector.selectedKeys().remove(sk);
46                         if (sk.isReadable()) {    
47                             SocketChannel sc = (SocketChannel)sk.channel();
48                             ByteBuffer buff = ByteBuffer.allocate(1024);
49                             String content = "";
50                             while (sc.read(buff) > 0) {
51                                 sc.read(buff);
52                                 buff.flip();
53                                 content += charset.decode(buff);
54                             }
55                             System.out.println("聊天信息:"+content);
56                             sk.interestOps(SelectionKey.OP_READ);
57                         }
58                     }
59                 }
60             } catch (IOException e) {
61                 e.printStackTrace();
62             }
63         }
64     }
65     
66     public static void main(String[] args) throws IOException {
67         new NClient().init();
68     }
69 }

执行结果,启动了一个服务器端,然后启动了两个客户端,

技术分享

 

JAVA基础知识之网络编程——-基于NIO的非阻塞Socket通信