首页 > 代码库 > libevent(了解)

libevent(了解)

1 前言

Libevent是一个轻量级的开源高性能网络库,使用者众多,研究者更甚,相关文章也不少。写这一系列文章的用意在于,一则分享心得;二则对libevent代码和设计思想做系统的、更深层次的分析,写出来,也可供后来者参考。

附带一句:Libevent是用c语言编写的(MS大牛们都偏爱c语言哪),而且几乎是无处不函数指针,学习其源代码也需要相当的c语言基础。

2 LIBEVENT简介

上来当然要先夸奖啦,Libevent 有几个显著的亮点:
事件驱动(event-driven),高性能;
轻量级,专注于网络,不如ACE那么臃肿庞大;
源代码相当精炼、易读;
跨平台,支持Windows、Linux、*BSD和Mac Os;
支持多种I/O多路复用技术, epoll、poll、dev/poll、select和kqueue等;
支持I/O,定时器和信号等事件;
注册事件优先级;

Libevent已经被广泛的应用,作为底层的网络库;比如memcached、Vomit、Nylon、Netchat等等。
Libevent当前的最新稳定版是1.4.13;这也是本文参照的版本。

3 学习的好处

学习libevent有助于提升程序设计功力,除了网络程序设计方面外,Libevent的代码里有很多有用的设计技巧和基础数据结构,比如信息隐藏、函数指针、c语言的多态支持、链表和堆等等,都有助于提升自身的程序功力。
程序设计不止要了解框架,很多细节之处恰恰也是事关整个系统成败的关键。只对libevent本身的框架大概了解,那或许仅仅是一知半解,不深入代码分析,就难以了解其设计的精巧之处,也就难以为自己所用。

事实上Libevent本身就是一个典型的Reactor模型,理解Reactor模式是理解libevent的基石;因此下一节将介绍典型的事件驱动设计模式——Reactor模式。

前面讲到,整个libevent本身就是一个Reactor,因此本节将专门对Reactor模式进行必要的介绍,并列出libevnet中的几个重要组件和Reactor的对应关系,在后面的章节中可能还会提到本节介绍的基本概念。

1 REACTOR的事件处理机制

首先来回想一下普通函数调用的机制:程序调用某函数?函数执行,程序等待?函数将结果和控制权返回给程序?程序继续处理。
Reactor释义“反应堆”,是一种事件驱动机制。和普通函数调用的不同之处在于:应用程序不是主动的调用某个API完成处理,而是恰恰相反,Reactor逆置了事件处理流程,应用程序需要提供相应的接口并注册到Reactor上,如果相应的时间发生,Reactor将主动调用应用程序注册的接口,这些接口又称为“回调函数”。使用Libevent也是想Libevent框架注册相应的事件和回调函数;当这些时间发声时,Libevent会调用这些回调函数处理相应的事件(I/O读写、定时和信号)。
用“好莱坞原则”来形容Reactor再合适不过了:不要打电话给我们,我们会打电话通知你。
举个例子:你去应聘某xx公司,面试结束后。
“普通函数调用机制”公司HR比较懒,不会记你的联系方式,那怎么办呢,你只能面试完后自己打电话去问结果;有没有被录取啊,还是被据了;

“Reactor”公司HR就记下了你的联系方式,结果出来后会主动打电话通知你:有没有被录取啊,还是被据了;你不用自己打电话去问结果,事实上也不能,你没有HR的留联系方式。

2 REACTOR模式的优点

Reactor模式是编写高性能网络服务器的必备技术之一,它具有如下的优点:
1)响应快,不必为单个同步时间所阻塞,虽然Reactor本身依然是同步的;
2)编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销;
3)可扩展性,可以方便的通过增加Reactor实例个数来充分利用CPU资源;
4)可复用性,reactor框架本身与具体事件处理逻辑无关,具有很高的复用性;

3 REACTOR模式框架

使用Reactor模型,必备的几个组件:事件源、Reactor框架、多路复用机制和事件处理程序,先来看看Reactor模型的整体框架,接下来再对每个组件做逐一说明。
components

1) 事件源
Linux上是文件描述符,Windows上就是Socket或者Handle了,这里统一称为“句柄集”;程序在指定的句柄上注册关心的事件,比如I/O事件。

2) event demultiplexer——事件多路分发机制
由操作系统提供的I/O多路复用机制,比如select和epoll。
程序首先将其关心的句柄(事件源)及其事件注册到event demultiplexer上;
当有事件到达时,event demultiplexer会发出通知“在已经注册的句柄集中,一个或多个句柄的事件已经就绪”;
程序收到通知后,就可以在非阻塞的情况下对事件进行处理了。
对应到libevent中,依然是select、poll、epoll等,但是libevent使用结构体eventop进行了封装,以统一的接口来支持这些I/O多路复用机制,达到了对外隐藏底层系统机制的目的。

3) Reactor——反应器
Reactor,是事件管理的接口,内部使用event demultiplexer注册、注销事件;并运行事件循环,当有事件进入“就绪”状态时,调用注册事件的回调函数处理事件。
对应到libevent中,就是event_base结构体。
一个典型的Reactor声明方式

view plain
  1. class Reactor
  2. {
  3. public:
  4.     int register_handler(Event_Handler *pHandler, int event);
  5.     int remove_handler(Event_Handler *pHandler, int event);
  6.     void handle_events(timeval *ptv);
  7.     // …
  8. };

4) Event Handler——事件处理程序
事件处理程序提供了一组接口,每个接口对应了一种类型的事件,供Reactor在相应的事件发生时调用,执行相应的事件处理。通常它会绑定一个有效的句柄。
对应到libevent中,就是event结构体。
下面是两种典型的Event Handler类声明方式,二者互有优缺点。

