首页 > 代码库 > libevent学习七

libevent学习七

Bufferevents:概念和基础

很多时候,一个程序需要处理一些数据的缓存,不止应用在答复event上。例如:当我们需要去写出数据,通常会这样做:

1. 发现有数据需要写出到一条连接上;把这些数据放到buffer里。

2. 等连接变成可写的状态。

3. 尽可能的写入数据。

4. 记住我们写了多少数据,然后如果数据没有全部写完,就等连接再次变为可写的状态。

这种IO缓冲模式已经足够Libevent的日常使用。一个“bufferevent”是由一个底层传输渠道(如socket),一个读buffer,一个写buffer。对于常规的event

,当传输渠道准备好读或写时,它会直接调用它的回调。而bufferevent则是当底层有足够的数据可读或可写时才去调用回调。

这里有多种bufferevent类型,它们共享同一套接口。它们包括:

1. socket-based bufferevents:从底层流socket发送和接收数据,使用event_*接口。

2. asynchronout-IO bufferevents:使用Windows IOCP接口去发送和接收数据(只有Windows支持)。

3. filtering bufferevents:在数据被传给传输层前,先处理一下数据-例如:压缩和解压数据。

4. paired bufferevents:两个bufferevent之间进行交互数据。

注:bufferevent目前只支持像TCP一样的流协议。在将来可能会支持数据报文协议,像UDP。


Bufferevent和evbuffers

每个bufferevent都有一个input buffer和一个output buffer。buffer的类型是“struct evbuffer”。当你有数据需要写出到bufferevent,你要把这些数据添加到output buffer;当bufferevent有数据给你去读时,你实际上要从input buffer中读取它们。


回调和水标位

每个bufferevent有2个数据相关的回调:读回调和写回调。默认情况下,只要有数据从传输层被读出来,都会调用读回调;只要传输层有足够的空间,写回调就会被调用。你可以通过调整读写“水标位”重写这些默认的行为。

每个bufferevent有4个水标位:

1. 低读水标位:只要bufferevent的input buffer达到或超过了这个标准,读回调就会被调用。默认是0,所以每个读都会触发读回调。

2. 高读水标位:如果bufferevent的input buffer达到了这个标准,bufferevent将会停止读,直到有数据被从input buffer中被取走,使它低于这个标准。默认是没限制的,所以实际上我们从不会停止读。

3. 低写水标位:只要达到了这个标准,就会触发写回调。默认是0, 所以只有在output buffer是空的情况下,写回调才不会被触发。

4. 高写水标位:并不会被bufferevent直接使用,当bufferevent被用于与另一个bufferevent做交互时,这个水标位会被赋予特殊的意义。

一个bufferevent也有一个“错误”或“事件”回调,它是用来告知程序一些非数据的事件,例如有一条连接被关闭或发生了错误。下面这些event flag被定义:

1. BEV_EVENT_READING:在读操作期间发生的事件。

2. BEV_EVENT_WRITING:在写操作期间发生的事件。

3. BEV_EVENT_ERROR:在bufferevent操作期间发生的错误。可以通过EVUTIL_SOCKET_ERROR()获取更具体的错误信息。

4. BEV_EVENT_TIMEOUT:bufferevent上发生的超时。

5. BEV_EVENT_EOF:到达了文件尾。

6. BEV_EVENT_CONNECTED:连接建立。


递延回调

默认情况下,当相应的条件达成时,bufferevent会直接执行回调。当依赖关系变的复杂时,这种直接调用会产生一些问题。例如:假设有一个回调是当evbuffer A变空时把数据移进去,另一个回调是当evbuffer A变满时把数据移出来。一旦这些调用同时出现在栈上,你可能面临着栈溢出的风险。

为了解决这个问题,bufferevent(或evbuffer)的回调应该被递延。当一个递延回调的条件达成时,不会直接调用回调,而是把它扔到event_loop()的列表中,在常规的event回调后再被调用。


bufferevent的操作标识

你可以使用下面这些标识去控制bufferevent的一些行为,当它被创建时:

BEV_OPT_CLOSE_ON_FREE:当bufferevent被释放时,关闭底层传输渠道。

BEV_OPT_THREADSAFE:自动为bufferevent分配锁,所以应用在多线程中它是安全的。

BEV_OPT_DEFER_CALLBACKS:当这个标识被设置时,bufferevent递延所有的回调。

