首页 > 代码库 > java的nio之:java的nio的服务器实现模型

java的nio之:java的nio的服务器实现模型

【nio服务端序列图】

技术分享

一:nio服务器启动类

技术分享
 1 package com.yeepay.sxf.testnio; 2 /** 3  * nio创建的的timerServer服务器 4  *  5  * @author sxf 6  * 7  */ 8 public class NIOTimerServer { 9     10     /**11      * nio服务器启动的入口12      * @param args13      */14     public static void main(String[] args) {15         //启动服务器绑定的端口号16         int port=8000;17         //获取端口号18         if(args!=null && args.length>0){19             try {20                 port=Integer.valueOf(args[0]);21             } catch (Exception e) {22                 e.printStackTrace();23             }24         }25         26         //新建nio服务器类27         MultiplexerTimerServer timerServer=new MultiplexerTimerServer(port);28         29         //启动服务类的主线程30         new Thread(timerServer,"NIO-MultiplexerTimerServer-001").start();31     }32 }
View Code

二:nio服务器

技术分享
  1 package com.yeepay.sxf.testnio;  2   3 import java.io.BufferedReader;  4 import java.io.IOException;  5 import java.net.InetSocketAddress;  6 import java.nio.ByteBuffer;  7 import java.nio.channels.SelectionKey;  8 import java.nio.channels.Selector;  9 import java.nio.channels.ServerSocketChannel; 10 import java.nio.channels.SocketChannel; 11 import java.util.Date; 12 import java.util.Iterator; 13 import java.util.Set; 14  15 import com.sun.org.apache.xml.internal.utils.StopParseException; 16  17 /** 18  * nio的时间服务器 19  * @author sxf 20  * 21  */ 22 public class MultiplexerTimerServer implements Runnable { 23      24     //选择器 25     private Selector selector; 26  27     // 28     private ServerSocketChannel serverSocketChannel; 29      30     private volatile boolean stop; 31      32     //启动服务 33     public MultiplexerTimerServer(int port){ 34         try { 35             //初始化多路复用器 36             selector=Selector.open(); 37             //初始化socket通道 38             serverSocketChannel=ServerSocketChannel.open(); 39             //设置通道为非阻塞模式 40             serverSocketChannel.configureBlocking(false); 41             //将该通道绑定地址和端口号 42             serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024); 43             //将该通道注册到多路复用器,并注册链接请求事件 44             serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); 45             System.out.println("The time server is start in port:"+port); 46         } catch (Exception e) { 47             // TODO: handle exception 48             e.printStackTrace(); 49             System.exit(1); 50         } 51     } 52      53     /** 54      * 停止服务器 55      */ 56     public void stop(){ 57         this.stop=true; 58     } 59      60      61     /** 62      * 服务器运行主体 63      */ 64     @Override 65     public void run() { 66         while(!stop){ 67             try { 68                 System.out.println("MultiplexerTimerServer.run()"); 69                 //select()阻塞到至少有一个通道在你注册的事件上就绪了。 70                 selector.select(); 71                 //获取注册在这个多路复用器上的已经就绪的通道的集合 72                 Set<SelectionKey> selectionKeys=selector.selectedKeys(); 73                 //循环迭代已经就绪的通道集合 74                 Iterator<SelectionKey> it=selectionKeys.iterator(); 75                 SelectionKey key=null; 76                 while(it.hasNext()){ 77                     key=it.next(); 78                     //防止重复执行通道事件 79                     it.remove(); 80                     //处理该通道上的事件 81                     try { 82                         handleInput(key); 83                     } catch (Exception e) { 84                         if(key!=null){ 85                             key.cancel(); 86                             if(key.channel()!=null){ 87                                 key.channel().close(); 88                             } 89                         } 90                     } 91                 } 92                  93             } catch (Exception e) { 94                 e.printStackTrace(); 95             } 96          97              98         } 99     }100 101     102     /**103      * 处理请求的事件104      * @param key105      * @throws IOException106      */107     private void handleInput(SelectionKey key) throws IOException{108         if(key.isValid()){109             //处理新接入的请求消息110             if(key.isAcceptable()){111                 //请求链接事件就绪112                 ServerSocketChannel ssc=(ServerSocketChannel) key.channel();113                 SocketChannel  sc=ssc.accept();114                 sc.configureBlocking(false);115                 //在多路复用器上注册一个soketChannel,当有读事件则触发116                 sc.register(selector, SelectionKey.OP_READ);117             }118             119             if(key.isReadable()){120                 //读事件就绪121                 SocketChannel sc=(SocketChannel) key.channel();122                 //声明一个缓冲区123                 ByteBuffer readBuffer=ByteBuffer.allocate(1024);124                 //从通道里读取数据写入缓冲区125                 int readBytes=sc.read(readBuffer);126                 //readBytes>0:表示读到了字节,对字节进行编解码。127                 //readBytes=0:没有读取到字节,属于正常场景,忽略128                 //readBytes=-1;链路已经关闭,需要关闭socketChannel,释放资源129                 if(readBytes>0){130                     //将ByteBuffer的limit设置为position,position设置为0131                     readBuffer.flip();132                     //编解码数据133                     byte[] bytes=new byte[readBuffer.remaining()];134                     //将数据从缓冲区复制到数组里135                     readBuffer.get(bytes);136                     //翻译请求的内容137                     String body=new String(bytes,"UTF-8");138                     //打印请求的内容139                     System.out.println("the timerserver receive order:"+body);140                 141                     //处理请求内容142                     String currentTime=null;143                     if("shangxiaofei".equals(body)){144                         currentTime=new Date().toString();145                     }else{146                         currentTime="request param is error";147                     }148                     149                     //将处理的结果响应给客户端150                     doWrite(sc, currentTime);151                 }else if(readBytes<0){152                     //对链路进行关闭153                     key.cancel();154                     sc.close();155                 }else{156                     //忽略157                 }158             }159         }160     }161     162     /**163      * 响应请求的内容164      * @param channel165      * @param response166      * @throws IOException 167      */168     private void doWrite(SocketChannel channel,String response) throws IOException{169         if(response!=null&&response.trim().length()>0){170             //将响应的内容转化成byte[]171             byte[] bytes=response.getBytes();172             //声明缓冲区173             ByteBuffer writeBuffer=ByteBuffer.allocate(bytes.length);174             //将数据写入缓冲区175             writeBuffer.put(bytes);176             //修改ByteBuffer的imit设置为position,position设置为0177             writeBuffer.flip();178             //将数据从缓冲区写入通道179             channel.write(writeBuffer);180         }181     }182     183     184 }
View Code

【nio客户端序列图】

技术分享

 

三:nio服务器客户端启动类

技术分享
 1 package com.yeepay.sxf.testnio; 2  3  4 /** 5  * 向TimerServer发送请求的客户端 6  * @author sxf 7  * 8  */ 9 public class NIOTimerClient {10     11     public static void main(String[] args) {12         int port=8000;13         14         if(args!=null&&args.length>0){15             port=Integer.valueOf(args[0]);16         }17         new Thread(new TimerClientHandler("127.0.0.1", port),"TimeClient-001").start();18     }19 }
View Code

四:nio服务器的客户端

技术分享
  1 package com.yeepay.sxf.testnio;  2   3 import java.io.IOException;  4 import java.net.InetSocketAddress;  5 import java.nio.ByteBuffer;  6 import java.nio.channels.SelectionKey;  7 import java.nio.channels.Selector;  8 import java.nio.channels.SocketChannel;  9 import java.util.Iterator; 10 import java.util.Set; 11  12 /** 13  * timerclient请求线程 14  * @author sxf 15  * 16  */ 17 public class TimerClientHandler implements Runnable{ 18     //链接timer服务器的ip地址 19     private String host; 20     //链接timer服务器服务的端口号 21     private int port; 22     //多路复用器 23     private Selector selector; 24     //通道 25     private SocketChannel socketChannel; 26     //当前请求线程是否停止 27     private volatile boolean stop; 28      29      30     public TimerClientHandler(String host,int port) { 31         this.host=host==null?"127.0.0.1":host; 32         this.port=port; 33         try { 34             this.selector=Selector.open(); 35             this.socketChannel=SocketChannel.open(); 36             socketChannel.configureBlocking(false); 37         } catch (Exception e) { 38             e.printStackTrace(); 39             System.exit(1); 40         } 41     } 42      43     /** 44      * 链接时间服务器 45      * @throws IOException  46      */ 47     private void doConnect() throws IOException{ 48         if(socketChannel.connect(new InetSocketAddress(host, port))){ 49             socketChannel.register(selector, SelectionKey.OP_READ); 50             //doWrite(socketChannel); 51         }else{ 52             socketChannel.register(selector, SelectionKey.OP_CONNECT); 53         } 54     } 55      56     /** 57      * 向时间服务器发送请求 58      * @param sc 59      * @throws IOException  60      */ 61     private void doWrite(SocketChannel sc) throws IOException{ 62         //发送请求的请求内容 63         byte[] req="shangxiaofei".getBytes(); 64         //声明缓冲区 65         ByteBuffer writeBuffer=ByteBuffer.allocate(req.length); 66         //将请求体写入缓冲区 67         writeBuffer.put(req); 68         //设置limit 69         writeBuffer.flip(); 70         //将缓冲区的内容写入通道 71         sc.write(writeBuffer); 72         if(!writeBuffer.hasRemaining()){ 73             System.out.println("send order to server success........"); 74         } 75          76     } 77      78      79     private void handleInput(SelectionKey key) throws IOException{ 80         if(key.isValid()){ 81             //判断链接是否成功 82             SocketChannel sc=(SocketChannel) key.channel(); 83          84                 //链接事件就绪 85                 if(sc.finishConnect()){ 86                     //是否链接完成 87                     sc.register(selector, SelectionKey.OP_READ); 88                     doWrite(sc); 89                 }else{ 90                     //链接失败,进程退出 91                     System.exit(1); 92                 } 93                  94                 if(key.isReadable()){ 95                     //读事件就绪 96                     ByteBuffer readBuffer=ByteBuffer.allocate(1024); 97                     int readBytes=sc.read(readBuffer); 98                     if(readBytes>0){ 99                         readBuffer.flip();100                         byte[] bytes=new byte[readBuffer.remaining()];101                         readBuffer.get(bytes);102                         String body=new String(bytes,"UTF-8");103                         System.out.println("TimerServer response:"+body);104                         this.stop=true;105                     }else if(readBytes<0){106                         //对端链路关闭107                         key.cancel();108                         sc.close();109                     }else{110                         //读到0字节,忽略111                     }112                 }113             114         }115     }116     117     @Override118     public void run() {119         try {120             //链接并发送请求121             doConnect();122         } catch (Exception e) {123             // TODO: handle exception124             e.printStackTrace();125         }126         127         while(!stop){128             try {129                 //等待响应130                 selector.select();131                 //获取已经就绪的通道事件集合,在这个多路复用器上132                 Set<SelectionKey> selectedKeys=selector.selectedKeys();133                 //循环迭代处理事件集合134                 Iterator<SelectionKey> it=selectedKeys.iterator();135                 SelectionKey key=null;136                 while (it.hasNext()) {137                     key=it.next();138                     it.remove();139                     try {140                         handleInput(key);141                     } catch (Exception e) {142                         e.printStackTrace();143                     }144                     145                 }146             } catch (Exception e) {147                 e.printStackTrace();148             }149         }150         151         //多路复用器关闭后,所有注册在上面的channel和Pipe等资源都会被自动去注册并关闭152         //所以不需要重复释放资源153 //        if(selector!=null){154 //            try {155 //                selector.close();156 //            } catch (Exception e) {157 //                e.printStackTrace();158 //            }159 //        }160         161     }162     163 164 }
View Code

 

java的nio之:java的nio的服务器实现模型