view plain
  1. class Event_Handler
  2. {
  3. public:
  4.     virtual void handle_read() = 0;
  5.     virtual void handle_write() = 0;
  6.     virtual void handle_timeout() = 0;
  7.     virtual void handle_close() = 0;
  8.     virtual HANDLE get_handle() = 0;
  9.     // …
  10. };
  11. class Event_Handler
  12. {
  13. public:
  14.     // events maybe read/write/timeout/close .etc
  15.     virtual void handle_events(int events) = 0;
  16.     virtual HANDLE get_handle() = 0;
  17.     // …
  18. };

 

4 REACTOR事件处理流程

前面说过Reactor将事件流“逆置”了,那么使用Reactor模式后,事件控制流是什么样子呢?
可以参见下面的序列图。
reactor sequences

5 小结

上面讲到了Reactor的基本概念、框架和处理流程,对Reactor有个基本清晰的了解后,再来对比看libevent就会更容易理解了,接下来就正式进入到libevent的代码世界了,加油!

1 前言

学习源代码该从哪里入手?我觉得从程序的基本使用场景和代码的整体处理流程入手是个不错的方法,至少从个人的经验上讲,用此方法分析libevent是比较有效的。

2 基本应用场景

基本应用场景也是使用libevnet的基本流程,下面来考虑一个最简单的场景,使用livevent设置定时器,应用程序只需要执行下面几个简单的步骤即可。
1)首先初始化libevent库,并保存返回的指针
struct event_base * base = event_init();
实际上这一步相当于初始化一个Reactor实例;在初始化libevent后,就可以注册事件了。

2)初始化事件event,设置回调函数和关注的事件
evtimer_set(&ev, timer_cb, NULL);
事实上这等价于调用event_set(&ev, -1, 0, timer_cb, NULL);
event_set的函数原型是:
void event_set(struct event *ev, int fd, short event, void (*cb)(int, short, void *), void *arg)
ev:执行要初始化的event对象;
fd:该event绑定的“句柄”,对于信号事件,它就是关注的信号;
event:在该fd上关注的事件类型,它可以是EV_READ, EV_WRITE, EV_SIGNAL;
cb:这是一个函数指针,当fd上的事件event发生时,调用该函数执行处理,它有三个参数,调用时由event_base负责传入,按顺序,实际上就是event_set时的fd, event和arg;
arg:传递给cb函数指针的参数;
由于定时事件不需要fd,并且定时事件是根据添加时(event_add)的超时值设定的,因此这里event也不需要设置。
这一步相当于初始化一个event handler,在libevent中事件类型保存在event结构体中。
注意:libevent并不会管理event事件集合,这需要应用程序自行管理;

3)设置event从属的event_base
event_base_set(base, &ev);
这一步相当于指明event要注册到哪个event_base实例上;

4)是正式的添加事件的时候了
event_add(&ev, timeout);
基本信息都已设置完成,只要简单的调用event_add()函数即可完成,其中timeout是定时值;
这一步相当于调用Reactor::register_handler()函数注册事件。

5)程序进入无限循环,等待就绪事件并执行事件处理
event_base_dispatch(base);

3 实例代码

上面例子的程序代码如下所示

view plain
  1. struct event ev;
  2. struct timeval tv;
  3. void time_cb(int fd, short event, void *argc)
  4. {
  5.     printf(“timer wakeup/n”);
  6.     event_add(&ev, &tv); // reschedule timer
  7. }
  8. int main()
  9. {
  10.     struct event_base *base = event_init();
  11.     tv.tv_sec = 10; // 10s period
  12.     tv.tv_usec = 0;
  13.     evtimer_set(&ev, time_cb, NULL);
  14.     event_add(&ev, &tv);
  15.     event_base_dispatch(base);
  16. }

 

4 事件处理流程

当应用程序向libevent注册一个事件后,libevent内部是怎么样进行处理的呢?下面的图就给出了这一基本流程。
1)    首先应用程序准备并初始化event,设置好事件类型和回调函数;这对应于前面第步骤2和3;
2)    向libevent添加该事件event。对于定时事件,libevent使用一个小根堆管理,key为超时时间;对于Signal和I/O事件,libevent将其放入到等待链表(wait list)中,这是一个双向链表结构;
3)    程序调用event_base_dispatch()系列函数进入无限循环,等待事件,以select()函数为例;每次循环前libevent会检查定时事件的最小超时时间tv,根据tv设置select()的最大等待时间,以便于后面及时处理超时事件;
当select()返回后,首先检查超时事件,然后检查I/O事件;
Libevent将所有的就绪事件,放入到激活链表中;
然后对激活链表中的事件,调用事件的回调函数执行事件处理;
event flow

5 小结

本节介绍了libevent的简单实用场景,并旋风般的介绍了libevent的事件处理流程,读者应该对libevent有了基本的印象,下面将会详细介绍libevent的事件管理框架(Reactor模式中的Reactor框架)做详细的介绍,在此之前会对源代码文件做简单的分类。

——libevent源代码文件组织

1 前言

详细分析源代码之前,如果能对其代码文件的基本结构有个大概的认识和分类,对于代码的分析将是大有裨益的。本节内容不多,我想并不是说它不重要!

2 源代码组织结构

Libevent的源代码虽然都在一层文件夹下面,但是其代码分类还是相当清晰的,主要可分为头文件、内部使用的头文件、辅助功能函数、日志、libevent框架、对系统I/O多路复用机制的封装、信号管理、定时事件管理、缓冲区管理、基本数据结构和基于libevent的两个实用库等几个部分,有些部分可能就是一个源文件。
源代码中的test部分就不在我们关注的范畴了。
1)头文件
主要就是event.h:事件宏定义、接口函数声明,主要结构体event的声明;
2)内部头文件
xxx-internal.h:内部数据结构和函数,对外不可见,以达到信息隐藏的目的;
3)libevent框架
event.c:event整体框架的代码实现;
4)对系统I/O多路复用机制的封装
epoll.c:对epoll的封装;
select.c:对select的封装;
devpoll.c:对dev/poll的封装;
kqueue.c:对kqueue的封装;
5)定时事件管理
min-heap.h:其实就是一个以时间作为key的小根堆结构;
6)信号管理
signal.c:对信号事件的处理;
7)辅助功能函数
evutil.h 和evutil.c:一些辅助功能函数,包括创建socket pair和一些时间操作函数:加、减和比较等。
8)日志
log.h和log.c:log日志函数
9)缓冲区管理
evbuffer.c和buffer.c:libevent对缓冲区的封装;
10)基本数据结构
compat/sys下的两个源文件:queue.h是libevent基本数据结构的实现,包括链表,双向链表,队列等;_libevent_time.h:一些用于时间操作的结构体定义、函数和宏定义;
11)实用网络库
http和evdns:是基于libevent实现的http服务器和异步dns查询库;

