首页 > 代码库 > zmq笔记二: io线程和poller_t

zmq笔记二: io线程和poller_t

int major, minor, patch;
zmq_version(&major, &minor, &patch); //4.2.0

本文主要是分析代码,方便自己日后查阅.

=========================================

在上一篇中讲到io_thread_t的线程循环函数实际上调用的,是根据不同平台下的首选I/O多路复用(select_t/poll_t/epoll_t/kqueue_t)的成员函数loop().

怎样确定选用哪种I/O多路复用,由一些预编译宏确定,请看poller.hpp头文件.

本文是在windows平台下进行分析. windows下选用的是select_t,并不是iocp.

 

1. I/O线程

io_thread_t有三个成员变量:

        //  I/O thread accesses incoming commands via this mailbox.
        mailbox_t mailbox; //接收命令消息的邮箱,mailbox相关资料会在后面的文章展开介绍. 当需要和io_thread_t通信时,给它的邮箱发一个command_t命令

        //  Handle associated with mailbox‘ file descriptor.
        poller_t::handle_t mailbox_handle; //与邮箱绑定的句柄

        //  I/O multiplexing is performed using a poller object.
        poller_t *poller; //选用的i/o多路复用

io_thread_t这个类的功能很简洁,主要操作有: 线程开启,线程结束,处理在mailbox里的命令队列的消息(in_events函数).而mailbox里有消息待处理,是通过mailbox的fd状态可读来进行通知的,这个fd就是io_thread_t的成员变量mailbox_handle.

zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) :
    object_t (ctx_, tid_)
{
    poller = new (std::nothrow) poller_t (*ctx_);
    alloc_assert (poller);

    mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
    poller->set_pollin (mailbox_handle);//初始化时加进去poller_t的可读fd集合了.
}

 

2. poller_t

poller_t实际上是一个typedef的类型:

typedef select_t poller_t;

typedef epoll_t poller_t;

...

它根据不同平台下的首选I/O多路复用(select_t/poll_t/epoll_t/kqueue_t).本文只分析select_t.

就select_t而言,在windows和linux平台下,套接字集合的管理有所不同,但原理也差不多. 在select_t里有两个平台无关的通用的结构体:

        //  Internal state.
        struct fds_set_t //select_t对感兴趣的fd的事件集合管理
        {
            fds_set_t ();
            fds_set_t (const fds_set_t& other_);
            fds_set_t& operator=(const fds_set_t& other_);
            //  Convinient method to descriptor from all sets.
            void remove_fd (const fd_t& fd_);

            fd_set read;
            fd_set write;
            fd_set error;
        };

        struct fd_entry_t //与fd对应的事件处理对象(可理解成实现了i_poll_events相关接口并需要在io线程处理事件的对象)
        {
            fd_t fd;
            zmq::i_poll_events* events;
        };
        typedef std::vector<fd_entry_t> fd_entries_t;
#if defined ZMQ_HAVE_WINDOWS
        ......
#else
        fd_entries_t fd_entries; //fd对应的事件对象集合
        fds_set_t fds_set; //select系统调用需要的所有感兴趣的fd集合
        fd_t maxfd; //select系统调用需要的最大fd值
        bool retired; //是否需要移除fd对应的事件对象,如果为true,则从fd_entries删除fd所对应的事件对象
#endif

poller_t还有一个thread_t成员变量worker,它才是系统线程的包装 (io_thread_t.poller->worker).worker线程开启后,实际执行的就是poller_t:loop()函数.

        //  Handle of the physical thread doing the I/O work.
        thread_t worker;

poller_t对某些事件对象(实现了i_poll_events:in_events接口)感兴趣,就以fd为key,加进去集合fd_entries_t.同时把fd加到fds_set.error集合里,监听fd的错误事件.

zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
{
    fd_entry_t fd_entry;
    fd_entry.fd = fd_;
    fd_entry.events = events_;

#if defined ZMQ_HAVE_WINDOWS
   ......
#else
    fd_entries.push_back (fd_entry);
    FD_SET (fd_, &fds_set.error);

    if (fd_ > maxfd)
        maxfd = fd_;
#endif

    adjust_load (1); //fd数量调整,原子增减操作

    return fd_;
}

需要注意的是,add_fd并没有把fd放进fds_set.read和fds_set.write,也就是说add_fd加进去的fd并不能被select监听到读写事件.

但是删除fd时,会把fd相关的信息都从poller_t里移除掉.