BEV_OPT_UNLOCK_CALLBACKS:默认情况下,当bufferevent被设置为线程安全的,bufferevent的锁在回调被调用时就会被持有。这个标识使Libevent释放bufferevent的锁,当调用你的回调时。


socket-based bufferevents

socket-based类型是使用起来最简单的bufferevent。它是通过使用Libevent的底层事件触发机制来检测socket是否准备好读或写操作,通过系统调用(如readv, writev, WSASend, WSARecv)去传输和接收数据。

socket-based bufferevent的创建

你可以通过bufferevent_socket_new()去创建一个socket-based bufferevent:

接口

struct bufferevent *bufferevent_socket_new(
    struct event_base *base,
    evutil_socket_t fd,
    enum bufferevent_options options);
base是event_base,options是bufferevent的操作标识的按位与(BEV_OPT_CLOSE_ON_FREE,等)。fd是可选的,它表示的是socket的文件描述符。你可以先把它设置成-1,然后在后边需要的时候再设置它。
提示:确保你提供的socket是非阻塞模式的。可以通过evutil_make_socket_nonblocking方法去设置。

此方法成功返回一个指向bufferevent的指针,失败返回NULL。

发起连接

如果bufferevent的socket还没连接,你可以发起一个新的连接。

接口

int bufferevent_socket_connect(struct bufferevent *bev,
    struct sockaddr *address, int addrlen);
addressaddrlen参数是用于标准系统调用connect()的。如果bufferevent还没有socket,通过这个方法可以分配一个新的流模式的socket,并且把它设置为非阻塞。如果bufferevent还没有socket,bufferevent_socket_connect()会告诉Libevent socket还没有连接,直到连接被成功建立之前,不应该有读或写发生在socket上。

在连接建立之前,往output buffer中添加数据时没问题的。
此方法连接成功时返回0, 有错误发生返回-1。

例:

#include <event2/event.h>
#include <event2/bufferevent.h>
#include <sys/socket.h>
#include <string.h>

void eventcb(struct bufferevent *bev, short events, void *ptr)
{
    if (events & BEV_EVENT_CONNECTED) {
         /* We're connected to 127.0.0.1:8080.   Ordinarily we'd do
            something here, like start reading or writing. */
    } else if (events & BEV_EVENT_ERROR) {
         /* An error occured while connecting. */
    }
}

int main_loop(void)
{
    struct event_base *base;
    struct bufferevent *bev;
    struct sockaddr_in sin;

    base = event_base_new();

    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */
    sin.sin_port = htons(8080); /* Port 8080 */

    bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);

    bufferevent_setcb(bev, NULL, NULL, eventcb, NULL);

    if (bufferevent_socket_connect(bev,
        (struct sockaddr *)&sin, sizeof(sin)) < 0) {
        /* Error starting connection */
        bufferevent_free(bev);
        return -1;
    }

    event_base_dispatch(base);
    return 0;
}
注:如果你使用bufferevent_socket_connect()发起一个connect,你会得到一个BEV_EVENT_CONNECTED事件。如果你自己发起connect,连接成功会触发写事件(失败会可读又可写哦,一般情况下我会线程去实现。详见)。
如果你想调用自己的connect,但当连接成功时仍想接收到一个BEV_EVENT_CONNECTED事件,你可以在connect()返回-1(错误码是EAGAIN或EINPROGRESS)后调用bufferevent_socket_connect(bev, NULL,0)。

通过域名发起连接

很多时候,你想把域名解析和创建连接放到一个操作中。这里有个接口可以实现:

接口

int bufferevent_socket_connect_hostname(struct bufferevent *bev,
    struct evdns_base *dns_base, int family, const char *hostname,
    int port);
int bufferevent_socket_get_dns_error(struct bufferevent *bev);
这个方法解析DNS名字hostname,查找它的family格式的地址(可用的类型有AF_INET,AF_INET6和AF_UNSPEC.)。如果名字解析失败,它会调用event回调,并传入一个错误事件。如果成功,它接下来尝试去通过bufferevent_connect发起一个连接。
dns_base参数是可选的。如果它为NULL,那么Libevent会阻塞等待名字解析结束,通常这不是你想要的结果。如果它不为NULL,Libevent使用它异步查找域名。

跟bufferevent_socket_connect()一样,如果连接未成功建立,不应该有读或写时间发生在这条socket上,直到解析结束并且连接成功建立。

如果有错误发生,它可能是DNS域名解析的错误。你可以通过bufferevent_socket_get_dns_error()找出大多数的错误。如果返回的错误码是0,说明没有DNS错误发生。

