首页 > 代码库 > NIO框架之MINA源码解析(二):mina核心引擎

NIO框架之MINA源码解析(二):mina核心引擎

MINA的底层还是利用了jdk提供了nio功能,mina只是对nio进行封装,包括MINA用的线程池都是jdk直接提供的。


MINA的server端主要有accept、processor、session三部分组成的。其中accept主要负责在指定的端口监听,若有新连接则建立一个新的session;processor则负责处理session对应的发送数据和接收数据并调用上层处理;session则缓存当前连接数据

MINA采用了线程懒启动的技术,即最少启动线程,在MINA server启动的时候,只有一个线程-accept,并且accept线程只有一个,在指定的端口进行监听(可以同时监听多个端口,mina可以绑定多端口)。

 

1、acceptor

 

先看下acceptor的主要类图吧。

技术分享

 

 

mina server的启动入口是在NioSocketAcceptor.bind(InetSocketAddress)或者NioSocketAcceptor.bind(SocketAddress...)方法, acceptor.bind(new InetSocketAddress(1234));  
然后会调用AbstractPollingIoAcceptor.bindInternal(List<? extends SocketAddress>)方法,在bindInternal方法里面会调用startupAcceptor()方法提交一个accept线程到线程池里面(只提交一次),并初始化acceptor端的Selector,就这样一个acceptor线程启动了。

acceptor端业务相对简单,相当于在当前Selector里面监听acceptor事件,处理新连接并新建一个session放到对应的processor里面。

 

技术分享

 

acceptor 代码,很简单。

 