3 小结

本节介绍了libevent的组织和分类,下面将会详细介绍libevent的核心部分event结构。

对事件处理流程有了高层的认识后,本节将详细介绍libevent的核心结构event,以及libevent对event的管理。

1 LIBEVENT的核心-EVENT

Libevent是基于事件驱动(event-driven)的,从名字也可以看到event是整个库的核心。event就是Reactor框架中的事件处理程序组件;它提供了函数接口,供Reactor在事件发生时调用,以执行相应的事件处理,通常它会绑定一个有效的句柄。
首先给出event结构体的声明,它位于event.h文件中:

view plain
  1. struct event {
  2.  TAILQ_ENTRY (event) ev_next;
  3.  TAILQ_ENTRY (event) ev_active_next;
  4.  TAILQ_ENTRY (event) ev_signal_next;
  5.  unsigned int min_heap_idx; /* for managing timeouts */
  6.  struct event_base *ev_base;
  7.  int ev_fd;
  8.  short ev_events;
  9.  short ev_ncalls;
  10.  short *ev_pncalls; /* Allows deletes in callback */
  11.  struct timeval ev_timeout;
  12.  int ev_pri;  /* smaller numbers are higher priority */
  13.  void (*ev_callback)(int, short, void *arg);
  14.  void *ev_arg;
  15.  int ev_res;  /* result passed to event callback */
  16.  int ev_flags;
  17. };

下面简单解释一下结构体中各字段的含义。
1)ev_events:event关注的事件类型,它可以是以下3种类型:
I/O事件: EV_WRITE和EV_READ
定时事件:EV_TIMEOUT
信号:    EV_SIGNAL
辅助选项:EV_PERSIST,表明是一个永久事件
Libevent中的定义为:

view plain
  1. #define EV_TIMEOUT 0×01
  2. #define EV_READ  0×02
  3. #define EV_WRITE 0×04
  4. #define EV_SIGNAL 0×08
  5. #define EV_PERSIST 0×10 /* Persistant event */

可以看出事件类型可以使用“|”运算符进行组合,需要说明的是,信号和I/O事件不能同时设置;
还可以看出libevent使用event结构体将这3种事件的处理统一起来;
2)ev_next,ev_active_next和ev_signal_next都是双向链表节点指针;它们是libevent对不同事件类型和在不同的时期,对事件的管理时使用到的字段。
libevent使用双向链表保存所有注册的I/O和Signal事件,ev_next就是该I/O事件在链表中的位置;称此链表为“已注册事件链表”;
同样ev_signal_next就是signal事件在signal事件链表中的位置;
ev_active_next:libevent将所有的激活事件放入到链表active list中,然后遍历active list执行调度,ev_active_next就指明了event在active list中的位置;
2)min_heap_idx和ev_timeout,如果是timeout事件,它们是event在小根堆中的索引和超时值,libevent使用小根堆来管理定时事件,这将在后面定时事件处理时专门讲解
3)ev_base该事件所属的反应堆实例,这是一个event_base结构体,下一节将会详细讲解;
4)ev_fd,对于I/O事件,是绑定的文件描述符;对于signal事件,是绑定的信号;
5)ev_callback,event的回调函数,被ev_base调用,执行事件处理程序,这是一个函数指针,原型为:
void (*ev_callback)(int fd, short events, void *arg)
其中参数fd对应于ev_fd;events对应于ev_events;arg对应于ev_arg;
6)ev_arg:void*,表明可以是任意类型的数据,在设置event时指定;
7)eb_flags:libevent用于标记event信息的字段,表明其当前的状态,可能的值有:

view plain
  1. #define EVLIST_TIMEOUT 0×01 // event在time堆中
  2. #define EVLIST_INSERTED 0×02 // event在已注册事件链表中
  3. #define EVLIST_SIGNAL 0×04 // 未见使用
  4. #define EVLIST_ACTIVE 0×08 // event在激活链表中
  5. #define EVLIST_INTERNAL 0×10 // 内部使用标记
  6. #define EVLIST_INIT     0×80 // event已被初始化

8)ev_ncalls:事件就绪执行时,调用ev_callback的次数,通常为1;
9)ev_pncalls:指针,通常指向ev_ncalls或者为NULL;
10)ev_res:记录了当前激活事件的类型;

 

 

2 LIBEVENT对EVENT的管理

从event结构体中的3个链表节点指针和一个堆索引出发,大体上也能窥出libevent对event的管理方法了,可以参见下面的示意图:
event management

每次当有事件event转变为就绪状态时,libevent就会把它移入到active event list[priority]中,其中priority是event的优先级;
接着libevent会根据自己的调度策略选择就绪事件,调用其cb_callback()函数执行事件处理;并根据就绪的句柄和事件类型填充cb_callback函数的参数。

3 事件设置的接口函数

要向libevent添加一个事件,需要首先设置event对象,这通过调用libevent提供的函数有:event_set(), event_base_set(), event_priority_set()来完成;下面分别进行讲解。

void event_set(struct event *ev, int fd, short events,
void (*callback)(int, short, void *), void *arg)
1.设置事件ev绑定的文件描述符或者信号,对于定时事件,设为-1即可;
2.设置事件类型,比如EV_READ|EV_PERSIST, EV_WRITE, EV_SIGNAL等;
3.设置事件的回调函数以及参数arg;
4.初始化其它字段,比如缺省的event_base和优先级;
int event_base_set(struct event_base *base, struct event *ev)
设置event ev将要注册到的event_base;
libevent有一个全局event_base指针current_base,默认情况下事件ev将被注册到current_base上,使用该函数可以指定不同的event_base;
如果一个进程中存在多个libevent实例,则必须要调用该函数为event设置不同的event_base;

