Reading the memcached source: libevent and the multithreaded model
This post covers what I learned today while reading the memcached source: what the process does on the networking side during startup.
I. Using libevent
First, as we know, memcached uses libevent as its network framework. libevent itself is a single-threaded asynchronous model, built on top of epoll events on Linux. The basic idea is to bind functions to events such as readable, writable, timeout, and error; when one of those events occurs, the bound function is called back.
Let's first take a quick look at the basic libevent API calls.
struct event_base *base;
base = event_base_new();  // initialize libevent
Compared with epoll, event_base_new can be understood as epoll's epoll_create.
Internally, event_base runs a loop that blocks in the epoll call; only when an event occurs does it go handle that event. Events are bound onto the event_base, and each event corresponds to one struct event, for example a monitored fd.
A struct event is created and bound with event_new and enabled with event_add, for example:
struct event *listener_event;
listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
Parameters:
base: the event_base, i.e. the return value of event_base_new
listener: the fd to monitor, here the listening fd
EV_READ|EV_PERSIST: the event type and flags
do_accept: the callback function to bind
(void*)base: the argument handed to the callback
event_add(listener_event, NULL);
Compared with epoll:
event_new roughly corresponds to filling in a struct epoll_event; epoll's epoll_wait call, together with the while loop around it, corresponds to event_base_dispatch in libevent.
event_add corresponds to epoll_ctl with EPOLL_CTL_ADD, i.e. adding the event.
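To make the analogy concrete, here is roughly what the same wiring looks like with raw epoll. This is my own illustrative sketch, not memcached code; `listener` stands for an already-created listening fd:

#include <sys/epoll.h>

/* The raw epoll calls that the libevent calls above roughly map onto. */
static void epoll_equivalent(int listener)
{
    int epfd = epoll_create(1);              /* ~ event_base_new() */

    struct epoll_event ev;
    ev.events = EPOLLIN;                     /* ~ event_new(..., EV_READ, ...) */
    ev.data.fd = listener;
    epoll_ctl(epfd, EPOLL_CTL_ADD, listener, &ev);  /* ~ event_add() */

    struct epoll_event ready[64];
    for (;;) {                               /* ~ event_base_dispatch() */
        int n = epoll_wait(epfd, ready, 64, -1);
        for (int i = 0; i < n; i++) {
            /* look up and invoke the callback bound to ready[i].data.fd;
             * libevent does this bookkeeping for you */
        }
    }
}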
Note: the events and flags libevent supports include the following (implemented as a bitfield, so combine them with |):
EV_TIMEOUT: timeout
EV_READ: the callback fires as long as there is still data in the network buffer (the fd is readable)
EV_WRITE: the callback fires once the network buffer can take more data (the fd is writable)
EV_SIGNAL: POSIX signals
EV_PERSIST: without this flag, the event is removed once its callback has fired
EV_ET: edge-triggered, equivalent to epoll's ET mode
Once events are created and added, the events that occur can be processed. The counterpart of epoll's epoll_wait is event_base_dispatch, which starts the event_base loop and runs until there are no more events to watch.
With the analysis above, and recalling the epoll server program written earlier, the flow of a server program is basically this (a minimal sketch assembling the steps follows the list):
1. Create a socket, then bind, listen, and set it to non-blocking mode
2. Create an event_base, i.e.
- struct event_base * event_base_new(void)
3. Create an event: hand the socket to the event_base, specify which event types to monitor, and bind the corresponding callback (plus the argument to pass it), i.e.
- struct event * event_new(struct event_base *base, evutil_socket_t fd, short events, void (*cb)(evutil_socket_t, short, void *), void *arg)
4. Enable the event, i.e.
- int event_add(struct event *ev, const struct timeval *tv)
5. Enter the event loop, i.e.
- int event_base_dispatch(struct event_base *event_base)
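Putting the five steps together, here is a minimal self-contained sketch. It is my own illustrative code, not memcached's; error handling is omitted and port 11211 is chosen arbitrarily:

#include <event2/event.h>
#include <event2/util.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

static void do_accept(evutil_socket_t listener, short event, void *arg)
{
    struct event_base *base = arg;
    struct sockaddr_storage ss;
    socklen_t slen = sizeof(ss);
    int fd = accept(listener, (struct sockaddr *)&ss, &slen);
    if (fd >= 0) {
        evutil_make_socket_nonblocking(fd);
        /* a real server would create a per-connection event here,
         * the way memcached's conn_new does */
        close(fd);
    }
    (void)event; (void)base;
}

int main(void)
{
    /* step 1: socket, bind, listen, non-blocking */
    int listener = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in sin;
    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = htonl(INADDR_ANY);
    sin.sin_port = htons(11211);
    bind(listener, (struct sockaddr *)&sin, sizeof(sin));
    listen(listener, 16);
    evutil_make_socket_nonblocking(listener);

    /* steps 2-5: base, event, add, dispatch */
    struct event_base *base = event_base_new();
    struct event *listener_event =
        event_new(base, listener, EV_READ | EV_PERSIST, do_accept, (void *)base);
    event_add(listener_event, NULL);
    return event_base_dispatch(base);
}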
Back in memcached's main(), the main thread's libevent instance is created with the legacy event_init() API (for our purposes, equivalent to event_base_new):

/* initialize main thread libevent instance */
main_base = event_init();

and at the end main() calls

/* enter the event loop */
if (event_base_loop(main_base, 0) != 0) {
    retval = EXIT_FAILURE;
}

to loop inside that base without returning.
During startup, main() also initializes a freelist of connection objects:

static void conn_init(void) {
    freetotal = 200;
    freecurr = 0;
    if ((freeconns = calloc(freetotal, sizeof(conn *))) == NULL) {
        fprintf(stderr, "Failed to allocate connection structures\n");
    }
    return;
}
/*
 * Returns a connection from the freelist, if any.
 */
conn *conn_from_freelist() {
    conn *c;

    pthread_mutex_lock(&conn_lock);
    if (freecurr > 0) {
        c = freeconns[--freecurr];
    } else {
        c = NULL;
    }
    pthread_mutex_unlock(&conn_lock);

    return c;
}
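For completeness, the counterpart that puts a connection back onto the freelist looks roughly like this. This is reconstructed from the memcached source from memory, so treat the details as a sketch; note it returns true on failure:

bool conn_add_to_freelist(conn *c) {
    bool ret = true;
    pthread_mutex_lock(&conn_lock);
    if (freecurr < freetotal) {
        freeconns[freecurr++] = c;
        ret = false;
    } else {
        /* try to enlarge the free connections array */
        size_t newsize = freetotal * 2;
        conn **new_freeconns = realloc(freeconns, sizeof(conn *) * newsize);
        if (new_freeconns) {
            freetotal = newsize;
            freeconns = new_freeconns;
            freeconns[freecurr++] = c;
            ret = false;
        }
    }
    pthread_mutex_unlock(&conn_lock);
    return ret;
}

The conn object being recycled here is memcached's per-connection structure: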
typedef struct conn conn;
struct conn {
    int    sfd;
    sasl_conn_t *sasl_conn;
    enum conn_states  state;
    enum bin_substates substate;
    struct event event;
    short  ev_flags;
    short  which;   /** which events were just triggered */

    char   *rbuf;   /** buffer to read commands into */
    char   *rcurr;  /** but if we parsed some already, this is where we stopped */
    int    rsize;   /** total allocated size of rbuf */
    int    rbytes;  /** how much data, starting from rcur, do we have unparsed */

    char   *wbuf;
    char   *wcurr;
    int    wsize;
    int    wbytes;
    /** which state to go into after finishing current write */
    enum conn_states  write_and_go;
    void   *write_and_free; /** free this memory after finishing writing */

    char   *ritem;  /** when we read in an item's value, it goes here */
    int    rlbytes;

    /* data for the nread state */

    /**
     * item is used to hold an item structure created after reading the command
     * line of set/add/replace commands, but before we finished reading the actual
     * data. The data is read into ITEM_data(item) to avoid extra copying.
     */
    void   *item;     /* for commands set/add/replace */

    /* data for the swallow state */
    int    sbytes;    /* how many bytes to swallow */

    /* data for the mwrite state */
    struct iovec *iov;
    int    iovsize;   /* number of elements allocated in iov[] */
    int    iovused;   /* number of elements used in iov[] */

    struct msghdr *msglist;
    int    msgsize;   /* number of elements allocated in msglist[] */
    int    msgused;   /* number of elements used in msglist[] */
    int    msgcurr;   /* element in msglist[] being transmitted now */
    int    msgbytes;  /* number of bytes in current msg */

    item   **ilist;   /* list of items to write out */
    int    isize;
    item   **icurr;
    int    ileft;

    char   **suffixlist;
    int    suffixsize;
    char   **suffixcurr;
    int    suffixleft;

    enum protocol protocol;             /* which protocol this connection speaks */
    enum network_transport transport;   /* what transport is used by this connection */

    /* data for UDP clients */
    int    request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
    struct sockaddr request_addr; /* Who sent the most recent request */
    socklen_t request_addr_size;
    unsigned char *hdrbuf; /* udp packet headers */
    int    hdrsize;   /* number of headers' worth of space is allocated */

    bool   noreply;   /* True if the reply should not be sent. */
    /* current stats command */
    struct {
        char *buffer;
        size_t size;
        size_t offset;
    } stats;

    /* Binary protocol stuff */
    /* This is where the binary header goes */
    protocol_binary_request_header binary_header;
    uint64_t cas;  /* the cas to return */
    short cmd;     /* current command being processed */
    int opaque;
    int keylen;

    conn   *next;            /* Used for generating a list of conn structures */
    LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
};
main() also ignores SIGPIPE, so that a write to a connection the peer has already closed does not kill the process:

if (sigignore(SIGPIPE) == -1) {
    perror("failed to ignore SIGPIPE; sigaction");
    exit(EX_OSERR);
}
Next, the multithreaded model is initialized, with one libevent event loop per worker thread (each created via event_init):
/* start up worker threads if MT mode */
thread_init(settings.num_threads, main_base);
I won't walk through the internals in full; the main work is a series of pthread_create calls, sketched below.
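Here is the core of thread_init, condensed. It is a sketch that keeps the names from the memcached source but drops locking and error paths, so treat it as approximate:

/* one notify pipe + one libevent base per worker, then pthread_create */
for (i = 0; i < nthreads; i++) {
    int fds[2];
    if (pipe(fds)) {
        perror("Can't create notify pipe");
        exit(1);
    }

    threads[i].notify_receive_fd = fds[0];  /* worker reads here */
    threads[i].notify_send_fd = fds[1];     /* main thread writes here */

    setup_thread(&threads[i]);  /* event_init() + register notify_event (shown below) */
}

/* create_worker wraps pthread_create; each worker runs its own event loop */
for (i = 0; i < nthreads; i++) {
    create_worker(worker_libevent, &threads[i]);
}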
Once the workers are up, main() creates the listening sockets:

if (settings.port && server_sockets(settings.port, tcp_transport,
                                    portnumber_file)) {
    vperror("failed to listen on TCP port %d", settings.port);
    exit(EX_OSERR);
}
server_sockets in turn ends up in:

static int server_socket(const char *interface, int port, enum network_transport transport, FILE *portnumber_file)
Because a host may have multiple network interfaces (think of a dual-line data center with both China Unicom and China Telecom uplinks), the implementation loops over every address that getaddrinfo returned:
for (next = ai; next; next = next->ai_next) {
    conn *listen_conn_add;
    if ((sfd = new_socket(next)) == -1) {
        /* getaddrinfo can return "junk" addresses,
         * we make sure at least one works before erroring.
         */
        if (errno == EMFILE) {
            /* ...unless we're out of fds */
            perror("server_socket");
            exit(EX_OSERR);
        }
        continue;
    }
where new_socket has the signature:

static int new_socket(struct addrinfo *ai)
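Its body is short; reconstructed from the source (treat as a sketch), it creates the socket and flips it to non-blocking mode:

static int new_socket(struct addrinfo *ai) {
    int sfd;
    int flags;

    if ((sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1) {
        return -1;
    }

    /* the whole framework relies on fds being non-blocking */
    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
        fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
        perror("setting O_NONBLOCK");
        close(sfd);
        return -1;
    }
    return sfd;
}

Each fd that new_socket successfully returns is then wrapped in a listening conn and pushed onto a global list: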
if (!(listen_conn_add = conn_new(sfd, conn_listening,
                                 EV_READ | EV_PERSIST, 1,
                                 transport, main_base))) {
    fprintf(stderr, "failed to create listening connection\n");
    exit(EXIT_FAILURE);
}
listen_conn_add->next = listen_conn;
listen_conn = listen_conn_add;
static conn *listen_conn = NULL; is a global static variable: a singly linked list with no head node.
conn *conn_new(const int sfd, enum conn_states init_state,
               const int event_flags,
               const int read_buffer_size,
               enum network_transport transport,
               struct event_base *base) {
    conn *c = conn_from_freelist();
So what does this function mainly do?
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);
c->ev_flags = event_flags;

if (event_add(&c->event, 0) == -1) {
    if (conn_add_to_freelist(c)) {
        conn_free(c);
    }
    perror("event_add");
    return NULL;
}
This step binds the event_handler function to events on sfd: once this connection has data and becomes readable, the callback fires.
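event_handler itself is thin. Reconstructed from the source (treat as a sketch), it sanity-checks the conn and delegates everything to the state machine:

void event_handler(const int fd, const short which, void *arg) {
    conn *c;

    c = (conn *)arg;
    assert(c != NULL);

    c->which = which;

    /* sanity check: the fd libevent reports must match the conn's fd */
    if (fd != c->sfd) {
        if (settings.verbose > 0)
            fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
        conn_close(c);
        return;
    }

    drive_machine(c);

    /* wait for next event */
    return;
}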
As the sketch shows, event_handler lands in static void drive_machine(conn *c). So what work does that function do? Its core is a loop that switches on the connection's state; for a listening connection it accepts:
while (!stop) {
    switch (c->state) {
    case conn_listening:
        addrlen = sizeof(addr);
        if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1)
When accept succeeds on the main thread, the new fd is handed off to a worker thread:

/*
 * Dispatches a new connection to another thread. This is only ever called
 * from the main thread, either during initialization (for UDP) or because
 * of an incoming connection.
 */
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
    CQ_ITEM *item = cqi_new();
    char buf[1];
    int tid = (last_thread + 1) % settings.num_threads;

    LIBEVENT_THREAD *thread = threads + tid;

    last_thread = tid;

    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;

    cq_push(thread->new_conn_queue, item);

    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    buf[0] = 'c';
    if (write(thread->notify_send_fd, buf, 1) != 1) {
        perror("Writing to thread notify pipe");
    }
}
On the worker side, each thread registered an event on the read end of its notify pipe back in setup_thread:

/* Listen for notifications from other threads */
event_set(&me->notify_event, me->notify_receive_fd,
          EV_READ | EV_PERSIST, thread_libevent_process, me);
event_base_set(me->base, &me->notify_event);

if (event_add(&me->notify_event, 0) == -1) {
    fprintf(stderr, "Can't monitor libevent notify pipe\n");
    exit(1);
}
So when the 'c' byte arrives on the pipe, the worker's callback runs:

static void thread_libevent_process(int fd, short which, void *arg)
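Reconstructed from the source (a sketch), its body consumes the notify byte, pops a queued item, and builds the connection on this worker's own event_base:

static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];

    /* drain the one-byte notification written by dispatch_conn_new */
    if (read(fd, buf, 1) != 1)
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");

    item = cq_pop(me->new_conn_queue);

    if (NULL != item) {
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport,
                           me->base);
        if (c == NULL) {
            /* in the real code the fd is closed here */
        } else {
            c->thread = me;
        }
        cqi_free(item);
    }
}

Note the conn_new call: it is the same function we met earlier, now running on the worker's own base: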
conn *conn_new(const int sfd, enum conn_states init_state,
               const int event_flags,
               const int read_buffer_size,
               enum network_transport transport,
               struct event_base *base)
As before, it calls

conn *c = conn_from_freelist();

to take a conn* off the freelist and then initializes it. From here everything works just as described above; only the connection's state is different. The possible states are:
enum conn_states {
    conn_listening,  /**< the socket which listens for connections */
    conn_new_cmd,    /**< Prepare connection for next command */
    conn_waiting,    /**< waiting for a readable socket */
    conn_read,       /**< reading in a command line */
    conn_parse_cmd,  /**< try to parse a command from the input buffer */
    conn_write,      /**< writing out a simple response */
    conn_nread,      /**< reading in a fixed number of bytes */
    conn_swallow,    /**< swallowing unnecessary bytes w/o storing */
    conn_closing,    /**< closing this connection */
    conn_mwrite,     /**< writing out many items sequentially */
    conn_max_state   /**< Max state value (used for assertion) */
};
This enum drives the core logic of static void drive_machine(conn *c): the function switches on the current state and runs the corresponding code. Transitions between states go through conn_set_state:
/*
 * Sets a connection's current state in the state machine. Any special
 * processing that needs to happen on certain state transitions can
 * happen here.
 */
static void conn_set_state(conn *c, enum conn_states state) {
    assert(c != NULL);
    assert(state >= conn_listening && state < conn_max_state);

    if (state != c->state) {
        if (settings.verbose > 2) {
            fprintf(stderr, "%d: going from %s to %s\n",
                    c->sfd, state_text(c->state),
                    state_text(state));
        }

        if (state == conn_write || state == conn_mwrite) {
            MEMCACHED_PROCESS_COMMAND_END(c->sfd, c->wbuf, c->wbytes);
        }
        c->state = state;
    }
}
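The overall pattern is easy to see in miniature. The following self-contained demo (illustrative names only, not memcached code) loops over a state enum exactly the way drive_machine does, with each case either advancing the state or stopping the loop:

#include <stdbool.h>
#include <stdio.h>

enum demo_states { st_read, st_parse, st_write, st_closing };

struct demo_conn {
    enum demo_states state;
    int pending;  /* pretend there is one command waiting */
};

static void demo_machine(struct demo_conn *c) {
    bool stop = false;
    while (!stop) {
        switch (c->state) {
        case st_read:
            c->state = st_parse;           /* "read" a command line */
            break;
        case st_parse:
            c->state = c->pending ? st_write : st_closing;
            break;
        case st_write:
            c->pending = 0;                /* "flush" the response */
            c->state = st_read;
            break;
        case st_closing:
            printf("closing connection\n");
            stop = true;                   /* leave the loop */
            break;
        }
    }
}

int main(void) {
    struct demo_conn c = { st_read, 1 };
    demo_machine(&c);
    return 0;
}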
At this point the networking framework has been covered in full. It is actually a very simple and practical design.