首页 > 代码库 > NIO byteBUffer 讲解 及Mina 源码分析

NIO byteBUffer 讲解 及Mina 源码分析

1.传统的socket: 阻塞式通信模式 tcp连接: 与服务器连接时 .必须等到连接成功后 才返回 . udp连接: 客户端发送数据 ,必须等到发送成功后返回 .
每建立一个 Scoket连接时, 同事创建一个新线程对该 Socket进行单独通信(采用阻塞式通信 )
这种方式具有很高的响应速度,并且控制起来也很简单,在连接数较少的时候非常有效,但是如果
对每一个连接都产生一个线程的无疑是对系统资源的一种浪费,如果连接数较多将会出现资源不足的情况

2.1NIO 设计背后的基石:反应器模式,用于事件多路分离和分派的体系结构模式。
反应器模式的核心功能如下:
将事件多路分用
将事件分派到各自相应的事件处理程序

NIO 的非阻塞 I/O 机制是围绕 选择器和 通道构建的。 Channel 类表示服务器和客户机之间的
一种通信机制。 Selector 类是 Channel 的多路复用器。 Selector 类将传入客户机请求多路分
用并将它们分派到各自的请求处理程序。
通道(Channel  ):表示服务器和客户机之间的一种通信机制。
选择器(Selector ):是 Channel 的多路复用器。 Selector 类将传入的客户机请求多路分用并将它们
分派到各自的请求处理程序。

简单的来说:

NIO是一个基于事件的IO架构,最基本的思想就是:有事件我通知你,你再去做你的事情.
而且NIO的主线程只有一个,不像传统的模型,需要多个线程以应对客户端请求,也减轻
JVM的工作量。

有个通俗的比喻 :在一个餐厅里.每来个顾客都要一个服务员 服务客户. 有时候客户点菜慢 服务员必须等..如果有 10个客人同时来餐厅 则必须要 10服务员服务 这是传统的基于多线程方式的 Socket

nio: 基于事件型, 让一个服务员 同时服务多个客人.比如 :有一个客人来了.服务员先招待 .给菜谱让客人点菜.当客人点菜时 .可以暂时离开 招呼别的客人 .等客人点好菜后.客人叫唤服务员 .服务员再过来 把客人点好的菜单 送到厨房. 然后再服务其他的客人

关键 对象 ByteBuffer

ByteBuffer是NIO里用得最多的Buffer,它包含两个实现方式:HeapByteBuffer是基于Java堆的实现,而DirectByteBuffer则使用了unsafe的API进行了堆外的实现。这里只说HeapByteBuffer。

使用

ByteBuffer最核心的方法是put(byte)get()。分别是往ByteBuffer里写一个字节,和读一个字节。

值得注意的是,ByteBuffer的读写模式是分开的,正常的应用场景是:往ByteBuffer里写一些数据,然后flip(),然后再读出来。

这里插两个Channel方面的对象,以便更好的理解Buffer。

ReadableByteChannel是一个从Channel中读取数据,并保存到ByteBuffer的接口,它包含一个方法:

1public int read(ByteBuffer dst) throws IOException;

WritableByteChannel则是从ByteBuffer中读取数据,并输出到Channel的接口:

1public int write(ByteBuffer src) throws IOException;

那么,一个ByteBuffer的使用过程是这样的:

1byteBuffer = ByteBuffer.allocate(N);
2//读取数据,写入byteBuffer
3readableByteChannel.read(byteBuffer);
4//变读为写
5byteBuffer.flip();
6//读取byteBuffer,写入数据
7writableByteChannel.write(byteBuffer);

看到这里,一般都不太明白flip()干了什么事,先从ByteBuffer结构说起:

ByteBuffer内部字段

byte[] buff

buff即内部用于缓存的数组。

position

当前读取的位置。

mark

为某一读过的位置做标记,便于某些时候回退到该位置。

capacity

初始化时候的容量。

limit

读写的上限,limit<=capacity。

图解

put

写模式下,往buffer里写一个字节,并把postion移动一位。写模式下,一般limit与capacity相等。
bytebuffer-put

flip

