首页 > 代码库 > MINA2 源码学习--源码结构梳理

MINA2 源码学习--源码结构梳理

一、mina的整体框架结构及案例:

1.整体结构图:

简述:以上是一张来自网上比较经典的图,整体上揭示了mina的结构,其中IoService包含客户端IoConnector和服务端IoAcceptor两部分。即无论是客户端还是服务端都是这个结构。IoService封装了网络传输层(TCP和UDP),而IoFilterChain中mina自带的filter做了一些基本的操作之外,支持扩展。经过FilterChain之后最终调用IoHandler,IoHandler是具体实现业务逻辑的处理接口,具体的业务实现可扩展。
2.一个可运行的案例(案例来自网上,转载后试验):
Client.java:

import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.Random;

import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.transport.socket.nio.NioSocketConnector;

public class Client extends IoHandlerAdapter {

    private Random random = new Random(System.currentTimeMillis());
    public Client() {
        IoConnector connector = new NioSocketConnector();
        connector.getFilterChain().addLast(
                "text",
                new ProtocolCodecFilter(new TextLineCodecFactory(Charset
                        .forName(Server.ENCODE))));
        connector.setHandler(this);
        ConnectFuture future = connector.connect(new InetSocketAddress(
                "127.0.0.1", Server.PORT));
        future.awaitUninterruptibly();
        future.addListener(new IoFutureListener<ConnectFuture>() {
            @Override
            public void operationComplete(ConnectFuture future) {
                IoSession session = future.getSession();
                while (!session.isClosing()) {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    String message = "你好,我roll了" + random.nextInt(100) + "点.";
                    session.write(message);
                }
            }
        });
        connector.dispose();
    }
    @Override
    public void messageReceived(IoSession session, Object message)
            throws Exception {
        System.out.println("批复:" + message.toString());
    }
    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        System.out.println("报告:" + message.toString());
    }
    @Override
    public void exceptionCaught(IoSession session, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        session.close(true);
    }
    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new Client();
        }
    }
}

ServerHandler.java:

import java.net.InetSocketAddress;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;

public class ServerHandler extends IoHandlerAdapter {
    @Override
    public void exceptionCaught(IoSession session, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        session.close(false);
    }
    public void messageReceived(IoSession session, Object message)
            throws Exception {
        String s = message.toString();
        System.out.println("收到请求:" + s);
        if (s != null) {
            int i = getPoint(s);
            if (session.isConnected()) {
                if (i >= 95) {
                    session.write("运气不错,你可以出去了.");
                    session.close(false);
                    return;
                }
                Integer count = (Integer) session.getAttribute(Server.KEY);
                count++;
                session.setAttribute(Server.KEY, count);
                session.write("抱歉,你运气太差了,第" + count + "次请求未被通过,继续在小黑屋呆着吧.");
            } else {
                session.close(true);
            }
        }
    }
    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        System.out.println("发给客户端:" + message.toString());
    }
    @Override
    public void sessionClosed(IoSession session) throws Exception {
        long l = session.getCreationTime();
        System.out.println("来自" + getInfo(session) + "的会话已经关闭,它已经存活了"
                + (System.currentTimeMillis() - 1) + "毫秒");
    }
    @Override
    public void sessionCreated(IoSession session) throws Exception {
        System.out.println("给" + getInfo(session) + "创建了一个会话");
    }
    @Override
    public void sessionIdle(IoSession session, IdleStatus status)
            throws Exception {
        System.out.println("来自" + getInfo(session) + "的会话闲置,状态为"
                + status.toString());
    }
    public void sessionOpened(IoSession session) throws Exception {
        session.setAttribute(Server.KEY, 0);
        System.out.println("和" + getInfo(session) + "的会话已经打开.");
    }
    public String getInfo(IoSession session) {

        if (session == null) {
            return null;
        }
        InetSocketAddress address = (InetSocketAddress) session
                .getRemoteAddress();
        int port = address.getPort();
        String ip = address.getAddress().getHostAddress();
        return ip + ":" + port;
    }
    public int getPoint(String s) {

        if (s == null) {
            return -1;
        }
        Pattern p = Pattern.compile("^[\u0041-\uFFFF,]*(\\d+).*$");
        Matcher m = p.matcher(s);
        if (m.matches()) {
            return Integer.valueOf(m.group(1));
        }
        return 0;
    }
}

