首页 > 代码库 > Netty3 源码分析 - NIO server接受连接请求过程分析
Netty3 源码分析 - NIO server接受连接请求过程分析
Netty3 源码分析 - NIO server接受连接请求过程分析
当服务器端的server Channel绑定某个端口之后,就可以处理来自客户端的连接请求,而且在构建 NioServerSocketChannelFactory 的时候已经生成了对应的 BossPool 和 WorkerPool,前者管理的 NioServerBoss 就是专门用来接受客户端连接的Selector封装,当,下面是关键的代码:
1. AbstractNioSelector中的run方法代表这个boss thread的核心工作。
public void run() {
thread = Thread. currentThread();
// 打开闭锁;
startupLatch.countDown();
int selectReturnsImmediately = 0;
Selector selector = this. selector;
if (selector == null) {
return;
}
// use 80% of the timeout for measure
final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;
boolean wakenupFromLoop = false;
for (;;) {
wakenUp.set( false);
try {
long beforeSelect = System. nanoTime();
// 返回有多少个Channel准备好了
int selected = select(selector);
if (SelectorUtil. EPOLL_BUG_WORKAROUND && selected == 0 && !wakenupFromLoop && !wakenUp .get()) {
//上述select阻塞的时间;
long timeBlocked = System. nanoTime() - beforeSelect;
//如果小于最小超时时间限制;
if (timeBlocked < minSelectTimeout) {
boolean notConnected = false;
// loop over all keys as the selector may was unblocked because of a closed channel
for (SelectionKey key: selector.keys()) {
SelectableChannel ch = key.channel();
try {
if (ch instanceof DatagramChannel && !ch.isOpen() ||
ch instanceof SocketChannel && !((SocketChannel) ch).isConnected()) {
notConnected = true;
// cancel the key just to be on the safe side
key.cancel();
}
} catch (CancelledKeyException e) {
// ignore
}
}
if (notConnected) {
selectReturnsImmediately = 0;
} else {
//在超时限制之前就返回,并且返回的结果是0,这或许是导致 jdk epoll bug的原因,累积。
selectReturnsImmediately ++;
}
} else {
// 是超时。
selectReturnsImmediately = 0;
}
// 这是jdk epoll bug,所以需要替换掉这个Selector!!!
//然后重新下一轮的select处理。
if (selectReturnsImmediately == 1024) {
// The selector returned immediately for 10 times in a row,
// so recreate one selector as it seems like we hit the
// famous epoll (..) jdk bug.
rebuildSelector();
selector = this. selector;
selectReturnsImmediately = 0;
wakenupFromLoop = false;
// try to select again
continue;
}
} else {
// reset counter
selectReturnsImmediately = 0;
}
/**
* 在调用selector.wakeup()之前总是先执行wakenUp.compareAndSet(false, true),
* 来减小wake -up的开销,因为Selector.wakeup()执行的代价很大。
* 然后这种方法存在一种竟态条件,发生在如果把 wakenUp 设置为true太早的时候:
* 1) Selecttor在‘wakenUp.set(false)‘和‘selector.select(...)‘之间醒来(BAD);
* 2)在‘selector.select(...)‘和‘if (wakenUp.get()) { ... }‘醒来时OK的。
* 在第一种情况下,‘wakenUp‘被置为了true,但是没有对那个select生效,所以他会让接下来的那个
* ‘selector.select(...)‘立即醒来。直到在下一轮循环当中‘wakenUp‘ 被再次置为FALSE的时候,
* 那么 ‘wakenUp.compareAndSet(false, true)‘就会失败,任何想惊醒Selector的尝试都会失败,
* 导致接下来的‘selector.select(...)‘方法无谓的阻塞。
*
* 为了解决这个问题,就在selector.select(...)之后,判断wakenUp是true的时候,立即调用一次
* selector.wakeup()。
* 对这两种情况来说,惊醒selector的操作都是低效的。
*/
if ( wakenUp.get()) {
wakenupFromLoop = true;
selector.wakeup();
} else {
wakenupFromLoop = false;
}
cancelledKeys = 0;
processTaskQueue(); // 处理任务
selector = this. selector; // processTaskQueue() can call rebuildSelector()
if ( shutdown) {
this. selector = null;
// process one time again
processTaskQueue();
for (SelectionKey k: selector.keys()) {
close(k);
}
try {
// 要关闭Selector;
selector.close();
} catch (IOException e) {
logger.warn( "Failed to close a selector.", e);
}
// 打开这个闭锁;
shutdownLatch.countDown();
break;
} else {
//核心的过程,有具体的NioSelector来实现
process(selector);
}
} catch (Throwable t) {
logger.warn(
"Unexpected exception in the selector loop.", t);
// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try {
Thread. sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}
}
}
2. 具体的流程处理在具体的 NioServerBoss 中,具体的处理连接请求。
protected void process(Selector selector) {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
if (selectedKeys.isEmpty()) {
return;
}
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey k = i.next();
i.remove(); //
// 得到监听套接字通道
NioServerSocketChannel channel = (NioServerSocketChannel) k.attachment();
try {
// accept connections in a for loop until no new connection is ready
for (;;) {
SocketChannel acceptedSocket = channel.socket .accept();
// 非阻塞模式
if (acceptedSocket == null) {
break;
}
// 有连接请求的到来,那么就分发给worker处理。
registerAcceptedChannel(channel, acceptedSocket, thread );
}
} catch (CancelledKeyException e) {
// Raised by accept() when the server socket was closed.
k.cancel();
channel.close();
} catch (SocketTimeoutException e) {
// Thrown every second to get ClosedChannelException
// raised.
} catch (ClosedChannelException e) {
// Closed as requested.
} catch (Throwable t) {
if ( logger.isWarnEnabled()) {
logger.warn( "Failed to accept a connection.", t);
}
try {
Thread. sleep(1000);
} catch (InterruptedException e1) {
// Ignore
}
}
}
}
3. 把这个连接套接字分发给一个worker pool中下一个worker来处理。
private static void registerAcceptedChannel(NioServerSocketChannel parent, SocketChannel acceptedSocket,
Thread currentThread) {
try {
ChannelSink sink = parent.getPipeline().getSink();
ChannelPipeline pipeline = parent.getConfig().getPipelineFactory().getPipeline();
//安排一个线程来处理这个连接通道 acceptedSocket
NioWorker worker = parent. workerPool.nextWorker();
worker.register( new NioAcceptedSocketChannel(
parent.getFactory(), pipeline, parent, sink
, acceptedSocket,
worker, currentThread), null);
} catch (Exception e) {
if ( logger.isWarnEnabled()) {
logger.warn( "Failed to initialize an accepted socket.", e);
}
try {
acceptedSocket.close();
} catch (IOException e2) {
if ( logger.isWarnEnabled()) {
logger.warn( "Failed to close a partially accepted socket.",e2);
}
}
}
}
4. 接下来就是每个worker处理一个连接的读写服务,NioWorker中的 process 负责这些工作。
protected void process(Selector selector) throws IOException {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
//如果集合为空就立即返回而不是每次创建迭代器,却无事可做。
if (selectedKeys.isEmpty()) {
return;
}
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey k = i.next();
i.remove();
try {
//获取这个SelectionKey的就绪操作集合。
int readyOps = k.readyOps();
if ((readyOps & SelectionKey. OP_READ) != 0 || readyOps == 0) {
if (!read(k)) {
// Connection already closed - no need to handle write.
continue;
}
}
if ((readyOps & SelectionKey. OP_WRITE) != 0) {
writeFromSelectorLoop(k);
}
} catch (CancelledKeyException e) {
close(k);
}
if (cleanUpCancelledKeys()) {
break; // break the loop to avoid ConcurrentModificationException
}
}
}
5.两种任务的层次结构图为:
Netty3 源码分析 - NIO server接受连接请求过程分析
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。