写完数据,需要开始读的时候,将postion复位到0,并将limit设为当前postion。
bytebuffer-flip

get

从buffer里读一个字节,并把postion移动一位。上限是limit,即写入数据的最后位置。
bytebuffer-get

clear

将position置为0,并不清除buffer内容。
bytebuffer-clear

mark相关的方法主要是mark()(标记)和reset()(回到标记),比较简单,就不画图了。


MINA 源码 笔记:

mina 是NIO 运行 封装 基于NIO 的网络通信框架. 每个链接都会创建一个IOSession。

图中清晰的显示了IO Processor就是位于IoService和IoFilter之间,IoService负责和外部建立连接,而IoFilter则负责处理接收到的数据,IoProcessor则负责数据的收发工作。

创建的 每个链接. 服务端都会有对应的 处理IOFilterChain 是 一个过滤器 链. 每个链接创建的IOsession 可以有自己的IOFilterChain。 默认的 是DefaultIoFilterChain。
DefaultIoFilterChain 中完成 责任链的传递执行 及 handler 回调函数 执行
当接受到 数据 时:




每个Session 的顶层是一个抽象的IOsession。 abstract class NioSession extends AbstractIoSession.NioSession 是绑定在NIOProcessor IO 处理通道中的 SelectionKey
NioSession session = (NioSession) key.attachment();

NioSession里 客户端的 Channel . IOProcess 的引用. 还有注册键 key SelectionKey key. 还有IOFilterChain filterChain.


IOSession里 有 对数据独享状态 和位置的 记录. 读写 处理队列. 还有 客户端实现的IOHandler.
IOSesion 绑定了一系列的 执行链表 WriteRequestQueue 、 Queue<ReadFuture> readyReadFutures 处理读取的ReadFuture 、IOFilterChain
客户端实现的IOHandler 和 WriteRequestQueue readyReadFutures 时间 通过IOFilterChain 触发 完成


IOSession 的 抽象类 实现的 AbstractIoSession. 他们的关系是:
1.AbstractIoSession implements IoSession
2. NioSession extends AbstractIoSession
3.NioSocketSession extends NioSession
NioSocketSession 是顶层封装 直接包装客户端的 实现 比如:

/**
* {@inheritDoc}
*/
public InetSocketAddress getLocalAddress() {
if (channel == null) {
return null;
}

Socket socket = getSocket();

if (socket == null) {
return null;
}

return (InetSocketAddress) socket.getLocalSocketAddress();
}

@Override
SocketChannel getChannel() {
return (SocketChannel) channel;
}


