首页 > 代码库 > muduo网络库学习笔记(9):Reactor模式的关键结构
muduo网络库学习笔记(9):Reactor模式的关键结构
Reactor模式简介
Reactor的意思是“反应堆”,是一种事件驱动机制。它和普通函数调用的不同之处在于:应用程序不是主动的调用某个API完成处理,而是恰恰相反,Reactor逆置了事件处理流程,应用程序需要提供相应的接口并注册到Reactor上,如果相应的时间发生,Reactor将主动调用应用程序注册的接口,这些接口又称为“回调函数”。
moduo库Reactor模式的实现
muduo中Reactor的关键结构包括:EventLoop、Poller和Channel。
类图如下:
如类图所示,EventLoop类和Poller类属于组合的关系,EventLoop类和Channel类属于聚合的关系…
我们这里补充一下这两种类与类之间的关系:
I.聚合关系
聚合是关联关系的一种特例,它体现的是整体与部分的关系,即has-a的关系。此时整体与部分之间是可分离的,它们可以具有各自的生命周期,部分可以属于多个整体对象,也可以为多个整体对象共享。比如计算机与CPU、公司与员工的关系等。在UML类图设计中,聚合关系以空心菱形表示。
例:
II.组合关系
组合也是关联关系的一种特例,它体现的是一种contains-a的关系,这种关系比聚合更强,也称为强聚合。它同样体现整体与部分间的关系,但此时整体与部分是不可分的,整体的生命周期结束也就意味着部分的生命周期结束,比如人和人的大脑。在UML类图设计中,组合关系以实心菱形表示。
例:
我们下面根据事件的循环流程来理解muduo对Reactor模式的实现。
时序图:
EventLoop::loop()调用Poller::poll()获得当前活动事件的Channel列表,再遍历该列表,执行每个Channel的Channel::handleEvent()完成相应就绪事件回调。
代码片段1:EventLoop::loop()
文件名:EventLoop.cc
// 事件循环,该函数不能跨线程调用
// 只能在创建该对象的线程中调用
void EventLoop::loop()
{
assert(!looping_);
// 断言当前处于创建该对象的线程中
assertInLoopThread();
looping_ = true;
LOG_TRACE << "EventLoop " << this << " start looping";
while (!quit_)
{
activeChannels_.clear(); //清空当前vector中的所有元素
//调用Poller::poll()返回活动的通道
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
if (Logger::logLevel() <= Logger::TRACE)
{
printActiveChannels();
}
eventHandling_ = true;
for (ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it)
{
currentActiveChannel_ = *it;
//调用Channel::handleEvent()完成相应的事件回调
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
eventHandling_ = false;
}
Poller class是IO多路复用的封装,在muduo中它是一个抽象基类,因为muduo同时支持poll和epoll两种IO多路复用机制。
代码片段2:PollPoller::poll()
文件名:PollPoller.cc
//PollPoller::poll()是PollPoller的核心功能
//它调用poll()获得当前活动的IO事件
//然后填充调用方传入的activeChannels
//并返回poll return的时刻
Timestamp PollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
/**
* poll函数原型:
* int poll(struct pollfd *fds, unsigned long nfds, int timeout);
* 返回值:若有就绪描述符则为其数目,若超时则为0,若出错则为-1
*
* 这里直接把vector<struct pollfd>pollfds_传给poll
* &*pollfds_.begin()是获得元素的首地址,表达式类型符合函数要求
*/
int numEvents = ::poll(&*pollfds_.begin(), pollfds_.size(), timeoutMs);
Timestamp now(Timestamp::now());
if (numEvents > 0)
{
LOG_TRACE << numEvents << " events happended";
//fillActiveChannels()会遍历pollfds_
//找出有活动事件的fd
//把它对应的Channel填入activeChannels
fillActiveChannels(numEvents, activeChannels);
}
else if (numEvents == 0)
{
LOG_TRACE << " nothing happended";
}
else
{
LOG_SYSERR << "PollPoller::poll()";
}
return now;
}
代码片段3:PollPoller::fillActiveChannels()
文件名:PollPoller.cc
void PollPoller::fillActiveChannels(int numEvents,
ChannelList* activeChannels) const
{
for (PollFdList::const_iterator pfd = pollfds_.begin();
pfd != pollfds_.end() && numEvents > 0; ++pfd)
{
if (pfd->revents > 0)
{
//每找到一个活动fd就递减numEvents
--numEvents;
//ChannelMap是从fd到Channel*的映射
ChannelMap::const_iterator ch = channels_.find(pfd->fd);
assert(ch != channels_.end());
Channel* channel = ch->second;
assert(channel->fd() == pfd->fd);
//将当前活动事件revents保存到Channel中
//供Channel::handleEvent()使用
channel->set_revents(pfd->revents);
activeChannels->push_back(channel);
}
}
}
Channel::handleEvent()调用Channel::handleEventWithGuard()处理事件。
代码片段4:Channel::handleEventWithGuard()
文件名:Channel.cc
//根据revents_的值分别调用不同的用户回调
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
eventHandling_ = true;
//POLLHUP:发生挂起
if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
{
if (logHup_)
{
LOG_WARN << "Channel::handle_event() POLLHUP";
}
if (closeCallback_) closeCallback_();
}
//POLLNVAL:描述符不是一个打开的文件
if (revents_ & POLLNVAL)
{
LOG_WARN << "Channel::handle_event() POLLNVAL";
}
//POLLERR:发生错误
if (revents_ & (POLLERR | POLLNVAL))
{
if (errorCallback_) errorCallback_();
}
//POLLIN:普通或优先级带数据可读
//POLLPRI:高优先级数据可读
//POLLRDHUP:优先级带数据可读
if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
{
if (readCallback_) readCallback_(receiveTime);
}
//POLLOUT:普通数据可写
if (revents_ & POLLOUT)
{
if (writeCallback_) writeCallback_();
}
eventHandling_ = false;
}
下面我们再来看一下注册和更新IO事件的流程。
时序图:
enableReading()函数会加入可读事件,然后执行Channel的Update()函数,Channel::Update()会调用EventLoop::updateChannel(),后者会转而调用Poller::updateChannel()。
代码片段5:PollPoller::updateChannel()
文件名:PollPoller.cc
void PollPoller::updateChannel(Channel* channel)
{
Poller::assertInLoopThread();
LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();
//index()返回在poll的事件数组中的序号,index_在构造函数中的初始值为-1
// index < 0说明是一个新的通道
if (channel->index() < 0)
{
assert(channels_.find(channel->fd()) == channels_.end());
struct pollfd pfd;
pfd.fd = channel->fd();
pfd.events = static_cast<short>(channel->events());
pfd.revents = 0;
pollfds_.push_back(pfd);
//加入到容器的最后一个位置,并设置它的序号
int idx = static_cast<int>(pollfds_.size())-1;
channel->set_index(idx);
channels_[pfd.fd] = channel;
}
//是已有的通道
else
{
assert(channels_.find(channel->fd()) != channels_.end());
assert(channels_[channel->fd()] == channel);
int idx = channel->index();
assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
struct pollfd& pfd = pollfds_[idx];
assert(pfd.fd == channel->fd() || pfd.fd == -channel->fd()-1);
pfd.events = static_cast<short>(channel->events());
pfd.revents = 0;
// 将通道暂时更改为不关注事件,但不从Poller中移除该通道
if (channel->isNoneEvent())
{
// 如果某个Channel暂时不关心任何事件
// 就把pfd.fd直接设置为-1,让poll忽略此项
// 这里的设置是为了removeChannel优化
pfd.fd = -channel->fd()-1;
}
}
}
代码片段6:PollPoller::removeChannel()
文件名:PollPoller.cc
void PollPoller::removeChannel(Channel* channel)
{
Poller::assertInLoopThread();
LOG_TRACE << "fd = " << channel->fd();
assert(channels_.find(channel->fd()) != channels_.end());
assert(channels_[channel->fd()] == channel);
// removeChannel前需要先updateChannel为不关注事件
assert(channel->isNoneEvent());
int idx = channel->index();
assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
const struct pollfd& pfd = pollfds_[idx]; (void)pfd;
assert(pfd.fd == -channel->fd()-1 && pfd.events == channel->events());
size_t n = channels_.erase(channel->fd()); //移除该元素
assert(n == 1); (void)n;
//如果恰好是最后一个元素,直接删除
if (implicit_cast<size_t>(idx) == pollfds_.size()-1)
{
pollfds_.pop_back();
}
else
{
//将待删除元素与最后一个元素交换再pop_back,算法复杂度是O(1)
int channelAtEnd = pollfds_.back().fd;
iter_swap(pollfds_.begin()+idx, pollfds_.end()-1);
if (channelAtEnd < 0)
{
channelAtEnd = -channelAtEnd-1;
}
channels_[channelAtEnd]->set_index(idx);
pollfds_.pop_back();
}
}
muduo网络库学习笔记(9):Reactor模式的关键结构