首页 > 代码库 > 记一个多线程使用libevent的问题
记一个多线程使用libevent的问题
前段时间使用libevent网络库实现了一个游戏服务器引擎,在此记录下其中遇到的一个问题。我在设计服务器上选择把逻辑和网络分线程,线程之间通信使用队列。但是这样做会有个问题,当逻辑线程想要主动的发一个数据包的时候,需要一种唤醒网络线程的机制。由于对libevent的api不熟悉,起初我是自己实现这个功能的。实现确实是不复杂,但是缺违背了我的初心:只写简单必要的代码,保证尽可能少的bug。直到后来和同事探讨了一番,才发现原来libevent是有对此做支持的,但是具体怎么做,文档里面没有详细的说,因此同事也说不出个所以然。鉴于此情况,我决定,把libevent中与此相关的源码粗略的过一遍,以求能弄明白以下两件事:
(1)与跨线程唤醒事件等待相关的api有哪些,以及如何使用?
(2)这些api背后到底做了哪些工作?
相关API,及用法
和我起初想得不一样,libevent相关的api很简单并且只有一个:
/*call event_use_pthreads() if use posix threads.*/ evthread_use_windows_threads(); struct event_base* ev_base = event_base_new();
需要注意的是函数evthread_use_windows_threads的调用必须在初始化event_base之前。在此之后,逻辑线程执行bufferevent_write的时候,libevent会自动唤醒网络线程的事件循环,并执行发送数据。
隐藏在API背后的逻辑
先看看evthread_use_windows_threads函数做了什么?
int evthread_use_windows_threads(void) { ... evthread_set_lock_callbacks(&cbs); evthread_set_id_callback(evthread_win32_get_id); evthread_set_condition_callbacks(&cond_cbs); return 0; }
通过调用evthread_use_windows_threads,我们设置了一些回调函数,包括设置了libevent获取线程id的回调函数evthread_id_fn_。
看看初始化事件循环的函数event_base_new做了什么:
// event.c struct event_base * event_base_new(void) { ... base = event_base_new_with_config(cfg); } struct event_base * event_base_new_with_config(const struct event_config *cfg) { ... #ifndef EVENT__DISABLE_THREAD_SUPPORT if (EVTHREAD_LOCKING_ENABLED() && (!cfg || !(cfg->flags & EVENT_BASE_FLAG_NOLOCK))) { int r; EVTHREAD_ALLOC_LOCK(base->th_base_lock, 0); EVTHREAD_ALLOC_COND(base->current_event_cond); r = evthread_make_base_notifiable(base); if (r<0) { event_warnx("%s: Unable to make base notifiable.", __func__); event_base_free(base); return NULL; } } #endif ... } int evthread_make_base_notifiable(struct event_base *base) { int r; if (!base) return -1; EVBASE_ACQUIRE_LOCK(base, th_base_lock); r = evthread_make_base_notifiable_nolock_(base); EVBASE_RELEASE_LOCK(base, th_base_lock); return r; } static int evthread_make_base_notifiable_nolock_(struct event_base *base) { ... if (evutil_make_internal_pipe_(base->th_notify_fd) == 0) { notify = evthread_notify_base_default; cb = evthread_notify_drain_default; } else { return -1; } base->th_notify_fn = notify; }
/* Internal function: Set fd[0] and fd[1] to a pair of fds such that writes on * fd[1] get read from fd[0]. Make both fds nonblocking and close-on-exec. * Return 0 on success, -1 on failure. */ int evutil_make_internal_pipe_(evutil_socket_t fd[2]) { }
它通过如下调用:
event_base_new-->event_base_new_with_config-->evthread_make_base_notifiable-->evthread_make_base_notifiable_nolock_-->evutil_make_internal_pipe_
最后通过evutil_socketpair构造了两个本地的互相连接的socket(windows环境下,用此来模拟pipe)。
唤醒相关的核心逻辑在evthread_notify_base_default函数中:
static int evthread_notify_base_default(struct event_base *base) { char buf[1]; int r; buf[0] = (char) 0; #ifdef _WIN32 r = send(base->th_notify_fd[1], buf, 1, 0); #else r = write(base->th_notify_fd[1], buf, 1); #endif return (r < 0 && ! EVUTIL_ERR_IS_EAGAIN(errno)) ? -1 : 0; }
可以看出,在windows下libevent的唤醒机制实际也是self pipe trick,只不过它通过构造一对socket来模拟pipe,当需要唤醒的时候,它就往其中一个socket写入1个字节。
再去看看bufferevent_write:
// bufferevent.c int bufferevent_write(struct bufferevent *bufev, const void *data, size_t size) { if (evbuffer_add(bufev->output, data, size) == -1) return (-1); return 0; } // buffer.c int evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen) { ... evbuffer_invoke_callbacks_(buf); }
它会触发一系列列回调函数,而这些回调函数在创建bufferevent的时候被指定:
//bufferevent_sock.c struct bufferevent * bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, int options) { ... evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev); } static void bufferevent_socket_outbuf_cb(struct evbuffer *buf, const struct evbuffer_cb_info *cbinfo, void *arg) { ... if (cbinfo->n_added && (bufev->enabled & EV_WRITE) && !event_pending(&bufev->ev_write, EV_WRITE, NULL) && !bufev_p->write_suspended) { /* Somebody added data to the buffer, and we would like to * write, and we were not writing. So, start writing. */ if (bufferevent_add_event_(&bufev->ev_write, &bufev->timeout_write) == -1) { /* Should we log this? */ } } } //bufferevent.c int bufferevent_add_event_(struct event *ev, const struct timeval *tv) { if (!evutil_timerisset(tv)) return event_add(ev, NULL); else return event_add(ev, tv); } //event.c int event_add(struct event *ev, const struct timeval *tv) { EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock); res = event_add_nolock_(ev, tv, 0); EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock); } int event_add_nolock_(struct event *ev, const struct timeval *tv, int tv_is_absolute) { ... /* if we are not in the right thread, we need to wake up the loop */ //如果在构造event_base之前调用了evthread_use_windows_threads,EVBASE_NEED_NOTIFY此时会返回true,否则为false。 if (res != -1 && notify && EVBASE_NEED_NOTIFY(base)) evthread_notify_base(base); }
由代码可知,在往bufferevent写数据后执行的回调函数中,就有唤醒网络线程逻辑(evthread_notify_base)。那为什么还需要手动调用evthread_use_windows_threads函数呢?
这里再说一下:
#define EVBASE_NEED_NOTIFY(base) ((base)->running_loop && ((base)->th_owner_id != evthreadimpl_get_id_())) unsigned long evthreadimpl_get_id_() { return evthread_id_fn_ ? evthread_id_fn_() : 1; }
之前说过,当调用evthread_use_windows_threads,设置了libevent获取线程id的回调函数evthread_id_fn_。也正因为此,才会去跑下去执行evthread_notify_base函数:
static int evthread_notify_base(struct event_base *base) { EVENT_BASE_ASSERT_LOCKED(base); if (!base->th_notify_fn) return -1; if (base->is_notify_pending) return 0; base->is_notify_pending = 1; return base->th_notify_fn(base); }
所以,当我们在逻辑线程调用bufferevent_write尝试发送一段数据的时候,它会依据以下的调用,通知网络线程:
bufferevent_write-->evbuffer_add-->evbuffer_invoke_callbacks_-->bufferevent_socket_evtbuf_cb_-->bufferevent_add_event_-->event_add-->event_add_nolock_-->evthread_notify_base
以上便是libevent跨线程唤醒的逻辑。
记一个多线程使用libevent的问题