int event_priority_set(struct event *ev, int pri)
设置event ev的优先级,没什么可说的,注意的一点就是:当ev正处于就绪状态时,不能设置,返回-1。

4 小结

本节讲述了libevent的核心event结构,以及libevent支持的事件类型和libevent对event的管理模型;接下来将会描述libevent的事件处理框架,以及其中使用的重要的结构体event_base;

前面已经对libevent的事件处理框架和event结构体做了描述,现在是时候剖析libevent对事件的详细处理流程了,本节将分析libevent的事件处理框架event_base和libevent注册、删除事件的具体流程,可结合前一节libevent对event的管理。

1 事件处理框架-EVENT_BASE

回想Reactor模式的几个基本组件,本节讲解的部分对应于Reactor框架组件。在libevent中,这就表现为event_base结构体,结构体声明如下,它位于event-internal.h文件中:

view plain
  1. struct event_base {
  2.  const struct eventop *evsel;
  3.  void *evbase;
  4.  int event_count;  /* counts number of total events */
  5.  int event_count_active; /* counts number of active events */
  6.  int event_gotterm;  /* Set to terminate loop */
  7.  int event_break;  /* Set to terminate loop immediately */
  8.  /* active event management */
  9.  struct event_list **activequeues;
  10.  int nactivequeues;
  11.  /* signal handling info */
  12.  struct evsignal_info sig;
  13.  struct event_list eventqueue;
  14.  struct timeval event_tv;
  15.  struct min_heap timeheap;
  16.  struct timeval tv_cache;
  17. };

 

下面详细解释一下结构体中各字段的含义。
1)evsel和evbase这两个字段的设置可能会让人有些迷惑,这里你可以把evsel和evbase看作是类和静态函数的关系,比如添加事件时的调用行为:evsel->add(evbase, ev),实际执行操作的是evbase;这相当于class::add(instance, ev),instance就是class的一个对象实例。
evsel指向了全局变量static const struct eventop *eventops[]中的一个;
前面也说过,libevent将系统提供的I/O demultiplex机制统一封装成了eventop结构;因此eventops[]包含了select、poll、kequeue和epoll等等其中的若干个全局实例对象。
evbase实际上是一个eventop实例对象;
先来看看eventop结构体,它的成员是一系列的函数指针, 在event-internal.h文件中:
struct eventop {
const char *name;
void *(*init)(struct event_base *); // 初始化
int (*add)(void *, struct event *); // 注册事件
int (*del)(void *, struct event *); // 删除事件
int (*dispatch)(struct event_base *, void *, struct timeval *); // 事件分发
void (*dealloc)(struct event_base *, void *); // 注销,释放资源
/* set if we need to reinitialize the event base */
int need_reinit;
};
也就是说,在libevent中,每种I/O demultiplex机制的实现都必须提供这五个函数接口,来完成自身的初始化、销毁释放;对事件的注册、注销和分发。
比如对于epoll,libevent实现了5个对应的接口函数,并在初始化时并将eventop的5个函数指针指向这5个函数,那么程序就可以使用epoll作为I/O demultiplex机制了,这个在后面会再次提到。
2)activequeues是一个二级指针,前面讲过libevent支持事件优先级,因此你可以把它看作是数组,其中的元素activequeues[priority]是一个链表,链表的每个节点指向一个优先级为priority的就绪事件event。
3)eventqueue,链表,保存了所有的注册事件event的指针。
4)sig是由来管理信号的结构体,将在后面信号处理时专门讲解;
5)timeheap是管理定时事件的小根堆,将在后面定时事件处理时专门讲解;
6)event_tv和tv_cache是libevent用于时间管理的变量,将在后面讲到;
其它各个变量都能因名知意,就不再啰嗦了。

2 创建和初始化EVENT_BASE

创建一个event_base对象也既是创建了一个新的libevent实例,程序需要通过调用event_init()(内部调用event_base_new函数执行具体操作)函数来创建,该函数同时还对新生成的libevent实例进行了初始化。
该函数首先为event_base实例申请空间,然后初始化timer mini-heap,选择并初始化合适的系统I/O 的demultiplexer机制,初始化各事件链表;
函数还检测了系统的时间设置,为后面的时间管理打下基础。

3 接口函数

前面提到Reactor框架的作用就是提供事件的注册、注销接口;根据系统提供的事件多路分发机制执行事件循环,当有事件进入“就绪”状态时,调用注册事件的回调函数来处理事件。
Libevent中对应的接口函数主要就是:

view plain
  1. int  event_add(struct event *ev, const struct timeval *timeout);
  2. int  event_del(struct event *ev);
  3. int  event_base_loop(struct event_base *base, int loops);
  4. void event_active(struct event *event, int res, short events);
  5. void event_process_active(struct event_base *base);

 

本节将按介绍事件注册和删除的代码流程,libevent的事件循环框架将在下一节再具体描述。
对于定时事件,这些函数将调用timer heap管理接口执行插入和删除操作;对于I/O和Signal事件将调用eventopadd和delete接口函数执行插入和删除操作(eventop会对Signal事件调用Signal处理接口执行操作);这些组件将在后面的内容描述。
1)注册事件
函数原型:
int event_add(struct event *ev, const struct timeval *tv)
参数:ev:指向要注册的事件;
tv:超时时间;
函数将ev注册到ev->ev_base上,事件类型由ev->ev_events指明,如果注册成功,ev将被插入到已注册链表中;如果tv不是NULL,则会同时注册定时事件,将ev添加到timer堆上;
如果其中有一步操作失败,那么函数保证没有事件会被注册,可以讲这相当于一个原子操作。这个函数也体现了libevent细节之处的巧妙设计,且仔细看程序代码,部分有省略,注释直接附在代码中。