例:简单的HTTP v0客户端

/* Don't actually copy this code: it is a poor way to implement an
   HTTP client.  Have a look at evhttp instead.
*/
#include <event2/dns.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/util.h>
#include <event2/event.h>

#include <stdio.h>

void readcb(struct bufferevent *bev, void *ptr)
{
    char buf[1024];
    int n;
    struct evbuffer *input = bufferevent_get_input(bev);
    while ((n = evbuffer_remove(input, buf, sizeof(buf))) > 0) {
        fwrite(buf, 1, n, stdout);
    }
}

void eventcb(struct bufferevent *bev, short events, void *ptr)
{
    if (events & BEV_EVENT_CONNECTED) {
         printf("Connect okay.\n");
    } else if (events & (BEV_EVENT_ERROR|BEV_EVENT_EOF)) {
         struct event_base *base = ptr;
         if (events & BEV_EVENT_ERROR) {
                 int err = bufferevent_socket_get_dns_error(bev);
                 if (err)
                         printf("DNS error: %s\n", evutil_gai_strerror(err));
         }
         printf("Closing\n");
         bufferevent_free(bev);
         event_base_loopexit(base, NULL);
    }
}

int main(int argc, char **argv)
{
    struct event_base *base;
    struct evdns_base *dns_base;
    struct bufferevent *bev;

    if (argc != 3) {
        printf("Trivial HTTP 0.x client\n"
               "Syntax: %s [hostname] [resource]\n"
               "Example: %s www.google.com /\n",argv[0],argv[0]);
        return 1;
    }

    base = event_base_new();
    dns_base = evdns_base_new(base, 1);

    bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
    bufferevent_setcb(bev, readcb, NULL, eventcb, base);
    bufferevent_enable(bev, EV_READ|EV_WRITE);
    evbuffer_add_printf(bufferevent_get_output(bev), "GET %s\r\n", argv[2]);
    bufferevent_socket_connect_hostname(
        bev, dns_base, AF_UNSPEC, argv[1], 80);
    event_base_dispatch(base);
    return 0;
}

通用的bufferevent操作

释放bufferevent

接口

void bufferevent_free(struct bufferevent *bev);
这个方法用来释放bufferevent。bufferevent是有内部引用计数的,所以如果在你释放它的时候,它还有一个延迟回调,它不会被直接删除,而是会等待回调结束才进行。
bufferevent_free()方法会尽可能的去释放bufferevent。如果当前有数据正准备写,这部分数据可能会被丢弃。

如果BEV_OPT_CLOSE_ON_FREE标识被设置,当bufferevent被释放时,bufferevent所关联的socket或其他传输渠道会被自动关闭。

回调,水标位和开启选项

接口

typedef void (*bufferevent_data_cb)(struct bufferevent *bev, void *ctx);
typedef void (*bufferevent_event_cb)(struct bufferevent *bev,
    short events, void *ctx);

void bufferevent_setcb(struct bufferevent *bufev,
    bufferevent_data_cb readcb, bufferevent_data_cb writecb,
    bufferevent_event_cb eventcb, void *cbarg);

void bufferevent_getcb(struct bufferevent *bufev,
    bufferevent_data_cb *readcb_ptr,
    bufferevent_data_cb *writecb_ptr,
    bufferevent_event_cb *eventcb_ptr,
    void **cbarg_ptr);
bufferevent_setcb()方法设置bufferevent的回调。readcb,writecb和eventcb方法会在有读,写,错误发生时被调用。它们的第一个参数是发生事件的bufferevent。最后一个参数是用户在bufferevent_setcb中提供的cbarg参数。bufferevent_event_cb的event参数是event标识的按位与:看上边的回调和水标位章节。
你可以通过传入NULL来禁止某个回调。注意:所有的回调共享同一个cbarg参数值,所以改变它可能会影响所有的回调。

你可以通过bufferevent_getcb()获取bufferevent的当前的回调函数。

接口

void bufferevent_enable(struct bufferevent *bufev, short events);
void bufferevent_disable(struct bufferevent *bufev, short events);

short bufferevent_get_enabled(struct bufferevent *bufev);
你可以开启或关闭bufferevent上的EV_READ,EV_WRITE,或EV_READ|EV_WRITE。当读或写被禁止时,bufferevent不会去读或写数据。

当output buffer为空时,不需要关闭写:bufferevent会自动停止写,当有数据可以写时,它会自动启用写功能。

