首页 > 代码库 > muduo网络库学习笔记(9):Reactor模式的关键结构

muduo网络库学习笔记(9):Reactor模式的关键结构

Reactor模式简介

Reactor的意思是“反应堆”,是一种事件驱动机制。它和普通函数调用的不同之处在于:应用程序不是主动的调用某个API完成处理,而是恰恰相反,Reactor逆置了事件处理流程,应用程序需要提供相应的接口并注册到Reactor上,如果相应的时间发生,Reactor将主动调用应用程序注册的接口,这些接口又称为“回调函数”。

moduo库Reactor模式的实现

muduo中Reactor的关键结构包括:EventLoop、Poller和Channel。

类图如下:
技术分享

如类图所示,EventLoop类和Poller类属于组合的关系,EventLoop类和Channel类属于聚合的关系…
我们这里补充一下这两种类与类之间的关系:
I.聚合关系
聚合是关联关系的一种特例,它体现的是整体与部分的关系,即has-a的关系。此时整体与部分之间是可分离的,它们可以具有各自的生命周期,部分可以属于多个整体对象,也可以为多个整体对象共享。比如计算机与CPU、公司与员工的关系等。在UML类图设计中,聚合关系以空心菱形表示。
例:技术分享

II.组合关系
组合也是关联关系的一种特例,它体现的是一种contains-a的关系,这种关系比聚合更强,也称为强聚合。它同样体现整体与部分间的关系,但此时整体与部分是不可分的,整体的生命周期结束也就意味着部分的生命周期结束,比如人和人的大脑。在UML类图设计中,组合关系以实心菱形表示。
例: 技术分享

我们下面根据事件的循环流程来理解muduo对Reactor模式的实现。
时序图:
技术分享

EventLoop::loop()调用Poller::poll()获得当前活动事件的Channel列表,再遍历该列表,执行每个Channel的Channel::handleEvent()完成相应就绪事件回调。

代码片段1:EventLoop::loop()
文件名:EventLoop.cc

// 事件循环,该函数不能跨线程调用
// 只能在创建该对象的线程中调用
void EventLoop::loop()
{
  assert(!looping_);
  // 断言当前处于创建该对象的线程中
  assertInLoopThread();
  looping_ = true;
  LOG_TRACE << "EventLoop " << this << " start looping";

  while (!quit_)
  {
    activeChannels_.clear(); //清空当前vector中的所有元素
    //调用Poller::poll()返回活动的通道
    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
    if (Logger::logLevel() <= Logger::TRACE)
    {
      printActiveChannels();
    }
    eventHandling_ = true;
    for (ChannelList::iterator it = activeChannels_.begin();
        it != activeChannels_.end(); ++it)
    {
      currentActiveChannel_ = *it;
      //调用Channel::handleEvent()完成相应的事件回调
      currentActiveChannel_->handleEvent(pollReturnTime_);
    }
    currentActiveChannel_ = NULL;
    eventHandling_ = false;
  }

Poller class是IO多路复用的封装,在muduo中它是一个抽象基类,因为muduo同时支持poll和epoll两种IO多路复用机制。

代码片段2:PollPoller::poll()
文件名:PollPoller.cc

//PollPoller::poll()是PollPoller的核心功能
//它调用poll()获得当前活动的IO事件
//然后填充调用方传入的activeChannels
//并返回poll return的时刻
Timestamp PollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
  /**
   * poll函数原型: 
   * int poll(struct pollfd *fds, unsigned long nfds, int timeout);
   * 返回值:若有就绪描述符则为其数目,若超时则为0,若出错则为-1
   * 
   * 这里直接把vector<struct pollfd>pollfds_传给poll
   * &*pollfds_.begin()是获得元素的首地址,表达式类型符合函数要求
   */
  int numEvents = ::poll(&*pollfds_.begin(), pollfds_.size(), timeoutMs);
  Timestamp now(Timestamp::now());
  if (numEvents > 0)
  {
    LOG_TRACE << numEvents << " events happended";
    //fillActiveChannels()会遍历pollfds_
    //找出有活动事件的fd
    //把它对应的Channel填入activeChannels
    fillActiveChannels(numEvents, activeChannels);
  }
  else if (numEvents == 0)
  {
    LOG_TRACE << " nothing happended";
  }
  else
  {
    LOG_SYSERR << "PollPoller::poll()";
  }
  return now;
}
代码片段3:PollPoller::fillActiveChannels()
文件名:PollPoller.cc

void PollPoller::fillActiveChannels(int numEvents,
                                    ChannelList* activeChannels) const
{
  for (PollFdList::const_iterator pfd = pollfds_.begin();
      pfd != pollfds_.end() && numEvents > 0; ++pfd)
  {
    if (pfd->revents > 0)
    {
      //每找到一个活动fd就递减numEvents
      --numEvents;
      //ChannelMap是从fd到Channel*的映射
      ChannelMap::const_iterator ch = channels_.find(pfd->fd);
      assert(ch != channels_.end());
      Channel* channel = ch->second;
      assert(channel->fd() == pfd->fd);
      //将当前活动事件revents保存到Channel中
      //供Channel::handleEvent()使用
      channel->set_revents(pfd->revents);
      activeChannels->push_back(channel);
    }
  }
}