view plain
  1. int event_add(struct event *ev, const struct timeval *tv)
  2. {
  3.  struct event_base *base = ev->ev_base; // 要注册到的event_base
  4.  const struct eventop *evsel = base->evsel;
  5.  void *evbase = base->evbase; // base使用的系统I/O策略
  6.  // 新的timer事件,调用timer heap接口在堆上预留一个位置
  7.  // 注:这样能保证该操作的原子性:
  8.  // 向系统I/O机制注册可能会失败,而当在堆上预留成功后,
  9.  // 定时事件的添加将肯定不会失败;
  10.  // 而预留位置的可能结果是堆扩充,但是内部元素并不会改变
  11.  if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) {
  12.   if (min_heap_reserve(&base->timeheap,
  13.    1 + min_heap_size(&base->timeheap)) == -1)
  14.    return (-1);  /* ENOMEM == errno */
  15.  }
  16.  // 如果事件ev不在已注册或者激活链表中,则调用evbase注册事件
  17.  if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&
  18.   !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) {
  19.    res = evsel->add(evbase, ev);
  20.    if (res != -1) // 注册成功,插入event到已注册链表中
  21.     event_queue_insert(base, ev, EVLIST_INSERTED);
  22.  }
  23.  // 准备添加定时事件
  24.  if (res != -1 && tv != NULL) {
  25.   struct timeval now;
  26.   // EVLIST_TIMEOUT表明event已经在定时器堆中了,删除旧的
  27.   if (ev->ev_flags & EVLIST_TIMEOUT)
  28.    event_queue_remove(base, ev, EVLIST_TIMEOUT);
  29.   // 如果事件已经是就绪状态则从激活链表中删除
  30.   if ((ev->ev_flags & EVLIST_ACTIVE) &&
  31.    (ev->ev_res & EV_TIMEOUT)) {
  32.     // 将ev_callback调用次数设置为0
  33.     if (ev->ev_ncalls && ev->ev_pncalls) {
  34.      *ev->ev_pncalls = 0;
  35.     }
  36.     event_queue_remove(base, ev, EVLIST_ACTIVE);
  37.   }
  38.   // 计算时间,并插入到timer小根堆中
  39.   gettime(base, &now);
  40.   evutil_timeradd(&now, tv, &ev->ev_timeout);
  41.   event_queue_insert(base, ev, EVLIST_TIMEOUT);
  42.  }
  43.  return (res);
  44. }
  45. event_queue_insert()负责将事件插入到对应的链表中,下面是程序代码;
  46. event_queue_remove()负责将事件从对应的链表中删除,这里就不再重复贴代码了;
  47. void event_queue_insert(struct event_base *base, struct event *ev, int queue)
  48. {
  49.  // ev可能已经在激活列表中了,避免重复插入
  50.  if (ev->ev_flags & queue) {
  51.   if (queue & EVLIST_ACTIVE)
  52.    return;
  53.  }
  54.  // …
  55.  ev->ev_flags |= queue; // 记录queue标记
  56.  switch (queue) {
  57.  case EVLIST_INSERTED: // I/O或Signal事件,加入已注册事件链表
  58.   TAILQ_INSERT_TAIL(&base->eventqueue, ev, ev_next);
  59.   break;
  60.  case EVLIST_ACTIVE: // 就绪事件,加入激活链表
  61.   base->event_count_active++;
  62.   TAILQ_INSERT_TAIL(base->activequeues[ev->ev_pri], ev, ev_active_next);
  63.   break;
  64.  case EVLIST_TIMEOUT: // 定时事件,加入堆
  65.   min_heap_push(&base->timeheap, ev);
  66.   break;
  67.  }
  68. }

 

2)删除事件:
函数原型为:int  event_del(struct event *ev);
该函数将删除事件ev,对于I/O事件,从I/O 的demultiplexer上将事件注销;对于Signal事件,将从Signal事件链表中删除;对于定时事件,将从堆上删除;
同样删除事件的操作则不一定是原子的,比如删除时间事件之后,有可能从系统I/O机制中注销会失败。

 

view plain
  1. int event_del(struct event *ev)
  2. {
  3.  struct event_base *base;
  4.  const struct eventop *evsel;
  5.  void *evbase;
  6.  // ev_base为NULL,表明ev没有被注册
  7.  if (ev->ev_base == NULL)
  8.   return (-1);
  9.  // 取得ev注册的event_base和eventop指针
  10.  base = ev->ev_base;
  11.  evsel = base->evsel;
  12.  evbase = base->evbase;
  13.  // 将ev_callback调用次数设置为
  14.  if (ev->ev_ncalls && ev->ev_pncalls) {
  15.   *ev->ev_pncalls = 0;
  16.  }
  17.  // 从对应的链表中删除
  18.  if (ev->ev_flags & EVLIST_TIMEOUT)
  19.   event_queue_remove(base, ev, EVLIST_TIMEOUT);
  20.  if (ev->ev_flags & EVLIST_ACTIVE)
  21.   event_queue_remove(base, ev, EVLIST_ACTIVE);
  22.  if (ev->ev_flags & EVLIST_INSERTED) {
  23.   event_queue_remove(base, ev, EVLIST_INSERTED);
  24.   // EVLIST_INSERTED表明是I/O或者Signal事件,
  25.   // 需要调用I/O demultiplexer注销事件
  26.   return (evsel->del(evbase, ev));
  27.  }
  28.  return (0);
  29. }

4 小节
分析了event_base这一重要结构体,初步看到了libevent对系统的I/O demultiplex机制的封装event_op结构,并结合源代码分析了事件的注册和删除处理,下面将会接着分析事件管理框架中的主事件循环部分。

现在我们已经初步了解了libevent的Reactor组件——event_base和事件管理框架,接下来就是libevent事件处理的中心部分——事件主循环,根据系统提供的事件多路分发机制执行事件循环,对已注册的就绪事件,调用注册事件的回调函数来处理事件。

