首页 > 代码库 > Mina2.0x框架+源码分析

Mina2.0x框架+源码分析

然后来说说IoHandle接口,Mina中的所有I/O事件都是通过这个接口来处理的,这些事件都是上面所说的I/O Processor发出来的,要注意的一点是同一个I/O Processor线程是负责处理多个会话的。包括下面这几个事件的处理:
技术分享
public interface IoHandler
{
    void sessionCreated(IoSession session) throws Exception;//会话创建
    void sessionOpened(IoSession session) throws Exception;//打开会话,与sessionCreated最大的区别是它是从另一个线程处调用的
    void sessionClosed(IoSession session) throws Exception;//会话结束,当连接关闭时被调用
    void sessionIdle(IoSession session, IdleStatus status) throws Exception;//会话空闲
    void exceptionCaught(IoSession session, Throwable cause) throws Exception;//异常捕获,Mina会自动关闭此连接
    void messageReceived(IoSession session, Object message) throws Exception;//接收到消息
    void messageSent(IoSession session, Object message) throws Exception;//发送消息
}

      IoHandlerAdapter就不说了,简单地对IoHandler使用适配器模式封装了下,让具体的IoHandler子类从其继承后,从而可以对自身需要哪些事件处理拥有自主权。

      来看看IoServiceListener接口,它用于监听IoService相关的事件。
技术分享
public interface IoServiceListener extends EventListener
{
    void serviceActivated(IoService service) throws Exception;//激活了一个新service
    void serviceIdle(IoService service, IdleStatus idleStatus) throws Exception; // service闲置
    void serviceDeactivated(IoService service) throws Exception;//挂起一个service
    void sessionCreated(IoSession session) throws Exception;//创建一个新会话
    void sessionDestroyed(IoSession session) throws Exception;//摧毁一个新会话
}

     IoServiceListenerSupport类就是负责将上面的IoService和其对应的各个IoServiceListener包装到一起进行管理。

技术分享

下面是它的成员变量:


    private final IoService service;
    private final List<IoServiceListener> listeners = new CopyOnWriteArrayList<IoServiceListener>();
    private final ConcurrentMap<Long, IoSession> managedSessions = new ConcurrentHashMap<Long, IoSession>();//被管理的会话集(其实就是服务所管理的会话集)
    private final Map<Long, IoSession> readOnlyManagedSessions = Collections.unmodifiableMap(managedSessions);//上面的会话集的只读版
    private final AtomicBoolean activated = new AtomicBoolean();//被管理的服务是否处于激活状态

     激活事件就以会话创建为例来说明:


public void fireSessionCreated(IoSession session)
{
        boolean firstSession = false;
        if (session.getService() instanceof IoConnector)
{//若服务类型是Connector,则说明是客户端的连接服务  
            synchronized (managedSessions)
{//锁住当前已经建立的会话集
                firstSession = managedSessions.isEmpty();//看服务所管理的会话集是否为空集
            }
        }
               if (managedSessions.putIfAbsent(Long.valueOf(session.getId()), session) != null) { // If already registered, ignore.
            return;
        }
        if (firstSession)
{//第一个连接会话,fire一个虚拟的服务激活事件
            fireServiceActivated();
        }
        //呼叫过滤器的事件处理
        session.getFilterChain().fireSessionCreated();// 会话创建
        session.getFilterChain().fireSessionOpened();//会话打开
        int managedSessionCount = managedSessions.size();
        //统计管理的会话数目
        if (managedSessionCount > largestManagedSessionCount)
{
            largestManagedSessionCount = managedSessionCount;
        }
        cumulativeManagedSessionCount ++;
        //呼叫监听者的事件处理函数
        for (IoServiceListener l : listeners)
{
            try
 {
                l.sessionCreated(session);
            } catch (Throwable e)
{
                ExceptionMonitor.getInstance().exceptionCaught(e);
            }
        }
    }

     这里值得注意的一个地方是断开连接会话,设置了一个监听锁,直到所有连接会话被关闭后才放开这个锁。

 
  private void disconnectSessions()
    {
        if (!(service instanceof IoAcceptor))
        {//确保服务类型是IoAcceptor
            return;
        }
        if (!((IoAcceptor) service).isCloseOnDeactivation())
        {// IoAcceptor是否设置为在服务失效时关闭所有连接会话
            return;
        }
        Object lock = new Object();//监听锁
        IoFutureListener<IoFuture> listener = new LockNotifyingListener(lock);
        for (IoSession s : managedSessions.values())
        {
            s.close().addListener(listener);//为每个会话的close动作增加一个监听者
        }
        try
        {
            synchronized (lock)
            {
                while (!managedSessions.isEmpty())
                {//所管理的会话还没有全部结束,持锁等待
                    lock.wait(500);
                }
            }
        } catch (InterruptedException ie)
        {
            // Ignored
        }
    }
    private static class LockNotifyingListener implements IoFutureListener<IoFuture>
    {
        private final Object lock;
        public LockNotifyingListener(Object lock)
        {
            this.lock = lock;
        }
        public void operationComplete(IoFuture future)
{
        
            synchronized (lock)
            {
                lock.notifyAll();
            }
        }
    }

   

Mina2.0x框架+源码分析