首页 > 代码库 > java的nio之:java的nio的服务器实现模型
java的nio之:java的nio的服务器实现模型
【nio服务端序列图】
一:nio服务器启动类
1 package com.yeepay.sxf.testnio; 2 /** 3 * nio创建的的timerServer服务器 4 * 5 * @author sxf 6 * 7 */ 8 public class NIOTimerServer { 9 10 /**11 * nio服务器启动的入口12 * @param args13 */14 public static void main(String[] args) {15 //启动服务器绑定的端口号16 int port=8000;17 //获取端口号18 if(args!=null && args.length>0){19 try {20 port=Integer.valueOf(args[0]);21 } catch (Exception e) {22 e.printStackTrace();23 }24 }25 26 //新建nio服务器类27 MultiplexerTimerServer timerServer=new MultiplexerTimerServer(port);28 29 //启动服务类的主线程30 new Thread(timerServer,"NIO-MultiplexerTimerServer-001").start();31 }32 }
二:nio服务器
1 package com.yeepay.sxf.testnio; 2 3 import java.io.BufferedReader; 4 import java.io.IOException; 5 import java.net.InetSocketAddress; 6 import java.nio.ByteBuffer; 7 import java.nio.channels.SelectionKey; 8 import java.nio.channels.Selector; 9 import java.nio.channels.ServerSocketChannel; 10 import java.nio.channels.SocketChannel; 11 import java.util.Date; 12 import java.util.Iterator; 13 import java.util.Set; 14 15 import com.sun.org.apache.xml.internal.utils.StopParseException; 16 17 /** 18 * nio的时间服务器 19 * @author sxf 20 * 21 */ 22 public class MultiplexerTimerServer implements Runnable { 23 24 //选择器 25 private Selector selector; 26 27 // 28 private ServerSocketChannel serverSocketChannel; 29 30 private volatile boolean stop; 31 32 //启动服务 33 public MultiplexerTimerServer(int port){ 34 try { 35 //初始化多路复用器 36 selector=Selector.open(); 37 //初始化socket通道 38 serverSocketChannel=ServerSocketChannel.open(); 39 //设置通道为非阻塞模式 40 serverSocketChannel.configureBlocking(false); 41 //将该通道绑定地址和端口号 42 serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024); 43 //将该通道注册到多路复用器,并注册链接请求事件 44 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); 45 System.out.println("The time server is start in port:"+port); 46 } catch (Exception e) { 47 // TODO: handle exception 48 e.printStackTrace(); 49 System.exit(1); 50 } 51 } 52 53 /** 54 * 停止服务器 55 */ 56 public void stop(){ 57 this.stop=true; 58 } 59 60 61 /** 62 * 服务器运行主体 63 */ 64 @Override 65 public void run() { 66 while(!stop){ 67 try { 68 System.out.println("MultiplexerTimerServer.run()"); 69 //select()阻塞到至少有一个通道在你注册的事件上就绪了。 70 selector.select(); 71 //获取注册在这个多路复用器上的已经就绪的通道的集合 72 Set<SelectionKey> selectionKeys=selector.selectedKeys(); 73 //循环迭代已经就绪的通道集合 74 Iterator<SelectionKey> it=selectionKeys.iterator(); 75 SelectionKey key=null; 76 while(it.hasNext()){ 77 key=it.next(); 78 //防止重复执行通道事件 79 it.remove(); 80 //处理该通道上的事件 81 try { 82 handleInput(key); 83 } catch (Exception e) { 84 if(key!=null){ 85 key.cancel(); 86 if(key.channel()!=null){ 87 key.channel().close(); 88 } 89 } 90 } 91 } 92 93 } catch (Exception e) { 94 e.printStackTrace(); 95 } 96 97 98 } 99 }100 101 102 /**103 * 处理请求的事件104 * @param key105 * @throws IOException106 */107 private void handleInput(SelectionKey key) throws IOException{108 if(key.isValid()){109 //处理新接入的请求消息110 if(key.isAcceptable()){111 //请求链接事件就绪112 ServerSocketChannel ssc=(ServerSocketChannel) key.channel();113 SocketChannel sc=ssc.accept();114 sc.configureBlocking(false);115 //在多路复用器上注册一个soketChannel,当有读事件则触发116 sc.register(selector, SelectionKey.OP_READ);117 }118 119 if(key.isReadable()){120 //读事件就绪121 SocketChannel sc=(SocketChannel) key.channel();122 //声明一个缓冲区123 ByteBuffer readBuffer=ByteBuffer.allocate(1024);124 //从通道里读取数据写入缓冲区125 int readBytes=sc.read(readBuffer);126 //readBytes>0:表示读到了字节,对字节进行编解码。127 //readBytes=0:没有读取到字节,属于正常场景,忽略128 //readBytes=-1;链路已经关闭,需要关闭socketChannel,释放资源129 if(readBytes>0){130 //将ByteBuffer的limit设置为position,position设置为0131 readBuffer.flip();132 //编解码数据133 byte[] bytes=new byte[readBuffer.remaining()];134 //将数据从缓冲区复制到数组里135 readBuffer.get(bytes);136 //翻译请求的内容137 String body=new String(bytes,"UTF-8");138 //打印请求的内容139 System.out.println("the timerserver receive order:"+body);140 141 //处理请求内容142 String currentTime=null;143 if("shangxiaofei".equals(body)){144 currentTime=new Date().toString();145 }else{146 currentTime="request param is error";147 }148 149 //将处理的结果响应给客户端150 doWrite(sc, currentTime);151 }else if(readBytes<0){152 //对链路进行关闭153 key.cancel();154 sc.close();155 }else{156 //忽略157 }158 }159 }160 }161 162 /**163 * 响应请求的内容164 * @param channel165 * @param response166 * @throws IOException 167 */168 private void doWrite(SocketChannel channel,String response) throws IOException{169 if(response!=null&&response.trim().length()>0){170 //将响应的内容转化成byte[]171 byte[] bytes=response.getBytes();172 //声明缓冲区173 ByteBuffer writeBuffer=ByteBuffer.allocate(bytes.length);174 //将数据写入缓冲区175 writeBuffer.put(bytes);176 //修改ByteBuffer的imit设置为position,position设置为0177 writeBuffer.flip();178 //将数据从缓冲区写入通道179 channel.write(writeBuffer);180 }181 }182 183 184 }
【nio客户端序列图】
三:nio服务器客户端启动类
1 package com.yeepay.sxf.testnio; 2 3 4 /** 5 * 向TimerServer发送请求的客户端 6 * @author sxf 7 * 8 */ 9 public class NIOTimerClient {10 11 public static void main(String[] args) {12 int port=8000;13 14 if(args!=null&&args.length>0){15 port=Integer.valueOf(args[0]);16 }17 new Thread(new TimerClientHandler("127.0.0.1", port),"TimeClient-001").start();18 }19 }
四:nio服务器的客户端
1 package com.yeepay.sxf.testnio; 2 3 import java.io.IOException; 4 import java.net.InetSocketAddress; 5 import java.nio.ByteBuffer; 6 import java.nio.channels.SelectionKey; 7 import java.nio.channels.Selector; 8 import java.nio.channels.SocketChannel; 9 import java.util.Iterator; 10 import java.util.Set; 11 12 /** 13 * timerclient请求线程 14 * @author sxf 15 * 16 */ 17 public class TimerClientHandler implements Runnable{ 18 //链接timer服务器的ip地址 19 private String host; 20 //链接timer服务器服务的端口号 21 private int port; 22 //多路复用器 23 private Selector selector; 24 //通道 25 private SocketChannel socketChannel; 26 //当前请求线程是否停止 27 private volatile boolean stop; 28 29 30 public TimerClientHandler(String host,int port) { 31 this.host=host==null?"127.0.0.1":host; 32 this.port=port; 33 try { 34 this.selector=Selector.open(); 35 this.socketChannel=SocketChannel.open(); 36 socketChannel.configureBlocking(false); 37 } catch (Exception e) { 38 e.printStackTrace(); 39 System.exit(1); 40 } 41 } 42 43 /** 44 * 链接时间服务器 45 * @throws IOException 46 */ 47 private void doConnect() throws IOException{ 48 if(socketChannel.connect(new InetSocketAddress(host, port))){ 49 socketChannel.register(selector, SelectionKey.OP_READ); 50 //doWrite(socketChannel); 51 }else{ 52 socketChannel.register(selector, SelectionKey.OP_CONNECT); 53 } 54 } 55 56 /** 57 * 向时间服务器发送请求 58 * @param sc 59 * @throws IOException 60 */ 61 private void doWrite(SocketChannel sc) throws IOException{ 62 //发送请求的请求内容 63 byte[] req="shangxiaofei".getBytes(); 64 //声明缓冲区 65 ByteBuffer writeBuffer=ByteBuffer.allocate(req.length); 66 //将请求体写入缓冲区 67 writeBuffer.put(req); 68 //设置limit 69 writeBuffer.flip(); 70 //将缓冲区的内容写入通道 71 sc.write(writeBuffer); 72 if(!writeBuffer.hasRemaining()){ 73 System.out.println("send order to server success........"); 74 } 75 76 } 77 78 79 private void handleInput(SelectionKey key) throws IOException{ 80 if(key.isValid()){ 81 //判断链接是否成功 82 SocketChannel sc=(SocketChannel) key.channel(); 83 84 //链接事件就绪 85 if(sc.finishConnect()){ 86 //是否链接完成 87 sc.register(selector, SelectionKey.OP_READ); 88 doWrite(sc); 89 }else{ 90 //链接失败,进程退出 91 System.exit(1); 92 } 93 94 if(key.isReadable()){ 95 //读事件就绪 96 ByteBuffer readBuffer=ByteBuffer.allocate(1024); 97 int readBytes=sc.read(readBuffer); 98 if(readBytes>0){ 99 readBuffer.flip();100 byte[] bytes=new byte[readBuffer.remaining()];101 readBuffer.get(bytes);102 String body=new String(bytes,"UTF-8");103 System.out.println("TimerServer response:"+body);104 this.stop=true;105 }else if(readBytes<0){106 //对端链路关闭107 key.cancel();108 sc.close();109 }else{110 //读到0字节,忽略111 }112 }113 114 }115 }116 117 @Override118 public void run() {119 try {120 //链接并发送请求121 doConnect();122 } catch (Exception e) {123 // TODO: handle exception124 e.printStackTrace();125 }126 127 while(!stop){128 try {129 //等待响应130 selector.select();131 //获取已经就绪的通道事件集合,在这个多路复用器上132 Set<SelectionKey> selectedKeys=selector.selectedKeys();133 //循环迭代处理事件集合134 Iterator<SelectionKey> it=selectedKeys.iterator();135 SelectionKey key=null;136 while (it.hasNext()) {137 key=it.next();138 it.remove();139 try {140 handleInput(key);141 } catch (Exception e) {142 e.printStackTrace();143 }144 145 }146 } catch (Exception e) {147 e.printStackTrace();148 }149 }150 151 //多路复用器关闭后,所有注册在上面的channel和Pipe等资源都会被自动去注册并关闭152 //所以不需要重复释放资源153 // if(selector!=null){154 // try {155 // selector.close();156 // } catch (Exception e) {157 // e.printStackTrace();158 // }159 // }160 161 }162 163 164 }
java的nio之:java的nio的服务器实现模型
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。