1 阶段性的胜利

Libevent将I/O事件、定时器和信号事件处理很好的结合到了一起,本节也会介绍libevent是如何做到这一点的。
在看完本节的内容后,读者应该会对Libevent的基本框架:事件管理和主循环有比较清晰的认识了,并能够把libevent的事件控制流程清晰的串通起来,剩下的就是一些细节的内容了。

2 事件处理主循环

Libevent的事件主循环主要是通过event_base_loop ()函数完成的,其主要操作如下面的流程图所示,event_base_loop所作的就是持续执行下面的循环。
main loop

清楚了event_base_loop所作的主要操作,就可以对比源代码看个究竟了,代码结构还是相当清晰的。

view plain
  1. int event_base_loop(struct event_base *base, int flags)
  2. {
  3.     const struct eventop *evsel = base->evsel;
  4.     void *evbase = base->evbase;
  5.     struct timeval tv;
  6.     struct timeval *tv_p;
  7.     int res, done;
  8.     // 清空时间缓存
  9.     base->tv_cache.tv_sec = 0;
  10.     // evsignal_base是全局变量,在处理signal时,用于指名signal所属的event_base实例
  11.     if (base->sig.ev_signal_added)
  12.         evsignal_base = base;
  13.     done = 0;
  14.     while (!done) { // 事件主循环
  15.         // 查看是否需要跳出循环,程序可以调用event_loopexit_cb()设置event_gotterm标记
  16.         // 调用event_base_loopbreak()设置event_break标记
  17.         if (base->event_gotterm) {
  18.             base->event_gotterm = 0;
  19.             break;
  20.         }
  21.         if (base->event_break) {
  22.             base->event_break = 0;
  23.             break;
  24.         }
  25.         // 校正系统时间,如果系统使用的是非MONOTONIC时间,用户可能会向后调整了系统时间
  26.         // 在timeout_correct函数里,比较last wait time和当前时间,如果当前时间< last wait time
  27.         // 表明时间有问题,这是需要更新timer_heap中所有定时事件的超时时间。
  28.         timeout_correct(base, &tv);
  29.         // 根据timer heap中事件的最小超时时间,计算系统I/O demultiplexer的最大等待时间
  30.         tv_p = &tv;
  31.         if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK)) {
  32.             timeout_next(base, &tv_p);
  33.         } else {
  34.             // 依然有未处理的就绪时间,就让I/O demultiplexer立即返回,不必等待
  35.             // 下面会提到,在libevent中,低优先级的就绪事件可能不能立即被处理
  36.             evutil_timerclear(&tv);
  37.         }
  38.         // 如果当前没有注册事件,就退出
  39.         if (!event_haveevents(base)) {
  40.             event_debug((“%s: no events registered.”, __func__));
  41.             return (1);
  42.         }
  43.         // 更新last wait time,并清空time cache
  44.         gettime(base, &base->event_tv);
  45.         base->tv_cache.tv_sec = 0;
  46.         // 调用系统I/O demultiplexer等待就绪I/O events,可能是epoll_wait,或者select等;
  47.         // 在evsel->dispatch()中,会把就绪signal event、I/O event插入到激活链表中
  48.         res = evsel->dispatch(base, evbase, tv_p);
  49.         if (res == -1)
  50.             return (-1);
  51.         // 将time cache赋值为当前系统时间
  52.         gettime(base, &base->tv_cache);
  53.         // 检查heap中的timer events,将就绪的timer event从heap上删除,并插入到激活链表中
  54.         timeout_process(base);
  55.         // 调用event_process_active()处理激活链表中的就绪event,调用其回调函数执行事件处理
  56.         // 该函数会寻找最高优先级(priority值越小优先级越高)的激活事件链表,
  57.         // 然后处理链表中的所有就绪事件;
  58.         // 因此低优先级的就绪事件可能得不到及时处理;
  59.         if (base->event_count_active) {
  60.             event_process_active(base);
  61.             if (!base->event_count_active && (flags & EVLOOP_ONCE))
  62.                 done = 1;
  63.         } else if (flags & EVLOOP_NONBLOCK)
  64.             done = 1;
  65.     }
  66.     // 循环结束,清空时间缓存
  67.     base->tv_cache.tv_sec = 0;
  68.     event_debug((“%s: asked to terminate loop.”, __func__));
  69.     return (0);
  70. }

 

3 I/O和TIMER事件的统一

Libevent将Timer和Signal事件都统一到了系统的I/O 的demultiplex机制中了,相信读者从上面的流程和代码中也能窥出一斑了,下面就再啰嗦一次了。
首先将Timer事件融合到系统I/O多路复用机制中,还是相当清晰的,因为系统的I/O机制像select()和epoll_wait()都允许程序制定一个最大等待时间(也称为最大超时时间)timeout,即使没有I/O事件发生,它们也保证能在timeout时间内返回。
那么根据所有Timer事件的最小超时时间来设置系统I/O的timeout时间;当系统I/O返回时,再激活所有就绪的Timer事件就可以了,这样就能将Timer事件完美的融合到系统的I/O机制中了。
这是在Reactor和Proactor模式(主动器模式,比如Windows上的IOCP)中处理Timer事件的经典方法了,ACE采用的也是这种方法,大家可以参考POSA vol2书中的Reactor模式一节。
堆是一种经典的数据结构,向堆中插入、删除元素时间复杂度都是O(lgN),N为堆中元素的个数,而获取最小key值(小根堆)的复杂度为O(1);因此变成了管理Timer事件的绝佳人选(当然是非唯一的),libevent就是采用的堆结构。

4 I/O和SIGNAL事件的统一

