首页 > 代码库 > Netty3 源码分析 - NIO server接受连接请求过程分析

Netty3 源码分析 - NIO server接受连接请求过程分析

Netty3 源码分析 - NIO server接受连接请求过程分析

     当服务器端的server Channel绑定某个端口之后,就可以处理来自客户端的连接请求,而且在构建 NioServerSocketChannelFactory 的时候已经生成了对应的 BossPool 和 WorkerPool,前者管理的 NioServerBoss 就是专门用来接受客户端连接的Selector封装,当,下面是关键的代码:

1. AbstractNioSelector中的run方法代表这个boss thread的核心工作。
public void run() {
        thread = Thread. currentThread();
        // 打开闭锁;
        startupLatch.countDown();

        int selectReturnsImmediately = 0;
        Selector selector = this. selector;

        if (selector == null) {
            return;
        }
        // use 80% of the timeout for measure
        final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;
        boolean wakenupFromLoop = false;
        for (;;) {
            wakenUp.set( false);

            try {
                long beforeSelect = System. nanoTime();
                // 返回有多少个Channel准备好了
                int selected = select(selector);
                if (SelectorUtil. EPOLL_BUG_WORKAROUND && selected == 0 && !wakenupFromLoop && !wakenUp .get()) {
                      //上述select阻塞的时间;
                    long timeBlocked = System. nanoTime() - beforeSelect;

                    //如果小于最小超时时间限制;
                    if (timeBlocked < minSelectTimeout) {
                        boolean notConnected = false;
                        // loop over all keys as the selector may was unblocked because of a closed channel
                        for (SelectionKey key: selector.keys()) {
                            SelectableChannel ch = key.channel();
                            try {
                                if (ch instanceof DatagramChannel && !ch.isOpen() ||
                                        ch instanceof SocketChannel && !((SocketChannel) ch).isConnected()) {
                                    notConnected = true;
                                    // cancel the key just to be on the safe side
                                    key.cancel();
                                }
                            } catch (CancelledKeyException e) {
                                // ignore
                            }
                        }
                        if (notConnected) {
                            selectReturnsImmediately = 0;
                        } else {
                            //在超时限制之前就返回,并且返回的结果是0,这或许是导致 jdk epoll bug的原因,累积。
                            selectReturnsImmediately ++;
                        }
                    } else {
                      // 是超时。
                        selectReturnsImmediately = 0;
                    }
                   
                    // 这是jdk epoll bug,所以需要替换掉这个Selector!!!
                    //然后重新下一轮的select处理。
                    if (selectReturnsImmediately == 1024) {
                        // The selector returned immediately for 10 times in a row,
                        // so recreate one selector as it seems like we hit the
                        // famous epoll (..) jdk bug.
                        rebuildSelector();
                        selector = this. selector;
                        selectReturnsImmediately = 0;
                        wakenupFromLoop = false;
                        // try to select again
                        continue;
                    }
                } else {
                    // reset counter
                    selectReturnsImmediately = 0;
                }

                /**
                 * 在调用selector.wakeup()之前总是先执行wakenUp.compareAndSet(false, true),
                 * 来减小wake -up的开销,因为Selector.wakeup()执行的代价很大。
                 * 然后这种方法存在一种竟态条件,发生在如果把 wakenUp 设置为true太早的时候:
                 * 1) Selecttor在‘wakenUp.set(false)‘和‘selector.select(...)‘之间醒来(BAD);
                 * 2)在‘selector.select(...)‘和‘if (wakenUp.get()) { ... }‘醒来时OK的。
                 * 在第一种情况下,‘wakenUp‘被置为了true,但是没有对那个select生效,所以他会让接下来的那个
                 * ‘selector.select(...)‘立即醒来。直到在下一轮循环当中‘wakenUp‘ 被再次置为FALSE的时候,
                 * 那么 ‘wakenUp.compareAndSet(false, true)‘就会失败,任何想惊醒Selector的尝试都会失败,
                 * 导致接下来的‘selector.select(...)‘方法无谓的阻塞。
                 *
                 * 为了解决这个问题,就在selector.select(...)之后,判断wakenUp是true的时候,立即调用一次
                 * selector.wakeup()。
                 * 对这两种情况来说,惊醒selector的操作都是低效的。
                 */
                if ( wakenUp.get()) {
                    wakenupFromLoop = true;
                    selector.wakeup();
                } else {
                    wakenupFromLoop = false;
                }

                cancelledKeys = 0;
                processTaskQueue(); // 处理任务
                selector = this. selector// processTaskQueue() can call rebuildSelector()

                if ( shutdown) {
                    this. selector = null;

                    // process one time again
                    processTaskQueue();

                    for (SelectionKey k: selector.keys()) {
                        close(k);
                    }

                    try {
                      // 要关闭Selector;
                        selector.close();
                    } catch (IOException e) {
                        logger.warn( "Failed to close a selector.", e);
                    }
                    // 打开这个闭锁;
                    shutdownLatch.countDown();
                    break;
                } else {
                      //核心的过程,有具体的NioSelector来实现
                    process(selector);
                }
            } catch (Throwable t) {
                logger.warn(
                        "Unexpected exception in the selector loop.", t);

                // Prevent possible consecutive immediate failures that lead to
                // excessive CPU consumption.
                try {
                    Thread. sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore.
                }
            }
        }
    }