Channel::handleEvent()调用Channel::handleEventWithGuard()处理事件。

代码片段4:Channel::handleEventWithGuard()
文件名:Channel.cc

//根据revents_的值分别调用不同的用户回调
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
  eventHandling_ = true;
  //POLLHUP:发生挂起
  if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
  {
    if (logHup_)
    {
      LOG_WARN << "Channel::handle_event() POLLHUP";
    }
    if (closeCallback_) closeCallback_();
  }

  //POLLNVAL:描述符不是一个打开的文件
  if (revents_ & POLLNVAL)
  {
    LOG_WARN << "Channel::handle_event() POLLNVAL";
  }

  //POLLERR:发生错误
  if (revents_ & (POLLERR | POLLNVAL))
  {
    if (errorCallback_) errorCallback_();
  }
  //POLLIN:普通或优先级带数据可读
  //POLLPRI:高优先级数据可读
  //POLLRDHUP:优先级带数据可读
  if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
  {
    if (readCallback_) readCallback_(receiveTime);
  }
  //POLLOUT:普通数据可写
  if (revents_ & POLLOUT)
  {
    if (writeCallback_) writeCallback_();
  }
  eventHandling_ = false;
}

下面我们再来看一下注册和更新IO事件的流程。
时序图:
技术分享
enableReading()函数会加入可读事件,然后执行Channel的Update()函数,Channel::Update()会调用EventLoop::updateChannel(),后者会转而调用Poller::updateChannel()。

代码片段5:PollPoller::updateChannel()
文件名:PollPoller.cc

void PollPoller::updateChannel(Channel* channel)
{
  Poller::assertInLoopThread();
  LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();
  //index()返回在poll的事件数组中的序号,index_在构造函数中的初始值为-1
  // index < 0说明是一个新的通道
  if (channel->index() < 0)
  {
    assert(channels_.find(channel->fd()) == channels_.end());
    struct pollfd pfd;
    pfd.fd = channel->fd();
    pfd.events = static_cast<short>(channel->events());
    pfd.revents = 0;
    pollfds_.push_back(pfd);
    //加入到容器的最后一个位置,并设置它的序号
    int idx = static_cast<int>(pollfds_.size())-1;
    channel->set_index(idx);
    channels_[pfd.fd] = channel;
  }
  //是已有的通道
  else
  {
    assert(channels_.find(channel->fd()) != channels_.end());
    assert(channels_[channel->fd()] == channel);
    int idx = channel->index();
    assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
    struct pollfd& pfd = pollfds_[idx];
    assert(pfd.fd == channel->fd() || pfd.fd == -channel->fd()-1);
    pfd.events = static_cast<short>(channel->events());
    pfd.revents = 0;
    // 将通道暂时更改为不关注事件,但不从Poller中移除该通道
    if (channel->isNoneEvent())
    {
      // 如果某个Channel暂时不关心任何事件
      // 就把pfd.fd直接设置为-1,让poll忽略此项
      // 这里的设置是为了removeChannel优化
      pfd.fd = -channel->fd()-1; 
    }
  }
}
代码片段6:PollPoller::removeChannel()
文件名:PollPoller.cc

void PollPoller::removeChannel(Channel* channel)
{
  Poller::assertInLoopThread();
  LOG_TRACE << "fd = " << channel->fd();
  assert(channels_.find(channel->fd()) != channels_.end());
  assert(channels_[channel->fd()] == channel);
  // removeChannel前需要先updateChannel为不关注事件
  assert(channel->isNoneEvent());
  int idx = channel->index();
  assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
  const struct pollfd& pfd = pollfds_[idx]; (void)pfd;
  assert(pfd.fd == -channel->fd()-1 && pfd.events == channel->events());
  size_t n = channels_.erase(channel->fd()); //移除该元素
  assert(n == 1); (void)n;
  //如果恰好是最后一个元素,直接删除
  if (implicit_cast<size_t>(idx) == pollfds_.size()-1)
  {
    pollfds_.pop_back();
  }
  else
  {
    //将待删除元素与最后一个元素交换再pop_back,算法复杂度是O(1)
    int channelAtEnd = pollfds_.back().fd;
    iter_swap(pollfds_.begin()+idx, pollfds_.end()-1);
    if (channelAtEnd < 0)
    {
      channelAtEnd = -channelAtEnd-1;
    }
    channels_[channelAtEnd]->set_index(idx);
    pollfds_.pop_back();
  }
}
<script type="text/javascript"> $(function () { $(‘pre.prettyprint code‘).each(function () { var lines = $(this).text().split(‘\n‘).length; var $numbering = $(‘
    ‘).addClass(‘pre-numbering‘).hide(); $(this).addClass(‘has-numbering‘).parent().append($numbering); for (i = 1; i <= lines; i++) { $numbering.append($(‘
  • ‘).text(i)); }; $numbering.fadeIn(1700); }); }); </script>

    muduo网络库学习笔记(9):Reactor模式的关键结构