首页 > 代码库 > NIO和Reactor
NIO和Reactor
本文参考Doug Lea的Scalable IO in Java.
网络服务
随着网络服务的越来越多,我们对网络服务的性能有了更高的要求,提供一个高性能,稳定的web服务是一件很麻烦的事情,所以有了netty框架帮我们完成。
我们对各种各样的网络服务进行抽象,得到最基本的业务流程:
1:读取请求信息
2:对请求信息进行解码
3:进行相关的业务处理
4:对处理结果进行编码
5:发送请求
看到这,netty的ChannelHandler就是干这几件事情的。
传统的网络服务
在jdk1.4之前,我们只有BIO,所以当时的网络服务的架构是这样的。
每个线程处理一个请求, 由于线程个数和cpu个数的原因,这种设计性能是有上限的,所以就有了集群模式:tomcat集群。
/** * Created by gaoxing on 2015-01-20 . */public class ClasssicServer { public static void main(String[] args) { try { ServerSocket serverSocket=new ServerSocket(8888,10); System.out.println("server is start"); while( ! Thread.interrupted()) { new Thread(new Handler(serverSocket.accept())).start(); } } catch (IOException e) { e.printStackTrace(); } } static class Handler implements Runnable { final Socket socket; final static int MAX_SIZE=1024; public Handler(Socket socket) { this.socket=socket; } @Override public void run() { //TODO //在这里对socket中的数据进行读取和处理,然后把结果写入到socket中去 } }}
高性能的IO目标和Reactor
1:高负载下可以稳定的工作
2:提高资源的利用率
3:低延迟
这有就有了分而治之和事件驱动的思想。这样没有thread就不用瞎跑了,cpu就不用不停的切换Thread . 这样提出了Reactor模式
1:Reactor接收IO事件发送给该事件的处理器处理
2:处理器的操作是非阻塞的。
3:管理事件和处理器的绑定。
这是一个单线程版本,所有的请求都是一个线程处理,当然Reactor不是无缘无故的提出来的,因为jdk提供了nio包,nio使得Reator得于实现
1:channels: 通道就是数据源的抽象,通过它可以无阻塞的读取数据
2:buffers : 用来装载数据的,可以把从channel读取到buffer中,或者把buffer中的数据写入到channel中
3:selector : 用来监听 有IO事件,并告诉channel
4:selectionkeys: IO事件和处理器绑定
/** * Created by gaoxing on 2015-01-20 . */public class Reactor implements Runnable { final Selector selector ; final ServerSocketChannel serverSocketChannel; public Reactor(int port) throws IOException { this.selector=Selector.open(); this.serverSocketChannel=ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(port)); //一定是非阻塞的,阻塞的一个通道就只能处理一个请求了 serverSocketChannel.configureBlocking(false); //把OP_ACCEPT事件和Acceptor绑定 SelectionKey sk=serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT); sk.attach(new Acceptor()); } @Override public void run() { while(!Thread.interrupted()) { try { selector.select();//该方法是阻塞的,只有IO事件来了才向下执行 Set<SelectionKey> selected=selector.selectedKeys(); Iterator<SelectionKey> it=selected.iterator(); while(it.hasNext()) { dispatch(it.next()); }
selected.clear() } catch (IOException e) { e.printStackTrace(); } } } private void dispatch(SelectionKey next) { Runnable runnable= (Runnable) next.attachment(); if(runnable!=null) { runnable.run(); } } private class Acceptor implements Runnable{ @Override public void run() { SocketChannel channel= null; try { channel = serverSocketChannel.accept(); if (channel!=null){ new Handler(selector,channel); } } catch (IOException e) { e.printStackTrace(); } } }}class Handler implements Runnable{ final SelectionKey sk; final SocketChannel channel; final static int MAXSIZE=1024; ByteBuffer input=ByteBuffer.allocate(MAXSIZE); ByteBuffer output=ByteBuffer.allocate(MAXSIZE); static final int READING=0,SENDING=1; int state=READING; public Handler(Selector selector,SocketChannel channel) throws IOException { this.channel=channel; this.channel.configureBlocking(false); /** * 这个有个问题,ppt上register是0, */ sk=this.channel.register(selector,SelectionKey.OP_READ); sk.attach(this); /** * 这里的作用是,设置key的状态为可读,然后让后selector的selector返回。 * 然后就可以处理OP_READ事件了 */ sk.interestOps(SelectionKey.OP_READ); selector.wakeup(); } @Override public void run() { if (state==READING) read(); if (state==SENDING) write(); } void read(){ state=SENDING; sk.interestOps(SelectionKey.OP_WRITE); } void write() { sk.cancel(); }}
NIO和Reactor