[java] view plain copy
 
 print?技术分享技术分享
  1. private class Acceptor implements Runnable {  
  2.        public void run() {  
  3.            assert (acceptorRef.get() == this);  
  4.   
  5.            int nHandles = 0;  
  6.   
  7.            // Release the lock  
  8.            lock.release();  
  9.   
  10.            while (selectable) {  
  11.                try {  
  12.                    // Detect if we have some keys ready to be processed  
  13.                    // The select() will be woke up if some new connection  
  14.                    // have occurred, or if the selector has been explicitly  
  15.                    // woke up  
  16.                    int selected = select();  
  17.   
  18.                    // this actually sets the selector to OP_ACCEPT,  
  19.                    // and binds to the port on which this class will  
  20.                    // listen on  在通道里面注册连接事件  
  21.                    nHandles += registerHandles();  
  22.   
  23.                    // Now, if the number of registred handles is 0, we can  
  24.                    // quit the loop: we don‘t have any socket listening  
  25.                    // for incoming connection.  
  26.                    if (nHandles == 0) {  
  27.                        acceptorRef.set(null);  
  28.   
  29.                        if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {  
  30.                            assert (acceptorRef.get() != this);  
  31.                            break;  
  32.                        }  
  33.   
  34.                        if (!acceptorRef.compareAndSet(null, this)) {  
  35.                            assert (acceptorRef.get() != this);  
  36.                            break;  
  37.                        }  
  38.   
  39.                        assert (acceptorRef.get() == this);  
  40.                    }  
  41.   
  42.                    if (selected > 0) {  
  43.                        // We have some connection request, let‘s process  
  44.                        // them here.处理连接  
  45.                        processHandles(selectedHandles());  
  46.                    }  
  47.   
  48.                    // check to see if any cancellation request has been made.  
  49.                    nHandles -= unregisterHandles();  
  50.                } catch (ClosedSelectorException cse) {  
  51.                    // If the selector has been closed, we can exit the loop  
  52.                    break;  
  53.                } catch (Throwable e) {  
  54.                    ExceptionMonitor.getInstance().exceptionCaught(e);  
  55.   
  56.                    try {  
  57.                        Thread.sleep(1000);  
  58.                    } catch (InterruptedException e1) {  
  59.                        ExceptionMonitor.getInstance().exceptionCaught(e1);  
  60.                    }  
  61.                }  
  62.            }  
  63.   
  64.            // Cleanup all the processors, and shutdown the acceptor.  
  65.            if (selectable && isDisposing()) {  
  66.                selectable = false;  
  67.                try {  
  68.                    if (createdProcessor) {  
  69.                        processor.dispose();  
  70.                    }  
  71.                } finally {  
  72.                    try {  
  73.                        synchronized (disposalLock) {  
  74.                            if (isDisposing()) {  
  75.                                destroy();  
  76.                            }  
  77.                        }  
  78.                    } catch (Exception e) {  
  79.                        ExceptionMonitor.getInstance().exceptionCaught(e);  
  80.                    } finally {  
  81.                        disposalFuture.setDone();  
  82.                    }  
  83.                }  
  84.            }  
  85.        }  

 

 

2、processor

 

 

processor顾名思义,就是进行IO处理,处理当前session的数据读写,并进行业务处理。

 

在mina server初始化的时候,会初始化一个processor池,通过NioSocketAcceptor的构造器传入池的大小,默认是当前处理器的个数+1。

 

processor池里面有一个jdk提供的 线程池 - Executors.newCachedThreadPool()。各个processor线程会引用此线程池,即每个processor线程都在这个线程池里面运行。

 

在mina server实际处理时,每个processor相当于一个线程,轮流处理当前的session队列里面的数据(每个processor里面的session相当于顺序处理,共享一个线程)。

 

每个processor有一个Selector对象

 

processor类图

技术分享


processor端的处理逻辑相对有点复杂,看下面的流程图。

 

技术分享

 

1、把新添加进来的session注册到当前processor的Selector里面的read事件,并初始化session;
2、判断当前Selector是否有读写事件;
3、若第2步有读事件时,进入步骤4,若没有的话,直接到第6步;
4、处理当前读事件,并把处理后的数据放入到flush队列里面;
5、把第4步执行的结果flush到客户端;
6、处理session,比如session idle时间等。
7、重新执行第1步,循环执行。

 

processor端代码。

 

[java] view plain copy
 
 print?技术分享技术分享
  1. private class Processor implements Runnable {  
  2.        public void run() {  
  3.            assert (processorRef.get() == this);  
  4.   
  5.            int nSessions = 0;  
  6.            lastIdleCheckTime = System.currentTimeMillis();  
  7.   
  8.            for (;;) {  
  9.                try {  
  10.                    // This select has a timeout so that we can manage  
  11.                    // idle session when we get out of the select every  
  12.                    // second. (note : this is a hack to avoid creating  
  13.                    // a dedicated thread).  
  14.                    long t0 = System.currentTimeMillis();  
  15.                    int selected = select(SELECT_TIMEOUT);  
  16.                    long t1 = System.currentTimeMillis();  
  17.                    long delta = (t1 - t0);  
  18.   
  19.                    if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {  
  20.                        // Last chance : the select() may have been  
  21.                        // interrupted because we have had an closed channel.  
  22.                        if (isBrokenConnection()) {  
  23.                            LOG.warn("Broken connection");  
  24.   
  25.                            // we can reselect immediately  
  26.                            // set back the flag to false  
  27.                            wakeupCalled.getAndSet(false);  
  28.   
  29.                            continue;  
  30.                        } else {  
  31.                            LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));  
  32.                            // Ok, we are hit by the nasty epoll  
  33.                            // spinning.  
  34.                            // Basically, there is a race condition  
  35.                            // which causes a closing file descriptor not to be  
  36.                            // considered as available as a selected channel, but  
  37.                            // it stopped the select. The next time we will  
  38.                            // call select(), it will exit immediately for the same  
  39.                            // reason, and do so forever, consuming 100%  
  40.                            // CPU.  
  41.                            // We have to destroy the selector, and  
  42.                            // register all the socket on a new one.  
  43.                            registerNewSelector();  
  44.                        }  
  45.   
  46.                        // Set back the flag to false  
  47.                        wakeupCalled.getAndSet(false);  
  48.   
  49.                        // and continue the loop  
  50.                        continue;  
  51.                    }  
  52.   
  53.                    // Manage newly created session first  处理新添加进来的session,并注册到当前processor的Selector读事件  
  54.                    nSessions += handleNewSessions();  
  55.   
  56.                    updateTrafficMask();  
  57.   
  58.                    // Now, if we have had some incoming or outgoing events,  
  59.                    // deal with them  
  60.                    if (selected > 0) {  
  61.                        //LOG.debug("Processing ..."); // This log hurts one of the MDCFilter test...处理读事件,并把结果放入flush队列里面  
  62.                        process();  
  63.                    }  
  64.   
  65.                    // Write the pending requests  把flush队列里面的session的处理完的数据发送给客户端  
  66.                    long currentTime = System.currentTimeMillis();  
  67.                    flush(currentTime);  
  68.   
  69.                    // And manage removed sessions  
  70.                    nSessions -= removeSessions();  
  71.   
  72.                    // Last, not least, send Idle events to the idle sessions  
  73.                    notifyIdleSessions(currentTime);  
  74.   
  75.                    // Get a chance to exit the infinite loop if there are no  
  76.                    // more sessions on this Processor  
  77.                    if (nSessions == 0) {  
  78.                        processorRef.set(null);  
  79.   
  80.                        if (newSessions.isEmpty() && isSelectorEmpty()) {  
  81.                            // newSessions.add() precedes startupProcessor  
  82.                            assert (processorRef.get() != this);  
  83.                            break;  
  84.                        }  
  85.   
  86.                        assert (processorRef.get() != this);  
  87.   
  88.                        if (!processorRef.compareAndSet(null, this)) {  
  89.                            // startupProcessor won race, so must exit processor  
  90.                            assert (processorRef.get() != this);  
  91.                            break;  
  92.                        }  
  93.   
  94.                        assert (processorRef.get() == this);  
  95.                    }  
  96.   
  97.                    // Disconnect all sessions immediately if disposal has been  
  98.                    // requested so that we exit this loop eventually.  
  99.                    if (isDisposing()) {  
  100.                        for (Iterator<S> i = allSessions(); i.hasNext();) {  
  101.                            scheduleRemove(i.next());  
  102.                        }  
  103.   
  104.                        wakeup();  
  105.                    }  
  106.                } catch (ClosedSelectorException cse) {  
  107.                    // If the selector has been closed, we can exit the loop  
  108.                    break;  
  109.                } catch (Throwable t) {  
  110.                    ExceptionMonitor.getInstance().exceptionCaught(t);  
  111.   
  112.                    try {  
  113.                        Thread.sleep(1000);  
  114.                    } catch (InterruptedException e1) {  
  115.                        ExceptionMonitor.getInstance().exceptionCaught(e1);  
  116.                    }  
  117.                }  
  118.            }  
  119.   
  120.            try {  
  121.                synchronized (disposalLock) {  
  122.                    if (disposing) {  
  123.                        doDispose();  
  124.                    }  
  125.                }  
  126.            } catch (Throwable t) {  
  127.                ExceptionMonitor.getInstance().exceptionCaught(t);  
  128.            } finally {  
  129.                disposalFuture.setValue(true);  
  130.            }  
  131.        }  
  132.    }  

 

 

3、session

 

session做为一个连接的具体对象,缓存当前连接用户的一些信息。

session类图

技术分享

 

session对象是绑定在SelectableChannel的一个attach。

 

 

[java] view plain copy
 
 print?技术分享技术分享
    1. class  NioProcessor  
    2.  @Override  
    3.     protected void init(NioSession session) throws Exception {  
    4.         SelectableChannel ch = (SelectableChannel) session.getChannel();  
    5.         ch.configureBlocking(false);  
    6.         session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));//chanel 注册读事件,并把session当做一个attach辅导SelectableChannel里面。  
    7.     }  

NIO框架之MINA源码解析(二):mina核心引擎