首页 > 代码库 > 从epoll构建muduo-13 Reactor + ThreadPool 成型

从epoll构建muduo-13 Reactor + ThreadPool 成型

mini-muduo版本传送门
version 0.00 从epoll构建muduo-1 mini-muduo介绍
version 0.01 从epoll构建muduo-2 最简单的epoll
version 0.02 从epoll构建muduo-3 加入第一个类,顺便介绍Reactor
version 0.03 从epoll构建muduo-4 加入Channel
version 0.04 从epoll构建muduo-5 加入Acceptor和TcpConnection
version 0.05 从epoll构建muduo-6 加入EventLoop和Epoll
version 0.06 从epoll构建muduo-7 加入IMuduoUser
version 0.07 从epoll构建muduo-8 加入发送缓冲区和接收缓冲区
version 0.08 从epoll构建muduo-9 加入onWriteComplate回调和Buffer
version 0.09 从epoll构建muduo-10 Timer定时器
version 0.11 从epoll构建muduo-11 单线程Reactor网络模型成型
version 0.12 从epoll构建muduo-12 多线程代码入场
version 0.13 从epoll构建muduo-13 Reactor + ThreadPool 成型


mini-muduo v0.13版本,mini-muduo完整可运行示例可从github下载,使用命令git checkout v0.13可切换到此版本,在线浏览此版本到这里

本版是个里程碑版本,可以通过本版了解多线程是如何通过IO线程读/写网络数据的,在前一个版本v0.12重点介绍了基础知识的前提下,本篇着重分析多线程逻辑里最重要的三个方法EventLoop::runInLoop/EventLoop::queueInLoop/EventLoop::doPendingFunctors。下面逐步介绍本版本修改的细节,三个方法放在最后的EventLoop节。


1 Task类

    这个类是v0.13版本新加入的,它就是一个携带参数的回调。它的作用就是闭包(closure),它是我们用来代替muduo里boost::function和boost:bind的。为什么不使用boost::function和boost:bind?之前解释过了,为了不引入新概念,降低mini-muduo的学习成本。这个Task类不太具有通用性(不像BlockingQueue,范型实现),只是为了在本项目里使用的,Task只支持两种类型的回调,第一种是无参数的回调,被调用者只需要实现一个”void run0()“,第二种是有两个参数的回调,被调用者实现"void run2(const string&, void*)"。因为有了Task类,所有需要异步回调的地方都用它来实现了。

2 TcpConnection

    添加了一个sendInLoop方法,把原来send方法里的实现移动到了sendInLoop方法里,而send方法本身变成了一个外部接口的包装。根据调用send方法所在线程的不同,采取不同的策略,如果调用TcpConnection::send的线程刚好是IO线程,则立刻使用write将数据送出(当然是缓冲区为空的时候)。如果调用TcpConnection::send的线程是work线程(也就是后台处理线程)则只将要发送的信息通过Task对象丢到EventLoop的异步队列中,然后立刻返回。EventLoop随后会在IO线程回调到TcpConnetion::sendInLoop方法,这样做的目的是保证网络IO相关操作只在IO线程进行。

3 TimerQueue

    改动不大,只是用Task包装了异步请求,这样保证所有关于Timer的操作都在IO线程进行。因为我们就是用timerfd来实现的定时器,而timerfd又是由epoll来监控的,所以这很好理解,对epoll监控的所有文件描述的操作都要放到IO线程。    

4 EchoServer

    在接到任务后不是立刻处理,而是将任务通过ThreadPool::addTask丢进线程池,使用多线程处理,在真正的处理回调里,简单的模拟了一个消耗CPU的函数(计算斐波那契数列),通过log可以看到,每次的任务都被分配给了池子里的不同线程。

5 EventLoop

    1 wakeup方法的实现在上一版本v0.12已经加入,但是调用被注释掉了。这次调用点位于EventLoop::queueInLoop。这个方法是用唤醒IO线程的,确切的说是唤醒IO线程里的epoll_wait,只有一点要注意,别忘记在EventLoop::handleRead里读出这个uint_64,否则会导致eventfd被持续激发使程序进入无限循环。

    2 EventLoop::queueInLoop方法,这个方法在v0.12版本叫queueLoop,为了和原始muduo保持一致,本版改名了。这个方法的作用是将一个异步回调加入到待执行队列_pendingFunctors中,与v0.12版本相比第一个差异是本版本对_pendingFunctors加了锁,这点很好理解,因为EventLoop::queueLoop经常被外部的其他非IO线程调用。第二个修改是添加了一定条件下的wakeup()唤醒。为什么单线程版本没有这个唤醒逻辑?因为单线程版本里所有的异步调用都是在Loop循环开始后,doPendingFunctors()之前触发的,只需要把回调插入_pendingFunctors这个数组即可。但是在多线程版本queueInLoop的入口就很多了,比如下面这3种情况下,都可能调用EventLoop::queueInLoop

        情况 1 IO线程,在IMuduoUser::onMessage回调里,比如EchoServer::onMessage里。

        情况 2 IO线程,在doPendingFunctors()执行Task->doTask的实现体里,比如EchoServer::onWriteComplate里。

        情况 3 非IO线程,线程池的另一个线程中。

在单线程版本里,可以不考虑情况3,情况2虽然有可能发生,但是我们当时简单假设用户只会在onMessage添加Task,而不会在Task的回调里再添加Task。所以上一个版本在此处进行了简单化处理,并不需要wakeup()操作。本版本由于要考虑这几种情况,所以添加了一些条件判断和wakeup()调用。

特别要注意_callingPendingFunctors变量。这个变量有点隐晦,我开始在写程序的时候忽略了它,后来发现它非常重要,在上面的情况2时,如果没有这个变量,会导致异步调用永远不会触发!

    3 EventLoop::runInLoop方法,本版本新添加的方法,与queueInLoop方法非常相似,"runIn"和"queueIn"从名字的差异就可以理解,当外部调用runInLoop的时候,会判断当前是否为IO线程,如果是在IO线程,则立刻执行Task里的回调。否则通过调用queueInLoop将Task加入到异步队列,等待后续被调用。

    4 EventLoop::doPendingFunctors,这个方法与queueInLoop方法一样,也是两处修改,首先是由于多线程操作vector必须要加锁,另外是添加了_callingPendingFunctors变量的控制,再次强调这个变量非常重要。


本篇的最后,为了更清晰的解释EventLoop在多线程环境下的逻辑,我画了一张时序图,时序图表达的就是“在非IO线程里调用TcpConnection::send发送数据”这一动作引发的连锁调用。这一动作需要3个Loop来完成,涉及4个子调用过程。绿色表明代码工作在IO线程红色表示代码工作在Work线程(工作线程,真正处理计算任务的线程),在原书中多线程EventLoop的讲解位于294页附近,但是很遗憾,作者没有为这一过程制作时序图。我这里算是补画一张吧。