Server.java:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;

import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.transport.socket.SocketAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;


public class Server {

    public static final int PORT = 2534;
    public static String ENCODE = "UTF-8";
    public static final String KEY = "roll";

    public static void main(String[] args){ 
        SocketAcceptor acceptor = new NioSocketAcceptor();
        acceptor.getFilterChain().addLast(
                "text",
                new ProtocolCodecFilter(new TextLineCodecFactory(Charset
                        .forName(ENCODE))));
        acceptor.setHandler(new ServerHandler());
        try {
            acceptor.bind(new InetSocketAddress(PORT));
            System.out.println("游戏开始,你想出去吗,来,碰碰运气吧!");
        } catch (IOException e) {
            e.printStackTrace();
            acceptor.dispose();
        }
    }
}

本案例依赖的jar如下图:


简述:以上是依赖mina实现的一个可运行的案例,就不多说了,结合整体的结构图和案例实现可以看出mina框架还是很轻量级的,下面分析一下mina的源码结构和一些时序流程。

二、mina 核心源码分析:

1.mina的启动时序(结合上面的案例):


简述:SocketAcceptor作为服务端对外启动接口类,在bind网络地址的时候,会触发服务端一系列服务的启动,从调用链可以清晰找到对应的源码阅读。其中AbstractPollingIoAcceptor是一个核心类,它会调用自身的startupAcceptor方法,来启动一个存放Acceptor的线程池用来处理客户端传输过来的请求。
AbstractPollingIoAcceptor 类的 startupAcceptor 方法如下:

/**
 * This method is called by the doBind() and doUnbind()
 * methods.  If the acceptor is null, the acceptor object will
 * be created and kicked off by the executor.  If the acceptor
 * object is null, probably already created and this class
 * is now working, then nothing will happen and the method
 * will just return.
 */
private void startupAcceptor() throws InterruptedException {
    // If the acceptor is not ready, clear the queues
    // TODO : they should already be clean : do we have to do that ?
    if (!selectable) {
        registerQueue.clear();
        cancelQueue.clear();
    }

    // start the acceptor if not already started
    Acceptor acceptor = acceptorRef.get();
    //这里只会启动一个worker
    if (acceptor == null) {
        lock.acquire();
        acceptor = new Acceptor();

        if (acceptorRef.compareAndSet(null, acceptor)) {
            executeWorker(acceptor);
        } else {
            lock.release();
        }
    }
}

上面调用到 AbstractIoService 的 executeWorker方法如下:

protected final void executeWorker(Runnable worker) {
    executeWorker(worker, null);
}

protected final void executeWorker(Runnable worker, String suffix) {
    String actualThreadName = threadName;
    if (suffix != null) {
        actualThreadName = actualThreadName + ‘-‘ + suffix;
    }
    executor.execute(new NamePreservingRunnable(worker, actualThreadName));
}

简述:有类AbstractPollingIoAcceptor 的 startupAcceptor方法(上文)可以看到,一个SocketAcceptor只启动了一个Worker线程(即代码中的Acceptor对象)并且把他加到线程池中。反过来讲,也可以看出AbstractIoService维护了Worker的线程池。(ps:这个Worker就是服务端处理请求的线程)。

2.Mina处理客户端链接的过程(启动后):

概述:从1中的启动时序可以看到,启动过程通过创建SocketAcceptor将有类AbstractPollingIoAcceptor的内部类Acceptor放到了 AbstractIoService的线程池里面,而这个Acceptor就是处理客户端网络请求的worker,而下面这个时序就是线程池中每个worker处理客户端网络请求的时序流程。

处理请求时序: 

简述:worker线程Acceptor的run方法中会调用NioSocketAcceptor或者AprSocketAccetpor的select方法。
ps:APR(Apache Protable Runtime Library,Apache可移植运行库)是可以提供很好的可拓展性、性能以及对底层操作系统一致性操作的技术,说白了就是apache实现的一套标准的通讯接口。