Signal是异步事件的经典事例,将Signal事件统一到系统的I/O多路复用中就不像Timer事件那么自然了,Signal事件的出现对于进程来讲是完全随机的,进程不能只是测试一个变量来判别是否发生了一个信号,而是必须告诉内核“在此信号发生时,请执行如下的操作”。
如果当Signal发生时,并不立即调用event的callback函数处理信号,而是设法通知系统的I/O机制,让其返回,然后再统一和I/O事件以及Timer一起处理,不就可以了嘛。是的,这也是libevent中使用的方法。
问题的核心在于,当Signal发生时,如何通知系统的I/O多路复用机制,这里先买个小关子,放到信号处理一节再详细说明,我想读者肯定也能想出通知的方法,比如使用pipe。

5 小节

介绍了libevent的事件主循环,描述了libevent是如何处理就绪的I/O事件、定时器和信号事件,以及如何将它们无缝的融合到一起。
加油!

 

现在我们已经了解了libevent的基本框架:事件管理框架和事件主循环。上节提到了libevent中I/O事件和Signal以及Timer事件的集成,这一节将分析如何将Signal集成到事件主循环的框架中。

1 集成策略——使用SOCKET PAIR

前一节已经做了足够多的介绍了,基本方法就是采用“消息机制”。在libevent中这是通过socket pair完成的,下面就来详细分析一下。
Socket pair就是一个socket对,包含两个socket,一个读socket,一个写socket。工作方式如下图所示:
socket pair

创建一个socket pair并不是复杂的操作,可以参见下面的流程图,清晰起见,其中忽略了一些错误处理和检查。

create flow

Libevent提供了辅助函数evutil_socketpair()来创建一个socket pair,可以结合上面的创建流程来分析该函数。

2 集成到事件主循环——通知EVENT_BASE

Socket pair创建好了,可是libevent的事件主循环还是不知道Signal是否发生了啊,看来我们还差了最后一步,那就是:为socket pair的读socket在libevent的event_base实例上注册一个persist的读事件。
这样当向写socket写入数据时,读socket就会得到通知,触发读事件,从而event_base就能相应的得到通知了。
前面提到过,Libevent会在事件主循环中检查标记,来确定是否有触发的signal,如果标记被设置就处理这些signal,这段代码在各个具体的I/O机制中,以Epoll为例,在epoll_dispatch()函数中,代码片段如下:

view plain
  1. res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout);
  2. if (res == -1) {
  3.     if (errno != EINTR) {
  4.         event_warn(“epoll_wait”);
  5.         return (-1);
  6.     }
  7.     evsignal_process(base);// 处理signal事件
  8.     return (0);
  9. } else if (base->sig.evsignal_caught) {
  10.     evsignal_process(base);// 处理signal事件
  11. }

完整的处理框架如下所示:
integrate frame
注1:libevent中,初始化阶段并不注册读socket的读事件,而是在注册信号阶段才会测试并注册;
注2:libevent中,检查I/O事件是在各系统I/O机制的dispatch()函数中完成的,该dispatch()函数在event_base_loop()函数中被调用;

 

3 EVSIGNAL_INFO结构体

Libevent中Signal事件的管理是通过结构体evsignal_info完成的,结构体位于evsignal.h文件中,定义如下:

view plain
  1. struct evsignal_info {
  2.     struct event ev_signal;
  3.     int ev_signal_pair[2];
  4.     int ev_signal_added;
  5.     volatile sig_atomic_t evsignal_caught;
  6.     struct event_list evsigevents[NSIG];
  7.     sig_atomic_t evsigcaught[NSIG];
  8. #ifdef HAVE_SIGACTION
  9.     struct sigaction **sh_old;
  10. #else
  11.     ev_sighandler_t **sh_old;
  12. #endif
  13.     int sh_old_max;
  14. };

下面详细介绍一下个字段的含义和作用:
1)ev_signal, 为socket pair的读socket向event_base注册读事件时使用的event结构体;
2)ev_signal_pair,socket pair对,作用见第一节的介绍;
3)ev_signal_added,记录ev_signal事件是否已经注册了;
4)evsignal_caught,是否有信号发生的标记;是volatile类型,因为它会在另外的线程中被修改;
5)evsigvents[NSIG],数组,evsigevents[signo]表示注册到信号signo的事件链表;
6)evsigcaught[NSIG],具体记录每个信号触发的次数,evsigcaught[signo]是记录信号signo被触发的次数;
7)sh_old记录了原来的signal处理函数指针,当信号signo注册的event被清空时,需要重新设置其处理函数;
evsignal_info的初始化包括,创建socket pair,设置ev_signal事件(但并没有注册,而是等到有信号注册时才检查并注册),并将所有标记置零,初始化信号的注册事件链表指针等。

 

4 注册、注销SIGNAL事件

注册signal事件是通过evsignal_add(struct event *ev)函数完成的,libevent对所有的信号注册同一个处理函数evsignal_handler(),该函数将在下一段介绍,注册过程如下:
1 取得ev要注册到的信号signo;
2 如果信号signo未被注册,那么就为signo注册信号处理函数evsignal_handler();
3 如果事件ev_signal还没哟注册,就注册ev_signal事件;
4 将事件ev添加到signo的event链表中;
从signo上注销一个已注册的signal事件就更简单了,直接从其已注册事件的链表中移除即可。如果事件链表已空,那么就恢复旧的处理函数;
下面的讲解都以signal()函数为例,sigaction()函数的处理和signal()相似。
处理函数evsignal_handler()函数做的事情很简单,就是记录信号的发生次数,并通知event_base有信号触发,需要处理:

view plain
  1. static void evsignal_handler(int sig)
  2. {
  3.     int save_errno = errno; // 不覆盖原来的错误代码
  4.     if (evsignal_base == NULL) {
  5.         event_warn(“%s: received signal %d, but have no base configured”, __func__, sig);
  6.         return;
  7.     }
  8.     // 记录信号sig的触发次数,并设置event触发标记
  9.     evsignal_base->sig.evsigcaught[sig]++;
  10.     evsignal_base->sig.evsignal_caught = 1;
  11. #ifndef HAVE_SIGACTION
  12.     signal(sig, evsignal_handler); // 重新注册信号
  13. #endif
  14.     // 向写socket写一个字节数据,触发event_base的I/O事件,从而通知其有信号触发,需要处理
  15.     send(evsignal_base->sig.ev_signal_pair[0], ”a”, 1, 0);
  16.     errno = save_errno; // 错误代码
  17. }

