MINA is a network communication framework built on top of Java NIO. Every connection creates an IoSession.
For every connection, there is a corresponding IoFilterChain, a chain of filters through which all I/O events pass. Each connection's IoSession can have its own IoFilterChain; the default is DefaultIoFilterChain.
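For illustration, filters are normally added through the service's chain builder before any session exists; a small sketch (FilterChainSketch and configure are illustrative names, and the filter names are arbitrary):

import java.nio.charset.StandardCharsets;
import org.apache.mina.core.service.IoService;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;

final class FilterChainSketch { // illustrative name
    static void configure(IoService service) { // service is e.g. a NioSocketConnector or NioSocketAcceptor
        // add a logging filter and a line-based codec; every new session's
        // DefaultIoFilterChain is then built from this template
        service.getFilterChain().addLast("logging", new LoggingFilter());
        service.getFilterChain().addLast("codec",
                new ProtocolCodecFilter(new TextLineCodecFactory(StandardCharsets.UTF_8)));
    }
}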
IoFuture has several implementations, such as DefaultWriteFuture, DefaultReadFuture, and DefaultConnectFuture.
These IoFutures are scheduled through the IoSession.
Facing the client side is SocketConnector, which users employ to create connections; its default implementation is NioSocketConnector.
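To place these pieces before diving into the internals, here is a minimal client sketch; the host, port, empty handler and payload are illustrative only:

import java.net.InetSocketAddress;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.transport.socket.nio.NioSocketConnector;

public class MinaClientSketch {
    public static void main(String[] args) {
        NioSocketConnector connector = new NioSocketConnector();
        connector.setHandler(new IoHandlerAdapter()); // a real client overrides the event callbacks
        ConnectFuture future = connector.connect(new InetSocketAddress("localhost", 8080));
        future.awaitUninterruptibly();               // block on the DefaultConnectFuture
        IoSession session = future.getSession();     // the IoSession produced during connect
        session.write(IoBuffer.wrap("hello".getBytes()));
    }
}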
Let's walk through how a connection is created on the connector side:
NioSocketConnector itself is only a thin wrapper; the real functionality lives in its parent class AbstractPollingIoConnector<NioSession, SocketChannel>, which acts as the connection handler. It keeps queues of connection requests and processes the pending socket connect requests; when a connection request completes, a session is produced. This happens in connect0(), after which the new session is handed to an IoProcessor. The core of connect0():
try {
    handle = newHandle(localAddress);
    if (connect(handle, remoteAddress)) {
        ConnectFuture future = new DefaultConnectFuture(); // the ConnectFuture through which the caller can tell whether session creation has completed
        T session = newSession(processor, handle);         // the session is created here
        initSession(session, future, sessionInitializer);  // initialize the session
        // Forward the remaining process to the IoProcessor.
        session.getProcessor().add(session);               // hand the new session over to the IoProcessor (tag1)
        success = true;
        return future;
    }
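When connect(handle, remoteAddress) returns false instead, the connection is still in progress: roughly speaking, the request is wrapped in a ConnectionRequest, pushed onto the connectQueue described next, and completed later by the connector's own selector loop.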
NioSocketConnector also offers constructors for user customization, for example specifying the number of IoProcessor threads or supplying a processor instance; see the NioSocketConnector implementation for details.
AbstractPollingIoConnector mainly maintains two queues:
Queue<ConnectionRequest> connectQueue // new connection requests
Queue<ConnectionRequest> cancelQueue  // failed or cancelled connection requests
The server-side AbstractPollingIoAcceptor is similar in function to AbstractPollingIoConnector:
private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
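A matching server-side sketch (the port and empty handler are illustrative):

import java.net.InetSocketAddress;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

public class MinaServerSketch {
    public static void main(String[] args) throws Exception {
        NioSocketAcceptor acceptor = new NioSocketAcceptor();
        acceptor.setHandler(new IoHandlerAdapter()); // override messageReceived etc. in practice
        acceptor.bind(new InetSocketAddress(8080));  // registration flows through the registerQueue above
    }
}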
IoProcessor:
The key field is IoProcessor<T> processor, the I/O processor. All session execution, control and scheduling are done inside it. It maintains three session queues:
private final Queue<S> newSessions = new ConcurrentLinkedQueue<S>();      // all newly created sessions
/** A queue used to store the sessions to be removed */
private final Queue<S> removingSessions = new ConcurrentLinkedQueue<S>(); // sessions to be removed
/** A queue used to store the sessions to be flushed */
private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<S>(); // sessions with pending writes to flush
The default IoProcessor is AbstractPollingIoProcessor<S extends AbstractIoSession> implements IoProcessor<S>.
The IoProcessor is specified in the NioSocketConnector constructor:
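For reference, the no-argument NioSocketConnector constructor looks roughly like this in the MINA 2.x source (quoted from memory, so treat the body as approximate):

public NioSocketConnector() {
    super(new DefaultSocketSessionConfig(), NioProcessor.class); // NioProcessor is the processor class
    ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
}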
As can be seen, initialization delegates the parameters to the parent class AbstractPollingIoConnector:
protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass) {
    this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass), true);
}
Compare the server-side NioSocketAcceptor initialization: the process is the same as on the client side, and reading data is likewise handed to NioProcessor.
Note SimpleIoProcessorPool: it accepts an IoProcessor Class object as its parameter and creates the actual IoProcessor instances via reflection.
SimpleIoProcessorPool has a few important fields:
/** The contained which is passed to the IoProcessor when they are created */
private final Executor executor; // the thread pool that runs the sessions' I/O processing

/** The pool table */
private final IoProcessor<S>[] pool; // the IoProcessors that handle the sessions; the default pool size is:

private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1; // number of available processors + 1
The concrete initialization of the pool (and thus of the AbstractPollingIoProcessor instances) happens in the SimpleIoProcessorPool constructor:
public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, Executor executor, int size) {....
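Conceptually, the constructor fills the pool via reflection, roughly like the hedged sketch below; the real constructor probes several constructor signatures and does far more error handling, and ProcessorPoolSketch/createPool are illustrative names:

import java.util.concurrent.Executor;
import org.apache.mina.core.service.IoProcessor;
import org.apache.mina.core.session.AbstractIoSession;

final class ProcessorPoolSketch { // illustrative helper, not MINA API
    @SuppressWarnings("unchecked")
    static <S extends AbstractIoSession> IoProcessor<S>[] createPool(
            Class<? extends IoProcessor<S>> processorType, Executor executor, int size) throws Exception {
        IoProcessor<S>[] pool = new IoProcessor[size];
        for (int i = 0; i < size; i++) {
            // look up a constructor that takes an Executor and invoke it reflectively,
            // handing every processor the same shared executor
            pool[i] = processorType.getConstructor(Executor.class).newInstance(executor);
        }
        return pool;
    }
}

For instance, createPool(NioProcessor.class, executor, DEFAULT_SIZE) would yield a pool of the default size.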
The IoProcessor is set to work on a session back in connect0() of the NIO connector (tag1 above), via session.getProcessor().add(session):
|
\|/
public final void add(S session) {
    if (disposed || disposing) {
        throw new IllegalStateException("Already disposed.");
    }
    // Adds the session to the newSession queue and starts the worker
    newSessions.add(session);
    startupProcessor();
}
|
\|/
private void startupProcessor() {
    Processor processor = processorRef.get();
    if (processor == null) {
        // The Processor runnable drives the sessions, moving them through
        // the newSessions, flushingSessions and removingSessions queues.
        processor = new Processor();
        if (processorRef.compareAndSet(null, processor)) {
            executor.execute(new NamePreservingRunnable(processor, threadName)); // the processor thread starts here
        }
    }
    // Just stop the select() and start it again, so that the processor
    // can be activated immediately.
    wakeup();
}
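Note the compareAndSet on processorRef: it guarantees that at most one Processor thread runs per IoProcessor no matter how many threads call add() concurrently, without taking any lock.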
Let's look at the structure of Processor:
private class Processor implements Runnable {
    public void run() {
        assert (processorRef.get() == this); // processorRef holds the currently running Processor
        int nSessions = 0;
        lastIdleCheckTime = System.currentTimeMillis();
        for (;;) { // loop until the processor shuts down or an exception escapes
            try {
                // This select has a timeout so that we can manage
                // idle session when we get out of the select every
                // second. (note : this is a hack to avoid creating
                // a dedicated thread).
                long t0 = System.currentTimeMillis();
                int selected = select(SELECT_TIMEOUT);
                long t1 = System.currentTimeMillis();
                long delta = (t1 - t0);
                // select() returned nothing, nobody called wakeup(), and it came back in
                // under 100 ms: either a closed channel broke the select, or we are hit
                // by the epoll spinning bug.
                if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {
                    // Last chance : the select() may have been
                    // interrupted because we have had an closed channel.
                    if (isBrokenConnection()) {
                        LOG.warn("Broken connection");
                        // we can reselect immediately
                        // set back the flag to false
                        wakeupCalled.getAndSet(false);
                        continue;
                    } else {
                        LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));
                        // Ok, we are hit by the nasty epoll spinning.
                        // Basically, there is a race condition which causes a closing
                        // file descriptor not to be considered as available as a
                        // selected channel, but it stopped the select. The next time
                        // we will call select(), it will exit immediately for the same
                        // reason, and do so forever, consuming 100% CPU.
                        // We have to destroy the selector, and register all the
                        // sockets on a new one.
                        registerNewSelector();
                    }
                    // Set back the flag to false
                    wakeupCalled.getAndSet(false);
                    // and continue the loop
                    continue;
                }
                // Manage newly created session first
                nSessions += handleNewSessions();
                updateTrafficMask();
                // Now, if we have had some incoming or outgoing events,
                // deal with them
                if (selected > 0) {
                    // LOG.debug("Processing ..."); // This log hurts one of the MDCFilter test...
                    process(); // the core method: dispatches the selected sessions
                }
                ...

Here is the code of process():
private void process() throws Exception {
    for (Iterator<S> i = selectedSessions(); i.hasNext();) {
        S session = i.next();
        process(session);
        i.remove();
    }
}
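Note the i.remove() call: Java NIO's Selector never clears its selected-key set on its own, so every handled key must be removed explicitly, otherwise the same session would be processed again on the next pass.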
|
\|/
private void process(S session) {
    // Process reads
    if (isReadable(session) && !session.isReadSuspended()) {
        read(session); // read the session's data
    }
    // Process writes
    if (isWritable(session) && !session.isWriteSuspended()) {
        // add the session to the queue, if it's not already there
        if (session.setScheduledForFlush(true)) {
            flushingSessions.add(session);
        }
    }
}
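Sessions parked in flushingSessions are not written to the channel here; later in the same run() loop the processor performs a flush pass that drains this queue and does the actual writes.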
private void read(S session) {
    IoSessionConfig config = session.getConfig();
    int bufferSize = config.getReadBufferSize();
    IoBuffer buf = IoBuffer.allocate(bufferSize);
    final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
    try {
        int readBytes = 0;
        int ret;
        try {
            if (hasFragmentation) {
                while ((ret = read(session, buf)) > 0) {
                    readBytes += ret;
                    if (!buf.hasRemaining()) {
                        break;
                    }
                }
            } else {
                ret = read(session, buf);
                if (ret > 0) {
                    readBytes = ret;
                }
            }
        } finally {
            buf.flip();
        }
        if (readBytes > 0) {
            // Fire the filter chain with the data that was read.
            IoFilterChain filterChain = session.getFilterChain();
            filterChain.fireMessageReceived(buf);
            buf = null;
            if (hasFragmentation) {
                // Adapt the read buffer size to how much was actually read.
                if (readBytes << 1 < config.getReadBufferSize()) {
                    session.decreaseReadBufferSize();
                } else if (readBytes == config.getReadBufferSize()) {
                    session.increaseReadBufferSize();
                }
            }
        }
        if (ret < 0) {
            scheduleRemove(session);
        }
    } catch (Throwable e) {
        if (e instanceof IOException) {
            if (!(e instanceof PortUnreachableException)
                    || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
                    || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
                scheduleRemove(session);
            }
        }
        IoFilterChain filterChain = session.getFilterChain();
        filterChain.fireExceptionCaught(e);
    }
}
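fireMessageReceived(buf) sends the buffer down the filter chain and eventually into the application's IoHandler. A minimal sketch of where it lands (EchoHandler is an illustrative name; with no codec filter installed, the message arrives as a raw IoBuffer):

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;

public class EchoHandler extends IoHandlerAdapter {
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        IoBuffer rb = (IoBuffer) message;               // the raw buffer fired by read() above
        IoBuffer wb = IoBuffer.allocate(rb.remaining()); // copy it into a fresh buffer
        wb.put(rb);
        wb.flip();
        session.write(wb);                               // the write is scheduled via flushingSessions
    }
}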