IoFuture ....> DefaultIoFuture 对应数据的读写同步 处理. 同步锁 来完成数据 读取的一致性. 当数据没有读取完时. 别的线程要获取 客户端的数据 需要等待. 内部通过 synchronized ( lock) {
if ( ready) {
return ready;
} else if (timeoutMillis <= 0) {
return ready;
}
来控制. 当数据读取完 通过setValue 方法 来设置 值.并通过 lock.notifyAll(); 来唤醒别 的线程.
public void setValue(Object newValue) {
synchronized ( lock) {
// Allow only once.
if ( ready) {
return;
}

result = newValue;
ready = true;
if ( waiters > 0) {
lock.notifyAll();
}
}

notifyListeners();
}
IoFuture 里面可以添加IoFutureListener(接口). 当读取数据后. notifyListeners 通知监听器 执行. IoFutureListener 包含的元素IoFuture. IoFutureListener里面 有operationComplete 接口方法.在具体子类IoFutureListener实现方法时. 调用 IoFuture 对应 的方法. 比如:

private class SessionCloseListener implements IoFutureListener<IoFuture> {
/**
* Default constructor
*/
public SessionCloseListener() {
super();
}

public void operationComplete(IoFuture future) {
removeSession((AbstractIoSession) future.getSession());
}
}

IoFuture 有多种实现. 比如:DefaultWriteFuture DefaultReadFuture DefaultConnectFuture

这些IoFuture 在IOSession里调度.

面对客户端的 是 SocketConnector 用户可以他 来创建链接. 默认实现是 NioSocketConnector 

在此说下 服务端创建过程:

但它 也是皮包一层. 功能都依托 它 的父类 AbstractPollingIoConnector<NioSession, SocketChannel>.它 是连接处理器.里面保存链接队列. 处理带链接的Socket 请求.当每个链接请求完成后. 并产生session。 过程在 connect0() 完成.然后把产生的session 交由IOProcessor 处理器.具体实现代码:以下为 connect0()核心过程:
try {
handle = newHandle(localAddress);
if (connect(handle, remoteAddress)) {
ConnectFuture future = new DefaultConnectFuture(); //产生ConnectFuture 管理器 通过他可以感知整个session 创建过程是否完成.
T session = newSession( processor, handle); //在此生成 session
initSession(session, future, sessionInitializer); //初始化session 信息
// Forward the remaining process to the IoProcessor.
session.getProcessor().add(session); //把新创建session 交由IOProcessor 处理  tag1
success = true;
return future;
}

它提供用户自定义的 构造函数. 比如 指定 IOProcessor 的线程管理器个数 还会processor 具体参考 NioSocketConnector 实现.

AbstractPollingIoConnector 主要维护两个队列:
Queue<ConnectionRequest> connectQueue //处理新连接请求
Queue<ConnectionRequest> cancelQueue//处理失败 或取消的 链接 请求

服务端AbstractPollingIoAcceptor 与 AbstractPollingIoConnector 功能类似:
  private final Queue<AcceptorOperationFuture> registerQueue new ConcurrentLinkedQueue<AcceptorOperationFuture>();
   private final Queue<AcceptorOperationFuture> cancelQueue new ConcurrentLinkedQueue<AcceptorOperationFuture>();



IoProcessor: 

最主要的 是IoProcessor<T> processor; IO处理器.所有 的session运行控制 调度 都在IO处理器完成.里面维护了三个session队列.

private final Queue<S> newSessions = new ConcurrentLinkedQueue<S>(); //保存所有 新创建的session 回话

/** A queue used to store the sessions to be removed */
private final Queue<S> removingSessions = new ConcurrentLinkedQueue<S>();// 需要移除 的 session 回话

/** A queue used to store the sessions to be flushed */
private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<S>();// 处理完 需要flush 的回话



默认的IOProcessor 是AbstractPollingIoProcessor<S extends AbstractIoSession> implements IoProcessor<S>
IOProcessor 在NioSocketConnector 构造函数指定:
从上可知. 初始化是父类 AbstractPollingIoProcessor的 参数
  protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass) {
        this(sessionConfig, nullnew SimpleIoProcessorPool<T>(processorClass), true);
    }

对比服务端NioSocketAcceptor 初始化过程:

过程 和客户端一样的  数据的读取还是交由 NioProcessor 执行.


注意其中的SimpleIoProcessorPool  它接受一个IOprocessor.Class 类型 参数. 并通过反射生成真正的IOprocessor 对象

 SimpleIoProcessorPool 里面几个重要属性
      /** The contained  which is passed to the IoProcessor when they are created */
 private final Executor executor;用于执行session 读取的线程组 
/** The pool table */
private final IoProcessor<S>[] pool; 用来处理session 的 IoProcessor 的个数 默认的个数的
private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1; // 当前处理器数 +1

AbstractPollingIoProcessor具体初始化 参见其构造函数 
  public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, Executor executor, int size) {....


触发IOprocessor 对session 读取操作 在NIOConnector 中 connect0() session.getProcessor().add(session);
 |
\|/
public final void add(S session) {
        if ( disposed || disposing) {
            throw new IllegalStateException( "Already disposed.");
        }

        // Adds the session to the newSession queue and starts the worker
        newSessions.add(session);
        startupProcessor ();
    }
 |
\|/
  private void startupProcessor () {
        Processor processor = processorRef.get();

        if (processor == null) {
            processor = new Processor(); // 包装 session。 负责执行 session 把它 放入对应的flushingSessions newSessions removingSessions 中

            if ( processorRef.compareAndSet( null, processor)) {
               executor.execute(new NamePreservingRunnable(processor, threadName )); //在此执行
            }
        }

        // Just stop the select() and start it again, so that the processor
        // can be activated immediately.
        wakeup();
    }

 |
\|/

 来看下 Processor 结构:

private class Processor implements Runnable {
public void run() {
assert ( processorRef.get() == this); // processorRef 表示 当前 执行 session 对象引用.

int nSessions = 0;
lastIdleCheckTime = System.currentTimeMillis();

for (;;) { // 循环执行. 知道发生异常 退出
try {
// This select has a timeout so that we can manage
// idle session when we get out of the select every
// second. (note : this is a hack to avoid creating
// a dedicated thread).
long t0 = System. currentTimeMillis();
int selected = select(SELECT_TIMEOUT);
long t1 = System. currentTimeMillis();
long delta = (t1 - t0);

if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) { // 如果当前多路复用器 上没有注册对象 或是 获取超时 则退出
// Last chance : the select() may have been
// interrupted because we have had an closed channel.
if (isBrokenConnection()) {
LOG.warn( "Broken connection");

// we can reselect immediately
// set back the flag to false
wakeupCalled.getAndSet(false );

continue;
} else {
LOG.warn( "Create a new selector. Selected is 0, delta = " + (t1 - t0));
// Ok, we are hit by the nasty epoll
// spinning.
// Basically, there is a race condition
// which causes a closing file descriptor not to be
// considered as available as a selected channel, but
// it stopped the select. The next time we will
// call select(), it will exit immediately for the same
// reason, and do so forever, consuming 100%
// CPU.
// We have to destroy the selector, and
// register all the socket on a new one.
registerNewSelector();
}

// Set back the flag to false
wakeupCalled.getAndSet( false);

// and continue the loop
continue;
}

// Manage newly created session first
nSessions += handleNewSessions();

updateTrafficMask();

// Now, if we have had some incoming or outgoing events,
// deal with them
if (selected > 0) {
//LOG.debug("Processing ..."); // This log hurts one of the MDCFilter test...
process(); // 核心方法. 负责执行 调度 session
附上具体代码:
private void process () throws Exception {
for (Iterator<S> i = selectedSessions(); i.hasNext();) {
S session = i.next();
process(session);
i.remove();
}
}
}
 |
\|/
   private void process(S session) {
        // Process Reads
        if (isReadable(session) && !session.isReadSuspended()) {
            read(session); //读取 session 数据 
        }

        // Process writes
        if (isWritable(session) && !session.isWriteSuspended()) {
            // add the session to the queue, if it‘s not already there
            if (session.setScheduledForFlush( true)) {
                flushingSessions.add(session);
            }
        }
    }

 |
\|/

private void read(S session) {
        IoSessionConfig config = session.getConfig();
        int bufferSize = config.getReadBufferSize();
        IoBuffer buf = IoBuffer. allocate(bufferSize);

        final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();

        try {
            int readBytes = 0;
            int ret;

            try {
                if (hasFragmentation) {

                    while ((ret = read(session, buf)) > 0) {
                        readBytes += ret;

                        if (!buf.hasRemaining()) {
                            break;
                        }
                    }
                } else {
                    ret = read(session, buf);

                    if (ret > 0) {
                        readBytes = ret;
                    }
                }
            } finally {
                buf.flip();
            }

            if (readBytes > 0) {
          //触发过滤器链执行. 
                IoFilterChain filterChain = session.getFilterChain();
                filterChain.fireMessageReceived(buf);
                buf = null

                if (hasFragmentation) { //对byteBuffer 扩容 
                    if (readBytes << 1 < config.getReadBufferSize()) {
                        session.decreaseReadBufferSize();
                    } else if (readBytes == config.getReadBufferSize()) {
                        session.increaseReadBufferSize();
                    }
                }
            }

            if (ret < 0) {
                scheduleRemove(session);
            }
        } catch (Throwable e) {
            if (e instanceof IOException) {
                if (!(e instanceof PortUnreachableException)
                        || !AbstractDatagramSessionConfig.class .isAssignableFrom(config.getClass())
                        || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
                    scheduleRemove(session);
                }
            }

            IoFilterChain filterChain = session.getFilterChain();
            filterChain.fireExceptionCaught(e);
        }
    }