5 小节
本节介绍了libevent对signal事件的具体处理框架,包括事件注册、删除和socket pair通知机制,以及是如何将Signal事件集成到事件主循环之中的。

现在再来详细分析libevent中I/O事件和Timer事件的集成,与Signal相比,Timer事件的集成会直观和简单很多。Libevent对堆的调整操作做了一些优化,本节还会描述这些优化方法。

1 集成到事件主循环

因为系统的I/O机制像select()和epoll_wait()都允许程序制定一个最大等待时间(也称为最大超时时间)timeout,即使没有I/O事件发生,它们也保证能在timeout时间内返回。
那么根据所有Timer事件的最小超时时间来设置系统I/O的timeout时间;当系统I/O返回时,再激活所有就绪的Timer事件就可以了,这样就能将Timer事件完美的融合到系统的I/O机制中了。
具体的代码在源文件event.c的event_base_loop()中,现在就对比代码来看看这一处理方法:

view plain
  1.       if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK)) {
  2.           // 根据Timer事件计算evsel->dispatch的最大等待时间
  3.     timeout_next(base, &tv_p);
  4. } else {
  5.           // 如果还有活动事件,就不要等待,让evsel->dispatch立即返回
  6.     evutil_timerclear(&tv);
  7. }
  8. // …
  9.       // 调用select() or epoll_wait() 等待就绪I/O事件
  10. res = evsel->dispatch(base, evbase, tv_p);
  11.       // …
  12.       // 处理超时事件,将超时事件插入到激活链表中
  13.       timeout_process(base);

timeout_next()函数根据堆中具有最小超时值的事件和当前时间来计算等待时间,下面看看代码:

view plain
  1. static int timeout_next(struct event_base *base, struct timeval **tv_p)
  2. {
  3.     struct timeval now;
  4.     struct event *ev;
  5.     struct timeval *tv = *tv_p;
  6.     // 堆的首元素具有最小的超时值
  7.     if ((ev = min_heap_top(&base->timeheap)) == NULL) {
  8.         // 如果没有定时事件,将等待时间设置为NULL,表示一直阻塞直到有I/O事件发生
  9.         *tv_p = NULL;
  10.         return (0);
  11.     }
  12.     // 取得当前时间
  13.     gettime(base, &now);
  14.     // 如果超时时间<=当前值,不能等待,需要立即返回
  15.     if (evutil_timercmp(&ev->ev_timeout, &now, <=)) {
  16.         evutil_timerclear(tv);
  17.         return (0);
  18.     }
  19.     // 计算等待的时间=当前时间-最小的超时时间
  20.     evutil_timersub(&ev->ev_timeout, &now, tv);
  21.     return (0);
  22. }

 

2 TIMER小根堆

Libevent使用堆来管理Timer事件,其key值就是事件的超时时间,源代码位于文件min_heap.h中。
所有的数据结构书中都有关于堆的详细介绍,向堆中插入、删除元素时间复杂度都是O(lgN),N为堆中元素的个数,而获取最小key值(小根堆)的复杂度为O(1)。堆是一个完全二叉树,基本存储方式是一个数组。
Libevent实现的堆还是比较轻巧的,虽然我不喜欢这种编码方式(搞一些复杂的表达式)。轻巧到什么地方呢,就以插入元素为例,来对比说明,下面伪代码中的size表示当前堆的元素个数:
典型的代码逻辑如下:

view plain
  1. Heap[size++] = new; // 先放到数组末尾,元素个数+1
  2. // 下面就是shift_up()的代码逻辑,不断的将new向上调整
  3. _child = size;
  4. while(_child>0) // 循环
  5. {
  6.    _parent = (_child-1)/2; // 计算parent
  7.    if(Heap[_parent].key < Heap[_child].key)
  8.       break; // 调整结束,跳出循环
  9.    swap(_parent, _child); // 交换parent和child
  10. }

而libevent的heap代码对这一过程做了优化,在插入新元素时,只是为新元素预留了一个位置hole(初始时hole位于数组尾部),但并不立刻将新元素插入到hole上,而是不断向上调整hole的值,将父节点向下调整,最后确认hole就是新元素的所在位置时,才会真正的将新元素插入到hole上,因此在调整过程中就比上面的代码少了一次赋值的操作,代码逻辑是:
下面就是shift_up()的代码逻辑,不断的将new的“预留位置”向上调整

view plain
  1. // 下面就是shift_up()的代码逻辑,不断的将new的“预留位置”向上调整
  2. _hole = size; // _hole就是为new预留的位置,但并不立刻将new放上
  3. while(_hole>0) // 循环
  4. {
  5.     _parent = (_hole-1)/2; // 计算parent
  6.     if(Heap[_parent].key < new.key)
  7.         break; // 调整结束,跳出循环
  8.     Heap[_hole] = Heap[_parent]; // 将parent向下调整
  9.     _hole = _parent; // 将_hole调整到_parent
  10. }
  11. Heap[_hole] = new; // 调整结束,将new插入到_hole指示的位置
  12. size++; // 元素个数+1

由于每次调整都少做一次赋值操作,在调整路径比较长时,调整效率会比第一种有所提高。libevent中的min_heap_shift_up_()函数就是上面逻辑的具体实现,对应的向下调整函数是min_heap_shift_down_()。
举个例子,向一个小根堆3, 5, 8, 7, 12中插入新元素2,使用第一中典型的代码逻辑,其调整过程如下图所示:
heap insert

使用libevent中的堆调整逻辑,调整过程如下图所示:
heap insert

对于删除和元素修改操作,也遵从相同的逻辑,就不再罗嗦了。

 

3 小节

通过设置系统I/O机制的wait时间,从而简捷的集成Timer事件;主要分析了libevent对堆调整操作的优化