首页 > 代码库 > libevent学习七
libevent学习七
Bufferevents:概念和基础
很多时候,一个程序需要处理一些数据的缓存,不止应用在答复event上。例如:当我们需要去写出数据,通常会这样做:
1. 发现有数据需要写出到一条连接上;把这些数据放到buffer里。
2. 等连接变成可写的状态。
3. 尽可能的写入数据。
4. 记住我们写了多少数据,然后如果数据没有全部写完,就等连接再次变为可写的状态。
这种IO缓冲模式已经足够Libevent的日常使用。一个“bufferevent”是由一个底层传输渠道(如socket),一个读buffer,一个写buffer。对于常规的event
,当传输渠道准备好读或写时,它会直接调用它的回调。而bufferevent则是当底层有足够的数据可读或可写时才去调用回调。
这里有多种bufferevent类型,它们共享同一套接口。它们包括:
1. socket-based bufferevents:从底层流socket发送和接收数据,使用event_*接口。
2. asynchronout-IO bufferevents:使用Windows IOCP接口去发送和接收数据(只有Windows支持)。
3. filtering bufferevents:在数据被传给传输层前,先处理一下数据-例如:压缩和解压数据。
4. paired bufferevents:两个bufferevent之间进行交互数据。
注:bufferevent目前只支持像TCP一样的流协议。在将来可能会支持数据报文协议,像UDP。
Bufferevent和evbuffers
每个bufferevent都有一个input buffer和一个output buffer。buffer的类型是“struct evbuffer”。当你有数据需要写出到bufferevent,你要把这些数据添加到output buffer;当bufferevent有数据给你去读时,你实际上要从input buffer中读取它们。
回调和水标位
每个bufferevent有2个数据相关的回调:读回调和写回调。默认情况下,只要有数据从传输层被读出来,都会调用读回调;只要传输层有足够的空间,写回调就会被调用。你可以通过调整读写“水标位”重写这些默认的行为。
每个bufferevent有4个水标位:
1. 低读水标位:只要bufferevent的input buffer达到或超过了这个标准,读回调就会被调用。默认是0,所以每个读都会触发读回调。
2. 高读水标位:如果bufferevent的input buffer达到了这个标准,bufferevent将会停止读,直到有数据被从input buffer中被取走,使它低于这个标准。默认是没限制的,所以实际上我们从不会停止读。
3. 低写水标位:只要达到了这个标准,就会触发写回调。默认是0, 所以只有在output buffer是空的情况下,写回调才不会被触发。
4. 高写水标位:并不会被bufferevent直接使用,当bufferevent被用于与另一个bufferevent做交互时,这个水标位会被赋予特殊的意义。
一个bufferevent也有一个“错误”或“事件”回调,它是用来告知程序一些非数据的事件,例如有一条连接被关闭或发生了错误。下面这些event flag被定义:
1. BEV_EVENT_READING:在读操作期间发生的事件。
2. BEV_EVENT_WRITING:在写操作期间发生的事件。
3. BEV_EVENT_ERROR:在bufferevent操作期间发生的错误。可以通过EVUTIL_SOCKET_ERROR()获取更具体的错误信息。
4. BEV_EVENT_TIMEOUT:bufferevent上发生的超时。
5. BEV_EVENT_EOF:到达了文件尾。
6. BEV_EVENT_CONNECTED:连接建立。
递延回调
默认情况下,当相应的条件达成时,bufferevent会直接执行回调。当依赖关系变的复杂时,这种直接调用会产生一些问题。例如:假设有一个回调是当evbuffer A变空时把数据移进去,另一个回调是当evbuffer A变满时把数据移出来。一旦这些调用同时出现在栈上,你可能面临着栈溢出的风险。
为了解决这个问题,bufferevent(或evbuffer)的回调应该被递延。当一个递延回调的条件达成时,不会直接调用回调,而是把它扔到event_loop()的列表中,在常规的event回调后再被调用。
bufferevent的操作标识
你可以使用下面这些标识去控制bufferevent的一些行为,当它被创建时:
BEV_OPT_CLOSE_ON_FREE:当bufferevent被释放时,关闭底层传输渠道。
BEV_OPT_THREADSAFE:自动为bufferevent分配锁,所以应用在多线程中它是安全的。
BEV_OPT_DEFER_CALLBACKS:当这个标识被设置时,bufferevent递延所有的回调。
BEV_OPT_UNLOCK_CALLBACKS:默认情况下,当bufferevent被设置为线程安全的,bufferevent的锁在回调被调用时就会被持有。这个标识使Libevent释放bufferevent的锁,当调用你的回调时。
socket-based bufferevents
socket-based类型是使用起来最简单的bufferevent。它是通过使用Libevent的底层事件触发机制来检测socket是否准备好读或写操作,通过系统调用(如readv, writev, WSASend, WSARecv)去传输和接收数据。
socket-based bufferevent的创建
你可以通过bufferevent_socket_new()去创建一个socket-based bufferevent:
接口
struct bufferevent *bufferevent_socket_new( struct event_base *base, evutil_socket_t fd, enum bufferevent_options options);base是event_base,options是bufferevent的操作标识的按位与(BEV_OPT_CLOSE_ON_FREE,等)。fd是可选的,它表示的是socket的文件描述符。你可以先把它设置成-1,然后在后边需要的时候再设置它。
提示:确保你提供的socket是非阻塞模式的。可以通过evutil_make_socket_nonblocking方法去设置。
此方法成功返回一个指向bufferevent的指针,失败返回NULL。
发起连接
如果bufferevent的socket还没连接,你可以发起一个新的连接。
接口
int bufferevent_socket_connect(struct bufferevent *bev, struct sockaddr *address, int addrlen);address和addrlen参数是用于标准系统调用connect()的。如果bufferevent还没有socket,通过这个方法可以分配一个新的流模式的socket,并且把它设置为非阻塞。如果bufferevent还没有socket,bufferevent_socket_connect()会告诉Libevent socket还没有连接,直到连接被成功建立之前,不应该有读或写发生在socket上。
在连接建立之前,往output buffer中添加数据时没问题的。
此方法连接成功时返回0, 有错误发生返回-1。
例:
#include <event2/event.h> #include <event2/bufferevent.h> #include <sys/socket.h> #include <string.h> void eventcb(struct bufferevent *bev, short events, void *ptr) { if (events & BEV_EVENT_CONNECTED) { /* We're connected to 127.0.0.1:8080. Ordinarily we'd do something here, like start reading or writing. */ } else if (events & BEV_EVENT_ERROR) { /* An error occured while connecting. */ } } int main_loop(void) { struct event_base *base; struct bufferevent *bev; struct sockaddr_in sin; base = event_base_new(); memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */ sin.sin_port = htons(8080); /* Port 8080 */ bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE); bufferevent_setcb(bev, NULL, NULL, eventcb, NULL); if (bufferevent_socket_connect(bev, (struct sockaddr *)&sin, sizeof(sin)) < 0) { /* Error starting connection */ bufferevent_free(bev); return -1; } event_base_dispatch(base); return 0; }注:如果你使用bufferevent_socket_connect()发起一个connect,你会得到一个BEV_EVENT_CONNECTED事件。如果你自己发起connect,连接成功会触发写事件(失败会可读又可写哦,一般情况下我会线程去实现。详见)。
如果你想调用自己的connect,但当连接成功时仍想接收到一个BEV_EVENT_CONNECTED事件,你可以在connect()返回-1(错误码是EAGAIN或EINPROGRESS)后调用bufferevent_socket_connect(bev, NULL,0)。
通过域名发起连接
很多时候,你想把域名解析和创建连接放到一个操作中。这里有个接口可以实现:
接口
int bufferevent_socket_connect_hostname(struct bufferevent *bev, struct evdns_base *dns_base, int family, const char *hostname, int port); int bufferevent_socket_get_dns_error(struct bufferevent *bev);这个方法解析DNS名字hostname,查找它的family格式的地址(可用的类型有AF_INET,AF_INET6和AF_UNSPEC.)。如果名字解析失败,它会调用event回调,并传入一个错误事件。如果成功,它接下来尝试去通过bufferevent_connect发起一个连接。
dns_base参数是可选的。如果它为NULL,那么Libevent会阻塞等待名字解析结束,通常这不是你想要的结果。如果它不为NULL,Libevent使用它异步查找域名。
跟bufferevent_socket_connect()一样,如果连接未成功建立,不应该有读或写时间发生在这条socket上,直到解析结束并且连接成功建立。
如果有错误发生,它可能是DNS域名解析的错误。你可以通过bufferevent_socket_get_dns_error()找出大多数的错误。如果返回的错误码是0,说明没有DNS错误发生。
例:简单的HTTP v0客户端
/* Don't actually copy this code: it is a poor way to implement an HTTP client. Have a look at evhttp instead. */ #include <event2/dns.h> #include <event2/bufferevent.h> #include <event2/buffer.h> #include <event2/util.h> #include <event2/event.h> #include <stdio.h> void readcb(struct bufferevent *bev, void *ptr) { char buf[1024]; int n; struct evbuffer *input = bufferevent_get_input(bev); while ((n = evbuffer_remove(input, buf, sizeof(buf))) > 0) { fwrite(buf, 1, n, stdout); } } void eventcb(struct bufferevent *bev, short events, void *ptr) { if (events & BEV_EVENT_CONNECTED) { printf("Connect okay.\n"); } else if (events & (BEV_EVENT_ERROR|BEV_EVENT_EOF)) { struct event_base *base = ptr; if (events & BEV_EVENT_ERROR) { int err = bufferevent_socket_get_dns_error(bev); if (err) printf("DNS error: %s\n", evutil_gai_strerror(err)); } printf("Closing\n"); bufferevent_free(bev); event_base_loopexit(base, NULL); } } int main(int argc, char **argv) { struct event_base *base; struct evdns_base *dns_base; struct bufferevent *bev; if (argc != 3) { printf("Trivial HTTP 0.x client\n" "Syntax: %s [hostname] [resource]\n" "Example: %s www.google.com /\n",argv[0],argv[0]); return 1; } base = event_base_new(); dns_base = evdns_base_new(base, 1); bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE); bufferevent_setcb(bev, readcb, NULL, eventcb, base); bufferevent_enable(bev, EV_READ|EV_WRITE); evbuffer_add_printf(bufferevent_get_output(bev), "GET %s\r\n", argv[2]); bufferevent_socket_connect_hostname( bev, dns_base, AF_UNSPEC, argv[1], 80); event_base_dispatch(base); return 0; }
通用的bufferevent操作
释放bufferevent
接口
void bufferevent_free(struct bufferevent *bev);这个方法用来释放bufferevent。bufferevent是有内部引用计数的,所以如果在你释放它的时候,它还有一个延迟回调,它不会被直接删除,而是会等待回调结束才进行。
bufferevent_free()方法会尽可能的去释放bufferevent。如果当前有数据正准备写,这部分数据可能会被丢弃。
如果BEV_OPT_CLOSE_ON_FREE标识被设置,当bufferevent被释放时,bufferevent所关联的socket或其他传输渠道会被自动关闭。
回调,水标位和开启选项
接口
typedef void (*bufferevent_data_cb)(struct bufferevent *bev, void *ctx); typedef void (*bufferevent_event_cb)(struct bufferevent *bev, short events, void *ctx); void bufferevent_setcb(struct bufferevent *bufev, bufferevent_data_cb readcb, bufferevent_data_cb writecb, bufferevent_event_cb eventcb, void *cbarg); void bufferevent_getcb(struct bufferevent *bufev, bufferevent_data_cb *readcb_ptr, bufferevent_data_cb *writecb_ptr, bufferevent_event_cb *eventcb_ptr, void **cbarg_ptr);bufferevent_setcb()方法设置bufferevent的回调。readcb,writecb和eventcb方法会在有读,写,错误发生时被调用。它们的第一个参数是发生事件的bufferevent。最后一个参数是用户在bufferevent_setcb中提供的cbarg参数。bufferevent_event_cb的event参数是event标识的按位与:看上边的回调和水标位章节。
你可以通过传入NULL来禁止某个回调。注意:所有的回调共享同一个cbarg参数值,所以改变它可能会影响所有的回调。
你可以通过bufferevent_getcb()获取bufferevent的当前的回调函数。
接口
void bufferevent_enable(struct bufferevent *bufev, short events); void bufferevent_disable(struct bufferevent *bufev, short events); short bufferevent_get_enabled(struct bufferevent *bufev);你可以开启或关闭bufferevent上的EV_READ,EV_WRITE,或EV_READ|EV_WRITE。当读或写被禁止时,bufferevent不会去读或写数据。
当output buffer为空时,不需要关闭写:bufferevent会自动停止写,当有数据可以写时,它会自动启用写功能。
相应的,当input buffer达到高水标位时,不需要关闭读:bufferevent会自动停止读,当有空间可以继续读的时候,它会自动启用读功能。
默认情况下,新创建的bufferevent有读功能,没有写功能(可能连接还没建立)。
你可以通过bufferevent_get_enabled()去看哪个events是开启状态的。
接口
void bufferevent_setwatermark(struct bufferevent *bufev, short events, size_t lowmark, size_t highmark);此方法用来调整读写水标位(如果EV_READ被设置,则是调整读水标位。如果EV_WRITE被设置,则是调整写水标位)。
高水标位如果是0,则表示“无限制”。
例:
#include <event2/event.h> #include <event2/bufferevent.h> #include <event2/buffer.h> #include <event2/util.h> #include <stdlib.h> #include <errno.h> #include <string.h> struct info { const char *name; size_t total_drained; }; void read_callback(struct bufferevent *bev, void *ctx) { struct info *inf = ctx; struct evbuffer *input = bufferevent_get_input(bev); size_t len = evbuffer_get_length(input); if (len) { inf->total_drained += len; evbuffer_drain(input, len); printf("Drained %lu bytes from %s\n", (unsigned long) len, inf->name); } } void event_callback(struct bufferevent *bev, short events, void *ctx) { struct info *inf = ctx; struct evbuffer *input = bufferevent_get_input(bev); int finished = 0; if (events & BEV_EVENT_EOF) { size_t len = evbuffer_get_length(input); printf("Got a close from %s. We drained %lu bytes from it, " "and have %lu left.\n", inf->name, (unsigned long)inf->total_drained, (unsigned long)len); finished = 1; } if (events & BEV_EVENT_ERROR) { printf("Got an error from %s: %s\n", inf->name, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())); finished = 1; } if (finished) { free(ctx); bufferevent_free(bev); } } struct bufferevent *setup_bufferevent(void) { struct bufferevent *b1 = NULL; struct info *info1; info1 = malloc(sizeof(struct info)); info1->name = "buffer 1"; info1->total_drained = 0; /* ... Here we should set up the bufferevent and make sure it gets connected... */ /* Trigger the read callback only whenever there is at least 128 bytes of data in the buffer. */ bufferevent_setwatermark(b1, EV_READ, 128, 0); bufferevent_setcb(b1, read_callback, NULL, event_callback, info1); bufferevent_enable(b1, EV_READ); /* Start reading. */ return b1; }操作bufferevent的数据
bufferevent提供给你一些方法,可以操作读和写的数据。
接口
struct evbuffer *bufferevent_get_input(struct bufferevent *bufev); struct evbuffer *bufferevent_get_output(struct bufferevent *bufev);这两个方法非常强大的根本是因为:它们分别返回input和output buffer。
注:你只能从input buffer中取数据,只能向output buffer中添加数据。
接口
int bufferevent_write(struct bufferevent *bufev, const void *data, size_t size); int bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf);这两个方法是用来向output buffer添加数据。它们成功返回0,失败返回-1。
接口
size_t bufferevent_read(struct bufferevent *bufev, void *data, size_t size); int bufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf);这两个方法是用来从input buffer中取数据。它们成功返回0,失败返回-1。
注:bufferevent_read()方法中,data所指向的内存块,必须有足够的空间去装size这么多的数据。
例:
#include <event2/bufferevent.h> #include <event2/buffer.h> #include <ctype.h> void read_callback_uppercase(struct bufferevent *bev, void *ctx) { /* This callback removes the data from bev's input buffer 128 bytes at a time, uppercases it, and starts sending it back. (Watch out! In practice, you shouldn't use toupper to implement a network protocol, unless you know for a fact that the current locale is the one you want to be using.) */ char tmp[128]; size_t n; int i; while (1) { n = bufferevent_read(bev, tmp, sizeof(tmp)); if (n <= 0) break; /* No more data. */ for (i=0; i<n; ++i) tmp[i] = toupper(tmp[i]); bufferevent_write(bev, tmp, n); } } struct proxy_info { struct bufferevent *other_bev; }; void read_callback_proxy(struct bufferevent *bev, void *ctx) { /* You might use a function like this if you're implementing a simple proxy: it will take data from one connection (on bev), and write it to another, copying as little as possible. */ struct proxy_info *inf = ctx; bufferevent_read_buffer(bev, bufferevent_get_output(inf->other_bev)); } struct count { unsigned long last_fib[2]; }; void write_callback_fibonacci(struct bufferevent *bev, void *ctx) { /* Here's a callback that adds some Fibonacci numbers to the output buffer of bev. It stops once we have added 1k of data; once this data is drained, we'll add more. */ struct count *c = ctx; struct evbuffer *tmp = evbuffer_new(); while (evbuffer_get_length(tmp) < 1024) { unsigned long next = c->last_fib[0] + c->last_fib[1]; c->last_fib[0] = c->last_fib[1]; c->last_fib[1] = next; evbuffer_add_printf(tmp, "%lu", next); } /* Now we add the whole contents of tmp to bev. */ bufferevent_write_buffer(bev, tmp); /* We don't need tmp any longer. */ evbuffer_free(tmp); }读和写的超时
与其他时间类似,当规定时间已到,但是仍没有数据被写出或没有数据被读取到,此时超时事件会发生。
接口
void bufferevent_set_timeouts(struct bufferevent *bufev, const struct timeval *timeout_read, const struct timeval *timeout_write);如果设置timeout为NULL,表示移除它。
如果bufferevent在读上等待的时间超过timeout_read秒,那么读超时事件将会发生。如果bufferevent在写上等待的时间超过timeout_write秒,那么写事件将会发生。
注:超时的计时是在bufferevent启用了读或写功能时才开始。换句话说,如果读操作被禁止或input buffer达到了高水标位,读超时也是被禁止的。写超时同理。
当读或写超时发生时,相应的读或写操作将会被禁止。事件回调的event参数会为BEV_EVENT_TIMEOUT|BEV_EVENT_READING或BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING。
刷新数据
接口
int bufferevent_flush(struct bufferevent *bufev, short iotype, enum bufferevent_flush_mode state);刷新bufferevent是告诉bufferevent尽可能的从socket上读或写更多数据。无视其他的限制。具体的功能依赖bufferevent的类型。
iotype参数应该为EV_READ, EV_WRITE, 或EV_READ|EV_WRITE。state参数可能是BEV_NORMAL,BEV_FLUSH,或BEV_FINISHED。BEV_FINISHED表示告知另一端不会再有数据会发送;BEV_NORMAL和BEV_FLUSH的区别要看bufferevent是什么类型。
bufferevent_flush()方法失败返回-1,如果没数据被刷新返回0,如果有数据被刷新返回1。
特定类型的bufferevent方法
这些方法只适应部分bufferevent类型
接口
int bufferevent_priority_set(struct bufferevent *bufev, int pri); int bufferevent_get_priority(struct bufferevent *bufev);这个方法通过bufev和pri去调整event的优先级。它成功返回0,失败返回-1。只对socket-based bufferevent起作用。
接口
int bufferevent_setfd(struct bufferevent *bufev, evutil_socket_t fd); evutil_socket_t bufferevent_getfd(struct bufferevent *bufev);这两个方法设置或返回文件描述符。只对socket-based bufferevent起作用。
接口
struct event_base *bufferevent_get_base(struct bufferevent *bev);此方法返回bufferevent的event_base。
接口
struct bufferevent *bufferevent_get_underlying(struct bufferevent *bufev);此方法返回传输端的另一个bufferevent。
手动加锁或解锁bufferevent
有时候,你需要确保bufferevent的操作是原子的。Libevent提供了以下的方法供你去手动加锁和解锁一个bufferevent。
接口
void bufferevent_lock(struct bufferevent *bufev); void bufferevent_unlock(struct bufferevent *bufev);注:对一个在创建时没有给BEV_OPT_THREADSAFE标识的bufferevent加锁是不起作用的,或Libevent没有使用多线程支持。
对bufferevent加锁也会锁住它关联的evbuffer。这些方法是可重入的:如果你已经持有了锁,你还可以再次对bufferevent加锁。当然你需要为每次的加锁都进行一次解锁。
libevent学习七