AprSocketAcceptor先不做深入了解,主要了解下NioSocketAcceptor,NioSocketAcceptor顾名思义,它调用了java NIO的API实现了NIO的网络连接处理过程。

AbstractPolling$Acceptor 的run方法的核心代码如下:

 private class Acceptor implements Runnable {
    public void run() {
        assert (acceptorRef.get() == this);

        int nHandles = 0;

        // Release the lock
        lock.release();

        while (selectable) {
            try {
                // Detect if we have some keys ready to be processed
                // The select() will be woke up if some new connection
                // have occurred, or if the selector has been explicitly
                // woke up
                //调用了NioSocketAcceptor的select方法,获取了selectKey
                int selected = select();

                // this actually sets the selector to OP_ACCEPT,
                // and binds to the port on which this class will
                // listen on
                nHandles += registerHandles();

                // Now, if the number of registred handles is 0, we can
                // quit the loop: we don‘t have any socket listening
                // for incoming connection.
                if (nHandles == 0) {
                    acceptorRef.set(null);

                    if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
                        assert (acceptorRef.get() != this);
                        break;
                    }

                    if (!acceptorRef.compareAndSet(null, this)) {
                        assert (acceptorRef.get() != this);
                        break;
                    }

                    assert (acceptorRef.get() == this);
                }

                if (selected > 0) {
                    // We have some connection request, let‘s process
                    // them here.
                    processHandles(selectedHandles());
                }

                // check to see if any cancellation request has been made.
                nHandles -= unregisterHandles();
            } catch (ClosedSelectorException cse) {
                // If the selector has been closed, we can exit the loop
                break;
            } catch (Throwable e) {
                ExceptionMonitor.getInstance().exceptionCaught(e);

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e1) {
                    ExceptionMonitor.getInstance().exceptionCaught(e1);
                }
            }
        }

        // Cleanup all the processors, and shutdown the acceptor.
        if (selectable && isDisposing()) {
            selectable = false;
            try {
                if (createdProcessor) {
                    processor.dispose();
                }
            } finally {
                try {
                    synchronized (disposalLock) {
                        if (isDisposing()) {
                            destroy();
                        }
                    }
                } catch (Exception e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);
                } finally {
                    disposalFuture.setDone();
                }
            }
        }
    }

简述:从上面的代码中可以看出一个典型的网络请求处理的程序,在循环中拿到处理的请求后就调用AbstractPollingIoAcceptor的processHandles()对网络请求做处理。代码如下:

    /**
     * This method will process new sessions for the Worker class.  All
     * keys that have had their status updates as per the Selector.selectedKeys()
     * method will be processed here.  Only keys that are ready to accept
     * connections are handled here.
     * <p/>
     * Session objects are created by making new instances of SocketSessionImpl
     * and passing the session object to the SocketIoProcessor class.
     */
    @SuppressWarnings("unchecked")
    private void processHandles(Iterator<H> handles) throws Exception {
        while (handles.hasNext()) {
            H handle = handles.next();
            handles.remove();

            // Associates a new created connection to a processor,
            // and get back a session
            //这里调用了NioSocketAcceptor的accept方法
            S session = accept(processor, handle);

            if (session == null) {
                continue;
            }

            initSession(session, null, null);

            // add the session to the SocketIoProcessor
            // 这步处理add操作,会触发对客户端请求的异步处理。
            session.getProcessor().add(session);
        }
    }

NioSocketAcceptor的accept方法new了一个包装Process处理线程的session实例:并且在调用session.getProcessor().add(session)的操作的时候触发了对客户端请求的异步处理。

/**
 * {@inheritDoc}
 */
@Override
protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {

    SelectionKey key = handle.keyFor(selector);

    if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
        return null;
    }

    // accept the connection from the client
    SocketChannel ch = handle.accept();

    if (ch == null) {
        return null;
    }

    return new NioSocketSession(this, processor, ch);
}

再看上面时序图:有一步是AbstractPollingIoProcessor调用了startupProcessor方法,代码如下:

 /**
 * Starts the inner Processor, asking the executor to pick a thread in its
 * pool. The Runnable will be renamed
 */
