首页 > 代码库 > 记一个多线程使用libevent的问题

记一个多线程使用libevent的问题

前段时间使用libevent网络库实现了一个游戏服务器引擎,在此记录下其中遇到的一个问题。我在设计服务器上选择把逻辑和网络分线程,线程之间通信使用队列。但是这样做会有个问题,当逻辑线程想要主动的发一个数据包的时候,需要一种唤醒网络线程的机制。由于对libevent的api不熟悉,起初我是自己实现这个功能的。实现确实是不复杂,但是缺违背了我的初心:只写简单必要的代码,保证尽可能少的bug。直到后来和同事探讨了一番,才发现原来libevent是有对此做支持的,但是具体怎么做,文档里面没有详细的说,因此同事也说不出个所以然。鉴于此情况,我决定,把libevent中与此相关的源码粗略的过一遍,以求能弄明白以下两件事:

(1)与跨线程唤醒事件等待相关的api有哪些,以及如何使用?

(2)这些api背后到底做了哪些工作?

 

相关API,及用法

和我起初想得不一样,libevent相关的api很简单并且只有一个:

/*call event_use_pthreads() if use posix threads.*/ 
evthread_use_windows_threads();
struct event_base* ev_base = event_base_new();

  

需要注意的是函数evthread_use_windows_threads的调用必须在初始化event_base之前。在此之后,逻辑线程执行bufferevent_write的时候,libevent会自动唤醒网络线程的事件循环,并执行发送数据。

 

隐藏在API背后的逻辑

先看看evthread_use_windows_threads函数做了什么?

int evthread_use_windows_threads(void) {
  ...  
  evthread_set_lock_callbacks(&cbs);
  evthread_set_id_callback(evthread_win32_get_id);
  evthread_set_condition_callbacks(&cond_cbs);
  return 0;            
}

通过调用evthread_use_windows_threads,我们设置了一些回调函数,包括设置了libevent获取线程id的回调函数evthread_id_fn_

 

看看初始化事件循环的函数event_base_new做了什么:

// event.c
struct event_base * 
event_base_new(void) {
    ...
    base = event_base_new_with_config(cfg);
}

struct event_base * 
event_base_new_with_config(const struct event_config *cfg) {
    ...
#ifndef EVENT__DISABLE_THREAD_SUPPORT
	if (EVTHREAD_LOCKING_ENABLED() &&
	    (!cfg || !(cfg->flags & EVENT_BASE_FLAG_NOLOCK))) {
		int r;
		EVTHREAD_ALLOC_LOCK(base->th_base_lock, 0);
		EVTHREAD_ALLOC_COND(base->current_event_cond);
		r = evthread_make_base_notifiable(base);
		if (r<0) {
			event_warnx("%s: Unable to make base notifiable.", __func__);
			event_base_free(base);
			return NULL;
		}
	}
#endif
    ...
}

int
evthread_make_base_notifiable(struct event_base *base) {
	int r;
	if (!base)
		return -1;

	EVBASE_ACQUIRE_LOCK(base, th_base_lock);
	r = evthread_make_base_notifiable_nolock_(base);
	EVBASE_RELEASE_LOCK(base, th_base_lock);
	return r;
}

static int
evthread_make_base_notifiable_nolock_(struct event_base *base) {
    ...
    if (evutil_make_internal_pipe_(base->th_notify_fd) == 0) {
	notify = evthread_notify_base_default;
	cb = evthread_notify_drain_default;
    } else {
	return -1;
    }

    base->th_notify_fn = notify;
}
/* Internal function: Set fd[0] and fd[1] to a pair of fds such that writes on
 * fd[1] get read from fd[0].  Make both fds nonblocking and close-on-exec.
 * Return 0 on success, -1 on failure.
 */
int
evutil_make_internal_pipe_(evutil_socket_t fd[2]) {

}

  

它通过如下调用:

event_base_new-->event_base_new_with_config-->evthread_make_base_notifiable-->evthread_make_base_notifiable_nolock_-->evutil_make_internal_pipe_

最后通过evutil_socketpair构造了两个本地的互相连接的socket(windows环境下,用此来模拟pipe)。

 