void zmq::select_t::rm_fd (handle_t handle_)
{
#if defined ZMQ_HAVE_WINDOWS
    ......
#else
    fd_entries_t::iterator fd_entry_it;
    for (fd_entry_it = fd_entries.begin ();
          fd_entry_it != fd_entries.end (); ++fd_entry_it)
        if (fd_entry_it->fd == handle_) //遍历集合找到目标元素
            break;
    zmq_assert (fd_entry_it != fd_entries.end ());

    fd_entry_it->fd = retired_fd; //标记设置为移除,注意找到目标元素后并没有立即从vector里remove掉,而是标记retired为true,在select系统调用完成后统一移除.
    fds_set.remove_fd (handle_); //从select集合里去掉

    if (handle_ == maxfd) { //更新最大的fd值
        maxfd = retired_fd;
        for (fd_entry_it = fd_entries.begin (); fd_entry_it != fd_entries.end ();
              ++fd_entry_it)
            if (fd_entry_it->fd > maxfd)
                maxfd = fd_entry_it->fd;
    }

    retired = true; //标记为需要删除
#endif
    adjust_load (-1); //fd数量调整
}

poller_t对fd的读写监听是通过这几个函数来操作的:

        void set_pollin (handle_t handle_); //监听fd可读状态
        void reset_pollin (handle_t handle_); //移除fd监听可读
        void set_pollout (handle_t handle_); //监听fd可写状态
        void reset_pollout (handle_t handle_); //移除fd监听可写

poller_t继承自poller_base_t,含有定时器集合:

        //  Clock instance private to this I/O thread.
        clock_t clock;

        //  List of active timers.
        struct timer_info_t
        {
            zmq::i_poll_events *sink;
            int id;
        };
        typedef std::multimap <uint64_t, timer_info_t> timers_t;
        timers_t timers;

定时器集合timers_t是用std:multimap容器,能保证timer的重复键值并有序.处理timer事件也很简洁,从最小时间值的元素开始与当前时间戳比较一下,大于当前时间就是timer时间到来,此时执行timer事件处理,处理完后从定时器集合移除.

uint64_t zmq::poller_base_t::execute_timers ()
{
    //  Fast track.
    if (timers.empty ())
        return 0;

    //  Get the current time.
    uint64_t current = clock.now_ms ();

    //   Execute the timers that are already due.
    timers_t::iterator it = timers.begin ();
    while (it != timers.end ()) {
        if (it->first > current)
            return it->first - current;

        //  Trigger the timer.
        it->second.sink->timer_event (it->second.id);

        //  Remove it from the list of active timers.
        timers_t::iterator o = it;
        ++it;
        timers.erase (o);
    }

    //  There are no more timers.
    return 0;
}

 

3. I/O线程的循环函数

循环里做了三件事情:

 1.执行已注册的定时器

 2.对fds_set的read/write/error的fd集合进行select,并处理各个fd发生的事件.

 3.从事件集合fd_entries里移除已经标记为retired_fd的事件对象.

void zmq::select_t::loop ()
{
    while (!stopping) {
        //  Execute any due timers.
        int timeout = (int) execute_timers ();

        int rc = 0;

#if defined ZMQ_HAVE_WINDOWS
       ......
#else
        fds_set_t local_fds_set = fds_set;
        rc = select (maxfd + 1, &local_fds_set.read, &local_fds_set.write,
            &local_fds_set.error, timeout ? &tv : NULL);

        if (rc == -1) {
            errno_assert (errno == EINTR);
            continue;
        }

        //  Size is cached to avoid iteration through just added descriptors.
        for (fd_entries_t::size_type i = 0, size = fd_entries.size (); i < size && rc > 0; ++i) {
            fd_entry_t& fd_entry = fd_entries [i];
            ......
            if (FD_ISSET (fd_entry.fd, &local_fds_set.read)) {
                fd_entry.events->in_event ();
                --rc;
            }
       ......
            if (FD_ISSET (fd_entry.fd, &local_fds_set.write)) {
                fd_entry.events->out_event ();
                --rc;
            }
       ...... if (FD_ISSET (fd_entry.fd, &local_fds_set.error)) { fd_entry.events->in_event (); --rc; } } if (retired) { //等待select返回并处理完fd的事件后,再统一从fd_entries集合里移除标记为retired_fd的元素 retired = false; fd_entries.erase (std::remove_if (fd_entries.begin (), fd_entries.end (), is_retired_fd), fd_entries.end ()); } #endif } }

  

zmq笔记二: io线程和poller_t