private void startupProcessor() {
    Processor processor = processorRef.get();

    if (processor == null) {
        processor = new Processor();

        if (processorRef.compareAndSet(null, processor)) {
            executor.execute(new NamePreservingRunnable(processor, threadName));
        }
    }

    // Just stop the select() and start it again, so that the processor
    // can be activated immediately.
    wakeup();
}

简述:这个startupProcessor方法在调用 session里包装的processor的add方法是,触发了将处理客户端请求的processor放入异步处理的线程池中。后续具体Processor怎么处理客户端请求的流程,涉及到FilterChain的过滤,以及Adapter的调用,用来处理业务逻辑。具体的异步处理时序看下面的时序图:


简述:这个时序就是将待处理的客户端链接,通过NIO的形式接受请求,并将请求包装成Processor的形式放到处理的线程池中异步的处理。在异步的处理过程中则调用了Processor的run方法,具体的filterchain的调用和业务Adapter的调用也是在这一步得到处理。值得注意的是,Handler的调用是封装在DefaultFilterchain的内部类诶TairFilter中触发调用的,Processor的run方法代码如下:

 private class Processor implements Runnable {
    public void run() {
        assert (processorRef.get() == this);

        int nSessions = 0;
        lastIdleCheckTime = System.currentTimeMillis();

        for (;;) {
            try {
                // This select has a timeout so that we can manage
                // idle session when we get out of the select every
                // second. (note : this is a hack to avoid creating
                // a dedicated thread).
                long t0 = System.currentTimeMillis();
                //调用了NioProcessor
                int selected = select(SELECT_TIMEOUT);
                long t1 = System.currentTimeMillis();
                long delta = (t1 - t0);

                if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {
                    // Last chance : the select() may have been
                    // interrupted because we have had an closed channel.
                    if (isBrokenConnection()) {
                        LOG.warn("Broken connection");

                        // we can reselect immediately
                        // set back the flag to false
                        wakeupCalled.getAndSet(false);

                        continue;
                    } else {
                        LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));
                        // Ok, we are hit by the nasty(讨厌的) epoll
                        // spinning.
                        // Basically, there is a race condition
                        // which causes a closing file descriptor not to be
                        // considered as available as a selected channel, but
                        // it stopped the select. The next time we will
                        // call select(), it will exit immediately for the same
                        // reason, and do so forever, consuming 100%
                        // CPU.
                        // We have to destroy the selector, and
                        // register all the socket on a new one.
                        registerNewSelector();
                    }

                    // Set back the flag to false
                    wakeupCalled.getAndSet(false);

                    // and continue the loop
                    continue;
                }

                // Manage newly created session first
                nSessions += handleNewSessions();

                updateTrafficMask();

                // Now, if we have had some incoming or outgoing events,
                // deal with them
                if (selected > 0) {
                    //LOG.debug("Processing ..."); // This log hurts one of the MDCFilter test...
                    //触发了具体的调用逻辑
                    process();
                }

                // Write the pending requests
                long currentTime = System.currentTimeMillis();
                flush(currentTime);

                // And manage removed sessions
                nSessions -= removeSessions();

                // Last, not least, send Idle events to the idle sessions
                notifyIdleSessions(currentTime);

                // Get a chance to exit the infinite loop if there are no
                // more sessions on this Processor
                if (nSessions == 0) {
                    processorRef.set(null);

                    if (newSessions.isEmpty() && isSelectorEmpty()) {
                        // newSessions.add() precedes startupProcessor
                        assert (processorRef.get() != this);
                        break;
                    }

                    assert (processorRef.get() != this);

                    if (!processorRef.compareAndSet(null, this)) {
                        // startupProcessor won race, so must exit processor
                        assert (processorRef.get() != this);
                        break;
                    }

                    assert (processorRef.get() == this);
                }

                // Disconnect all sessions immediately if disposal has been
                // requested so that we exit this loop eventually.
                if (isDisposing()) {
                    for (Iterator<S> i = allSessions(); i.hasNext();) {
                        scheduleRemove(i.next());
                    }

                    wakeup();
                }
            } catch (ClosedSelectorException cse) {
                // If the selector has been closed, we can exit the loop
                break;
            } catch (Throwable t) {
                ExceptionMonitor.getInstance().exceptionCaught(t);

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e1) {
                    ExceptionMonitor.getInstance().exceptionCaught(e1);
                }
            }
        }

        try {
            synchronized (disposalLock) {
                if (disposing) {
                    doDispose();
                }
            }
        } catch (Throwable t) {
            ExceptionMonitor.getInstance().exceptionCaught(t);
        } finally {
            disposalFuture.setValue(true);
        }
    }
}