相应的,当input buffer达到高水标位时,不需要关闭读:bufferevent会自动停止读,当有空间可以继续读的时候,它会自动启用读功能。
默认情况下,新创建的bufferevent有读功能,没有写功能(可能连接还没建立)。

你可以通过bufferevent_get_enabled()去看哪个events是开启状态的。

接口

void bufferevent_setwatermark(struct bufferevent *bufev, short events,
    size_t lowmark, size_t highmark);
此方法用来调整读写水标位(如果EV_READ被设置,则是调整读水标位。如果EV_WRITE被设置,则是调整写水标位)。

高水标位如果是0,则表示“无限制”。

例:

#include <event2/event.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/util.h>

#include <stdlib.h>
#include <errno.h>
#include <string.h>

struct info {
    const char *name;
    size_t total_drained;
};

void read_callback(struct bufferevent *bev, void *ctx)
{
    struct info *inf = ctx;
    struct evbuffer *input = bufferevent_get_input(bev);
    size_t len = evbuffer_get_length(input);
    if (len) {
        inf->total_drained += len;
        evbuffer_drain(input, len);
        printf("Drained %lu bytes from %s\n",
             (unsigned long) len, inf->name);
    }
}

void event_callback(struct bufferevent *bev, short events, void *ctx)
{
    struct info *inf = ctx;
    struct evbuffer *input = bufferevent_get_input(bev);
    int finished = 0;

    if (events & BEV_EVENT_EOF) {
        size_t len = evbuffer_get_length(input);
        printf("Got a close from %s.  We drained %lu bytes from it, "
            "and have %lu left.\n", inf->name,
            (unsigned long)inf->total_drained, (unsigned long)len);
        finished = 1;
    }
    if (events & BEV_EVENT_ERROR) {
        printf("Got an error from %s: %s\n",
            inf->name, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
        finished = 1;
    }
    if (finished) {
        free(ctx);
        bufferevent_free(bev);
    }
}

struct bufferevent *setup_bufferevent(void)
{
    struct bufferevent *b1 = NULL;
    struct info *info1;

    info1 = malloc(sizeof(struct info));
    info1->name = "buffer 1";
    info1->total_drained = 0;

    /* ... Here we should set up the bufferevent and make sure it gets
       connected... */

    /* Trigger the read callback only whenever there is at least 128 bytes
       of data in the buffer. */
    bufferevent_setwatermark(b1, EV_READ, 128, 0);

    bufferevent_setcb(b1, read_callback, NULL, event_callback, info1);

    bufferevent_enable(b1, EV_READ); /* Start reading. */
    return b1;
}
操作bufferevent的数据
bufferevent提供给你一些方法,可以操作读和写的数据。

接口

struct evbuffer *bufferevent_get_input(struct bufferevent *bufev);
struct evbuffer *bufferevent_get_output(struct bufferevent *bufev);
这两个方法非常强大的根本是因为:它们分别返回input和output buffer。

注:你只能从input buffer中取数据,只能向output buffer中添加数据。

接口

int bufferevent_write(struct bufferevent *bufev,
    const void *data, size_t size);
int bufferevent_write_buffer(struct bufferevent *bufev,
    struct evbuffer *buf);
这两个方法是用来向output buffer添加数据。它们成功返回0,失败返回-1。

接口

size_t bufferevent_read(struct bufferevent *bufev, void *data, size_t size);
int bufferevent_read_buffer(struct bufferevent *bufev,
    struct evbuffer *buf);
这两个方法是用来从input buffer中取数据。它们成功返回0,失败返回-1。
注:bufferevent_read()方法中,data所指向的内存块,必须有足够的空间去装size这么多的数据。

例:

#include <event2/bufferevent.h>
#include <event2/buffer.h>

#include <ctype.h>

void
read_callback_uppercase(struct bufferevent *bev, void *ctx)
{
        /* This callback removes the data from bev's input buffer 128
           bytes at a time, uppercases it, and starts sending it
           back.

           (Watch out!  In practice, you shouldn't use toupper to implement
           a network protocol, unless you know for a fact that the current
           locale is the one you want to be using.)
         */

        char tmp[128];
        size_t n;
        int i;
        while (1) {
                n = bufferevent_read(bev, tmp, sizeof(tmp));
                if (n <= 0)
                        break; /* No more data. */
                for (i=0; i<n; ++i)
                        tmp[i] = toupper(tmp[i]);
                bufferevent_write(bev, tmp, n);
        }
}

struct proxy_info {
        struct bufferevent *other_bev;
};
void
read_callback_proxy(struct bufferevent *bev, void *ctx)
{
        /* You might use a function like this if you're implementing
           a simple proxy: it will take data from one connection (on
           bev), and write it to another, copying as little as
           possible. */
        struct proxy_info *inf = ctx;

        bufferevent_read_buffer(bev,
            bufferevent_get_output(inf->other_bev));
}

struct count {
        unsigned long last_fib[2];
};

void
write_callback_fibonacci(struct bufferevent *bev, void *ctx)
{
        /* Here's a callback that adds some Fibonacci numbers to the
           output buffer of bev.  It stops once we have added 1k of
           data; once this data is drained, we'll add more. */
        struct count *c = ctx;

        struct evbuffer *tmp = evbuffer_new();
        while (evbuffer_get_length(tmp) < 1024) {
                 unsigned long next = c->last_fib[0] + c->last_fib[1];
                 c->last_fib[0] = c->last_fib[1];
                 c->last_fib[1] = next;

                 evbuffer_add_printf(tmp, "%lu", next);
        }

        /* Now we add the whole contents of tmp to bev. */
        bufferevent_write_buffer(bev, tmp);

        /* We don't need tmp any longer. */
        evbuffer_free(tmp);
}
读和写的超时
与其他时间类似,当规定时间已到,但是仍没有数据被写出或没有数据被读取到,此时超时事件会发生。

接口

void bufferevent_set_timeouts(struct bufferevent *bufev,
    const struct timeval *timeout_read, const struct timeval *timeout_write);
如果设置timeout为NULL,表示移除它。

如果bufferevent在读上等待的时间超过timeout_read秒,那么读超时事件将会发生。如果bufferevent在写上等待的时间超过timeout_write秒,那么写事件将会发生。
注:超时的计时是在bufferevent启用了读或写功能时才开始。换句话说,如果读操作被禁止或input buffer达到了高水标位,读超时也是被禁止的。写超时同理。

当读或写超时发生时,相应的读或写操作将会被禁止。事件回调的event参数会为BEV_EVENT_TIMEOUT|BEV_EVENT_READING或BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING。

刷新数据

接口

int bufferevent_flush(struct bufferevent *bufev,
    short iotype, enum bufferevent_flush_mode state);
刷新bufferevent是告诉bufferevent尽可能的从socket上读或写更多数据。无视其他的限制。具体的功能依赖bufferevent的类型。

iotype参数应该为EV_READ, EV_WRITE, 或EV_READ|EV_WRITE。state参数可能是BEV_NORMAL,BEV_FLUSH,或BEV_FINISHED。BEV_FINISHED表示告知另一端不会再有数据会发送;BEV_NORMAL和BEV_FLUSH的区别要看bufferevent是什么类型。
bufferevent_flush()方法失败返回-1,如果没数据被刷新返回0,如果有数据被刷新返回1。


特定类型的bufferevent方法

这些方法只适应部分bufferevent类型

接口

int bufferevent_priority_set(struct bufferevent *bufev, int pri);
int bufferevent_get_priority(struct bufferevent *bufev);
这个方法通过bufev和pri去调整event的优先级。它成功返回0,失败返回-1。只对socket-based bufferevent起作用。
接口

int bufferevent_setfd(struct bufferevent *bufev, evutil_socket_t fd);
evutil_socket_t bufferevent_getfd(struct bufferevent *bufev);
这两个方法设置或返回文件描述符。只对socket-based bufferevent起作用。

接口

struct event_base *bufferevent_get_base(struct bufferevent *bev);
此方法返回bufferevent的event_base。

接口

struct bufferevent *bufferevent_get_underlying(struct bufferevent *bufev);
此方法返回传输端的另一个bufferevent。

手动加锁或解锁bufferevent

有时候,你需要确保bufferevent的操作是原子的。Libevent提供了以下的方法供你去手动加锁和解锁一个bufferevent。

接口

void bufferevent_lock(struct bufferevent *bufev);
void bufferevent_unlock(struct bufferevent *bufev);
注:对一个在创建时没有给BEV_OPT_THREADSAFE标识的bufferevent加锁是不起作用的,或Libevent没有使用多线程支持。

对bufferevent加锁也会锁住它关联的evbuffer。这些方法是可重入的:如果你已经持有了锁,你还可以再次对bufferevent加锁。当然你需要为每次的加锁都进行一次解锁。

libevent学习七