2. 具体的流程处理在具体的 NioServerBoss 中,具体的处理连接请求。
  protected void process(Selector selector) {
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        if (selectedKeys.isEmpty()) {
            return;
        }
        for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
            SelectionKey k = i.next();
            i.remove(); //
            // 得到监听套接字通道
            NioServerSocketChannel channel = (NioServerSocketChannel) k.attachment();

            try {
                // accept connections in a for loop until no new connection is ready
                for (;;) {
                     
                    SocketChannel acceptedSocket = channel.socket .accept();
                    // 非阻塞模式
                    if (acceptedSocket == null) {
                        break;
                    }
                    // 有连接请求的到来,那么就分发给worker处理。
                    registerAcceptedChannel(channel, acceptedSocket, thread );
                }
            } catch (CancelledKeyException e) {
                // Raised by accept() when the server socket was closed.
                k.cancel();
                channel.close();
            } catch (SocketTimeoutException e) {
                // Thrown every second to get ClosedChannelException
                // raised.
            } catch (ClosedChannelException e) {
                // Closed as requested.
            } catch (Throwable t) {
                if ( logger.isWarnEnabled()) {
                    logger.warn( "Failed to accept a connection.", t);
                }

                try {
                    Thread. sleep(1000);
                } catch (InterruptedException e1) {
                    // Ignore
                }
            }
        }
    }

3. 把这个连接套接字分发给一个worker pool中下一个worker来处理。
private static void registerAcceptedChannel(NioServerSocketChannel parent, SocketChannel acceptedSocket,
                                         Thread currentThread) {
        try {
            ChannelSink sink = parent.getPipeline().getSink();
            ChannelPipeline pipeline =  parent.getConfig().getPipelineFactory().getPipeline();
            //安排一个线程来处理这个连接通道 acceptedSocket
            NioWorker worker = parent. workerPool.nextWorker();
            worker.register( new NioAcceptedSocketChannel(
                    parent.getFactory(), pipeline, parent, sink
                    , acceptedSocket,
                    worker, currentThread), null);
        } catch (Exception e) {
            if ( logger.isWarnEnabled()) {
                logger.warn( "Failed to initialize an accepted socket.", e);
            }

            try {
                acceptedSocket.close();
            } catch (IOException e2) {
                if ( logger.isWarnEnabled()) {
                    logger.warn( "Failed to close a partially accepted socket.",e2);
                }
            }
        }
    }

4. 接下来就是每个worker处理一个连接的读写服务,NioWorker中的 process 负责这些工作。
    protected void process(Selector selector) throws IOException {
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        //如果集合为空就立即返回而不是每次创建迭代器,却无事可做。
        if (selectedKeys.isEmpty()) {
            return;
        }
        for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
            SelectionKey k = i.next();
            i.remove();
            try {
                 //获取这个SelectionKey的就绪操作集合。
                int readyOps = k.readyOps();
                if ((readyOps & SelectionKey. OP_READ) != 0 || readyOps == 0) {
                    if (!read(k)) {
                        // Connection already closed - no need to handle write.
                        continue;
                    }
                }
                if ((readyOps & SelectionKey. OP_WRITE) != 0) {
                    writeFromSelectorLoop(k);
                }
            } catch (CancelledKeyException e) {
                close(k);
            }

            if (cleanUpCancelledKeys()) {
                break// break the loop to avoid ConcurrentModificationException
            }
        }
    }


5.两种任务的层次结构图为:







Netty3 源码分析 - NIO server接受连接请求过程分析