简述:这么一坨代码可以看出,这个处理器也调用了java的Nio API是一个NIO模型,其中select和process方法分别是从session拿到要处理的请求,并进行处理,而具体的Processor实例是NioProcessor。从添加注释的代码中有一步调用了自身的process方法,这步调用触发了具体业务逻辑的调用。可以结合代码和时序图看下。在Process方法中会调用reader(session)或wirte(session)方法,然后调用fireMessageReceived方法,这个方法又调用了callNextMessageReceived方法致使触发了整个FilterChain和Adapter的调用。read方法的核心代码如下:

private void read(S session) {
    IoSessionConfig config = session.getConfig();
    int bufferSize = config.getReadBufferSize();
    IoBuffer buf = IoBuffer.allocate(bufferSize);

    final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();

    try {
        int readBytes = 0;
        int ret;

        try {
            if (hasFragmentation) {

                while ((ret = read(session, buf)) > 0) {
                    readBytes += ret;

                    if (!buf.hasRemaining()) {
                        break;
                    }
                }
            } else {
                ret = read(session, buf);

                if (ret > 0) {
                    readBytes = ret;
                }
            }
        } finally {
            buf.flip();
        }

        if (readBytes > 0) {
            IoFilterChain filterChain = session.getFilterChain();
            filterChain.fireMessageReceived(buf);
            buf = null;

            if (hasFragmentation) {
                if (readBytes << 1 < config.getReadBufferSize()) {
                    session.decreaseReadBufferSize();
                } else if (readBytes == config.getReadBufferSize()) {
                    session.increaseReadBufferSize();
                }
            }
        }

        if (ret < 0) {
            scheduleRemove(session);
        }
    } catch (Throwable e) {
        if (e instanceof IOException) {
            if (!(e instanceof PortUnreachableException)
                    || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
                    || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
                scheduleRemove(session);
            }
        }

        IoFilterChain filterChain = session.getFilterChain();
        filterChain.fireExceptionCaught(e);
    }
}

从这段代码并结合上面的时序图可以看出来触发整个FilterChain的调用以及IoHandler的调用。

三、类结构分析

参考第一部分的整体结构图,画一下每个部分大致的类结构图:

简述: 从类继承结构图来看,可以看到在IOService体系下,存在IoConnector和IoAcceptor两个大的分支体系。IoConnector是做为客户端的时候使用,IoAcceptor是作为服务端的时候使用。实际上在Mina中,有三种worker线程分别是:Acceptor、Connector 和 I/O processor。
(1) Acceptor Thread 作为服务器端的链接线程,实现了IoService接口,线程的数量就是创建SocketAcceptor的数量。
(2) Connector Thread 作为客户端请求建立的链接线程,实现了IoService接口,维持了一个和服务端Acceptor的一个链接,线程的数量就是创建SocketConnector的数量。
(3) I/O processorThread 作为I/O真正处理的线程,存在于服务器端和客户端,线程的数量是可以配置的,默认是CPU个数+1。

上面那个图只是表述了IoService类体系,而I/O Processor的类体系并不在其中,见下图:


简述:IOProcessor主要分为两种,分别是AprIOProcessor和NioProcessor,Apr的解释见上文:ps:APR(Apache Protable Runtime Library,Apache可移植运行库)。NioProcessor也是Nio的一种实现,用来处理客户端连接过来的请求。在Processor中会调用到 FilterChain 和 Handler,见上文代码。先看下FilterChain的类结构图如下:


Filter 和 Handler的类结构如下:


Handler的类结构如下:


Mina的session类结构图如下:

Mina的Buffer的类结构图如下: