首页 > 代码库 > NIO框架之MINA源码解析(二):mina核心引擎
NIO框架之MINA源码解析(二):mina核心引擎
MINA的底层还是利用了jdk提供了nio功能,mina只是对nio进行封装,包括MINA用的线程池都是jdk直接提供的。
MINA的server端主要有accept、processor、session三部分组成的。其中accept主要负责在指定的端口监听,若有新连接则建立一个新的session;processor则负责处理session对应的发送数据和接收数据并调用上层处理;session则缓存当前连接数据。
MINA采用了线程懒启动的技术,即最少启动线程,在MINA server启动的时候,只有一个线程-accept,并且accept线程只有一个,在指定的端口进行监听(可以同时监听多个端口,mina可以绑定多端口)。
1、acceptor
先看下acceptor的主要类图吧。
mina server的启动入口是在NioSocketAcceptor.bind(InetSocketAddress)或者NioSocketAcceptor.bind(SocketAddress...)方法, acceptor.bind(new InetSocketAddress(1234));
然后会调用AbstractPollingIoAcceptor.bindInternal(List<? extends SocketAddress>)方法,在bindInternal方法里面会调用startupAcceptor()方法提交一个accept线程到线程池里面(只提交一次),并初始化acceptor端的Selector,就这样一个acceptor线程启动了。
acceptor端业务相对简单,相当于在当前Selector里面监听acceptor事件,处理新连接并新建一个session放到对应的processor里面。
acceptor 代码,很简单。
- private class Acceptor implements Runnable {
- public void run() {
- assert (acceptorRef.get() == this);
- int nHandles = 0;
- // Release the lock
- lock.release();
- while (selectable) {
- try {
- // Detect if we have some keys ready to be processed
- // The select() will be woke up if some new connection
- // have occurred, or if the selector has been explicitly
- // woke up
- int selected = select();
- // this actually sets the selector to OP_ACCEPT,
- // and binds to the port on which this class will
- // listen on 在通道里面注册连接事件
- nHandles += registerHandles();
- // Now, if the number of registred handles is 0, we can
- // quit the loop: we don‘t have any socket listening
- // for incoming connection.
- if (nHandles == 0) {
- acceptorRef.set(null);
- if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
- assert (acceptorRef.get() != this);
- break;
- }
- if (!acceptorRef.compareAndSet(null, this)) {
- assert (acceptorRef.get() != this);
- break;
- }
- assert (acceptorRef.get() == this);
- }
- if (selected > 0) {
- // We have some connection request, let‘s process
- // them here.处理连接
- processHandles(selectedHandles());
- }
- // check to see if any cancellation request has been made.
- nHandles -= unregisterHandles();
- } catch (ClosedSelectorException cse) {
- // If the selector has been closed, we can exit the loop
- break;
- } catch (Throwable e) {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e1) {
- ExceptionMonitor.getInstance().exceptionCaught(e1);
- }
- }
- }
- // Cleanup all the processors, and shutdown the acceptor.
- if (selectable && isDisposing()) {
- selectable = false;
- try {
- if (createdProcessor) {
- processor.dispose();
- }
- } finally {
- try {
- synchronized (disposalLock) {
- if (isDisposing()) {
- destroy();
- }
- }
- } catch (Exception e) {
- ExceptionMonitor.getInstance().exceptionCaught(e);
- } finally {
- disposalFuture.setDone();
- }
- }
- }
- }
2、processor
processor顾名思义,就是进行IO处理,处理当前session的数据读写,并进行业务处理。
在mina server初始化的时候,会初始化一个processor池,通过NioSocketAcceptor的构造器传入池的大小,默认是当前处理器的个数+1。
processor池里面有一个jdk提供的 线程池 - Executors.newCachedThreadPool()。各个processor线程会引用此线程池,即每个processor线程都在这个线程池里面运行。
在mina server实际处理时,每个processor相当于一个线程,轮流处理当前的session队列里面的数据(每个processor里面的session相当于顺序处理,共享一个线程)。
每个processor有一个Selector对象。
processor类图
processor端的处理逻辑相对有点复杂,看下面的流程图。
1、把新添加进来的session注册到当前processor的Selector里面的read事件,并初始化session;
2、判断当前Selector是否有读写事件;
3、若第2步有读事件时,进入步骤4,若没有的话,直接到第6步;
4、处理当前读事件,并把处理后的数据放入到flush队列里面;
5、把第4步执行的结果flush到客户端;
6、处理session,比如session idle时间等。
7、重新执行第1步,循环执行。
processor端代码。
- private class Processor implements Runnable {
- public void run() {
- assert (processorRef.get() == this);
- int nSessions = 0;
- lastIdleCheckTime = System.currentTimeMillis();
- for (;;) {
- try {
- // This select has a timeout so that we can manage
- // idle session when we get out of the select every
- // second. (note : this is a hack to avoid creating
- // a dedicated thread).
- long t0 = System.currentTimeMillis();
- int selected = select(SELECT_TIMEOUT);
- long t1 = System.currentTimeMillis();
- long delta = (t1 - t0);
- if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {
- // Last chance : the select() may have been
- // interrupted because we have had an closed channel.
- if (isBrokenConnection()) {
- LOG.warn("Broken connection");
- // we can reselect immediately
- // set back the flag to false
- wakeupCalled.getAndSet(false);
- continue;
- } else {
- LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));
- // Ok, we are hit by the nasty epoll
- // spinning.
- // Basically, there is a race condition
- // which causes a closing file descriptor not to be
- // considered as available as a selected channel, but
- // it stopped the select. The next time we will
- // call select(), it will exit immediately for the same
- // reason, and do so forever, consuming 100%
- // CPU.
- // We have to destroy the selector, and
- // register all the socket on a new one.
- registerNewSelector();
- }
- // Set back the flag to false
- wakeupCalled.getAndSet(false);
- // and continue the loop
- continue;
- }
- // Manage newly created session first 处理新添加进来的session,并注册到当前processor的Selector读事件
- nSessions += handleNewSessions();
- updateTrafficMask();
- // Now, if we have had some incoming or outgoing events,
- // deal with them
- if (selected > 0) {
- //LOG.debug("Processing ..."); // This log hurts one of the MDCFilter test...处理读事件,并把结果放入flush队列里面
- process();
- }
- // Write the pending requests 把flush队列里面的session的处理完的数据发送给客户端
- long currentTime = System.currentTimeMillis();
- flush(currentTime);
- // And manage removed sessions
- nSessions -= removeSessions();
- // Last, not least, send Idle events to the idle sessions
- notifyIdleSessions(currentTime);
- // Get a chance to exit the infinite loop if there are no
- // more sessions on this Processor
- if (nSessions == 0) {
- processorRef.set(null);
- if (newSessions.isEmpty() && isSelectorEmpty()) {
- // newSessions.add() precedes startupProcessor
- assert (processorRef.get() != this);
- break;
- }
- assert (processorRef.get() != this);
- if (!processorRef.compareAndSet(null, this)) {
- // startupProcessor won race, so must exit processor
- assert (processorRef.get() != this);
- break;
- }
- assert (processorRef.get() == this);
- }
- // Disconnect all sessions immediately if disposal has been
- // requested so that we exit this loop eventually.
- if (isDisposing()) {
- for (Iterator<S> i = allSessions(); i.hasNext();) {
- scheduleRemove(i.next());
- }
- wakeup();
- }
- } catch (ClosedSelectorException cse) {
- // If the selector has been closed, we can exit the loop
- break;
- } catch (Throwable t) {
- ExceptionMonitor.getInstance().exceptionCaught(t);
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e1) {
- ExceptionMonitor.getInstance().exceptionCaught(e1);
- }
- }
- }
- try {
- synchronized (disposalLock) {
- if (disposing) {
- doDispose();
- }
- }
- } catch (Throwable t) {
- ExceptionMonitor.getInstance().exceptionCaught(t);
- } finally {
- disposalFuture.setValue(true);
- }
- }
- }
3、session
session做为一个连接的具体对象,缓存当前连接用户的一些信息。
session类图
session对象是绑定在SelectableChannel的一个attach。
- class NioProcessor
- @Override
- protected void init(NioSession session) throws Exception {
- SelectableChannel ch = (SelectableChannel) session.getChannel();
- ch.configureBlocking(false);
- session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));//chanel 注册读事件,并把session当做一个attach辅导SelectableChannel里面。
- }
NIO框架之MINA源码解析(二):mina核心引擎