唤醒相关的核心逻辑在evthread_notify_base_default函数中:

static int evthread_notify_base_default(struct event_base *base) { 
  char buf[1]; 
  int r; 
  buf[0] = (char) 0; 
#ifdef _WIN32 
  r = send(base->th_notify_fd[1], buf, 1, 0); 
#else 
  r = write(base->th_notify_fd[1], buf, 1); 
#endif 
  return (r < 0 && ! EVUTIL_ERR_IS_EAGAIN(errno)) ? -1 : 0; 
}

可以看出,在windows下libevent的唤醒机制实际也是self pipe trick,只不过它通过构造一对socket来模拟pipe,当需要唤醒的时候,它就往其中一个socket写入1个字节。

 

再去看看bufferevent_write:

// bufferevent.c
int
bufferevent_write(struct bufferevent *bufev, const void *data, size_t size) {
	if (evbuffer_add(bufev->output, data, size) == -1)
		return (-1);

	return 0;
}

// buffer.c
int
evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen) {
    ...
    evbuffer_invoke_callbacks_(buf);
}

它会触发一系列列回调函数,而这些回调函数在创建bufferevent的时候被指定:

//bufferevent_sock.c
struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, int options) {
    ...
    evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);
}

static void
bufferevent_socket_outbuf_cb(struct evbuffer *buf, const struct evbuffer_cb_info *cbinfo, void *arg) {
    ...
    if (cbinfo->n_added &&
	    (bufev->enabled & EV_WRITE) &&
	    !event_pending(&bufev->ev_write, EV_WRITE, NULL) &&
	    !bufev_p->write_suspended) {
		/* Somebody added data to the buffer, and we would like to
		 * write, and we were not writing.  So, start writing. */
		if (bufferevent_add_event_(&bufev->ev_write, &bufev->timeout_write) == -1) {
		    /* Should we log this? */
		}
	}
}

//bufferevent.c
int
bufferevent_add_event_(struct event *ev, const struct timeval *tv) {
	if (!evutil_timerisset(tv))
		return event_add(ev, NULL);
	else
		return event_add(ev, tv);
}

//event.c
int
event_add(struct event *ev, const struct timeval *tv) {
	EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);
	res = event_add_nolock_(ev, tv, 0);
	EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);
}

int
event_add_nolock_(struct event *ev, const struct timeval *tv, int tv_is_absolute) {
    ...
    /* if we are not in the right thread, we need to wake up the loop */
    //如果在构造event_base之前调用了evthread_use_windows_threads,EVBASE_NEED_NOTIFY此时会返回true,否则为false。
    if (res != -1 && notify && EVBASE_NEED_NOTIFY(base))
        evthread_notify_base(base);
}

由代码可知,在往bufferevent写数据后执行的回调函数中,就有唤醒网络线程逻辑(evthread_notify_base)。那为什么还需要手动调用evthread_use_windows_threads函数呢?

这里再说一下:

#define EVBASE_NEED_NOTIFY(base)			 	((base)->running_loop &&			 	    ((base)->th_owner_id != evthreadimpl_get_id_()))

unsigned long
evthreadimpl_get_id_() {
	return evthread_id_fn_ ? evthread_id_fn_() : 1;
}

之前说过,当调用evthread_use_windows_threads,设置了libevent获取线程id的回调函数evthread_id_fn_。也正因为此,才会去跑下去执行evthread_notify_base函数:

static int
evthread_notify_base(struct event_base *base) {
	EVENT_BASE_ASSERT_LOCKED(base);
	if (!base->th_notify_fn)
		return -1;
	if (base->is_notify_pending)
		return 0;
	base->is_notify_pending = 1;
	return base->th_notify_fn(base);
}

所以,当我们在逻辑线程调用bufferevent_write尝试发送一段数据的时候,它会依据以下的调用,通知网络线程:

bufferevent_write-->evbuffer_add-->evbuffer_invoke_callbacks_-->bufferevent_socket_evtbuf_cb_-->bufferevent_add_event_-->event_add-->event_add_nolock_-->evthread_notify_base

以上便是libevent跨线程唤醒的逻辑。

记一个多线程使用libevent的问题