首页 > 代码库 > Memcached 源码分析--命令流程分析
Memcached 源码分析--命令流程分析
一、执行命令
首先是启动memcached 自带参数如下:
<span style="font-size:18px;">-p <num> 设置TCP端口号(默认设置为: 11211) -U <num> UDP监听端口(默认: 11211, 0 时关闭) -l <ip_addr> 绑定地址(默认:所有都允许,无论内外网或者本机更换IP,有安全隐患,若设置为127.0.0.1就只能本机访问) -c <num> max simultaneous connections (default: 1024) -d 以daemon方式运行 -u <username> 绑定使用指定用于运行进程<username> -m <num> 允许最大内存用量,单位M (默认: 64 MB) -P <file> 将PID写入文件<file>,这样可以使得后边进行快速进程终止, 需要与-d 一起使用</span>
#$: ./usr/local/bin/memcached -d -u root -l 192.168.10.156 -m 2048 -p 12121
客户端通过网络方式连接:
telnet 192.168.10.156 12121
然后就可以操作命令、常见命令如下:
<span style="font-size:18px;">set add replace get delete</span>
格式如下:
<span style="font-size:18px;">command <key> <flags> <expiration time> <bytes> <value> 参数说明如下: command set/add/replace key key 用于查找缓存值 flags 可以包括键值对的整型参数,客户机使用它存储关于键值对的额外信息 expiration time 在缓存中保存键值对的时间长度(以秒为单位,0 表示永远) bytes 在缓存中存储的字节点 value 存储的值(始终位于第二行)</span>
二、命令执行流程代码分析
首先看一下工作线程中的命令数据结构:
/**
* The structure representing a connection into memcached.
*/
typedef struct conn conn;
非常重要的几个参数:
char * rbuf:用于存储客户端数据报文中的命令。
int rsize:rbuf的大小。
char * rcurr:未解析的命令的字符指针。
int rbytes:为解析的命令的长度。
结构如下:
<span style="font-size:18px;">struct conn { int sfd; char *rbuf; /** buffer to read commands into */ char *rcurr; /** but if we parsed some already, this is where we stopped */ int rsize; /** total allocated size of rbuf */ int rbytes; /** how much data, starting from rcur, do we have unparsed */ /* data for the mwrite state */ struct iovec *iov; int iovsize; /* number of elements allocated in iov[] */ int iovused; /* number of elements used in iov[] */ struct msghdr *msglist; int msgsize; /* number of elements allocated in msglist[] */ int msgused; /* number of elements used in msglist[] */ int msgcurr; /* element in msglist[] being transmitted now */ int msgbytes; /* number of bytes in current msg */ LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */ ... };</span>
状态机迁移: drive_machine(conn *c)
以上图相当有水平,引用作者 http://calixwu.com/ 上的、自已就不再画了。
以文字说明一下整体状态机流程:
1. 当客户端和Memcached建立TCP连接后,Memcached会基于Libevent的event事件来监听客户端新的连接及是否有可读的数据。
2. 当客户端有命令数据报文上报的时候,就会触发drive_machine方法中的conn_read这个case状态。
3. memcached通过try_read_network方法读取客户端的报文。如果读取失败,则返回conn_closing,去关闭客户端的连接;如果没有读取到任何数据,则会返回conn_waiting,继续等待客户端的事件到来,并且退出drive_machine的循环;如果数据读取成功,则会将状态转交给conn_parse_cmd处理,读取到的数据会存储在c->rbuf容器中。
4. conn_parse_cmd主要的工作就是用来解析命令。主要通过try_read_command这个方法来读取c->rbuf中的命令数据,通过\n来分隔数据报文的命令。如果c->buf内存块中的数据匹配不到\n,则返回继续等待客户端的命令数据报文到来conn_waiting;否则就会转交给process_command方法,来处理具体的命令(命令解析会通过\0符号来分隔)。
5. process_command主要用来处理具体的命令。其中tokenize_command这个方法非常重要,将命令拆解成多个元素(KEY的最大长度250)。例如我们以get命令为例,最终会跳转到process_get_command这个命令process_*_command这一系列就是处理具体的命令逻辑的。
6. 我们进入process_get_command,当获取数据处理完毕之后,会转交到conn_mwrite这个状态。如果获取数据失败,则关闭连接。
7. 进入conn_mwrite后,主要是通过transmit方法来向客户端提交数据。如果写数据失败,则关闭连接或退出drive_machine循环;如果写入成功,则又转交到conn_new_cmd这个状态。
8. conn_new_cmd这个状态主要是处理c->rbuf中剩余的命令。主要看一下reset_cmd_handler这个方法,这个方法回去判断c->rbytes中是否还有剩余的报文没处理,如果未处理,则转交到conn_parse_cmd(第四步)继续解析剩余命令;如果已经处理了,则转交到conn_waiting,等待新的事件到来。在转交之前,每次都会执行一次conn_shrink方法。
9. conn_shrink方法主要用来处理命令报文容器c->rbuf和输出内容的容器是否数据满了?是否需要扩大buffer的大小,是否需要移动内存块。接受命令报文的初始化内存块大小2048,最大8192。
三、下面以代码简要分析一下
1、读写事件回调函数:event_handler,这个方法中最终调用的是drive_machine
void event_handler(const int fd, const short which, void *arg) { conn* c = (conn *) arg; drive_machine(c); }
drive_machine:
drive_machine这个方法中,都是通过c->state来判断需要处理的逻辑。
conn_listening:监听状态
conn_waiting:等待状态
conn_read:读取状态
conn_parse_cmd:命令行解析
conn_mwrite:向客户端写数据
conn_new_cmd:解析新的命令
//
static void drive_machine(conn *c) { bool stop = false; while(!stop) { switch (c->state) { case conn_waiting: // 通过update_event函数确认是否为读状态,如果是则切到conn_read if (!update_event(c, EV_READ | EV_PERSIST)) { conn_set_state(c, conn_closing); } conn_set_state(c, conn_read); stop = true; break; case conn_read: // 读取数据并根据read的情况切到不同状态、正常情况切到conn_parse_cmd res = try_read_network(c); switch (res) { case READ_NO_DATA_RECEIVED: conn_set_state(c, conn_waiting); break; case READ_DATA_RECEIVED: conn_set_state(c, conn_parse_cmd); break; case READ_ERROR: conn_set_state(c, conn_closing); break; case READ_MEMORY_ERROR: /* Failed to allocate more memory */ /* State already set by try_read_network */ break; } break; case conn_parse_cmd: // 读取命令并解析命令,如果数据不够则切到conn_waiting if (try_read_command(c) == 0) { /* we need more data! */ conn_set_state(c, conn_waiting); } break; case conn_mwrite: res = transmit(c); switch(res){ case TRANSMIT_COMPLETE: if (c->state == conn_mwrite) { /* XXX: I don't know why this wasn't the general case */ if(c->protocol == binary_prot) { conn_set_state(c, c->write_and_go); } else { // 命令回复完成后、又切换到conn_new_cmd处理剩余的命令参数 conn_set_state(c, conn_new_cmd); } } } break; ... } } }
上面的逻辑主要反映了状态机的转换流程,下面重点看下数据处理这一块:
命令格式:set username zhuli\r\n get username \n
通过\n这个换行符来分隔数据报文中的命令。因为数据报文会有粘包和拆包的特性,所以只有等到命令行完整
才能进行解析。所有只有匹配到了\n符号,才能匹配一个完整的命令。
static int try_read_command(conn *c) { if (c->protocol == binary_prot) { // 二进制模式 dispatch_bin_command(c); }else{ //查找命令中是否有\n,memcache的命令通过\n来分割 el = memchr(c->rcurr, '\n', c->rbytes); //如果找到了\n,说明c->rcurr中有完整的命令了 cont = el + 1; //下一个命令开始的指针节点 //这边判断是否是\r\n,如果是\r\n,则el往前移一位 if ((el - c->rcurr) > 1 && *(el - 1) == '\r') { el--; } //然后将命令的最后一个字符用 \0(字符串结束符号)来分隔 *el = '\0'; //处理命令,c->rcurr就是命令 process_command(c, c->rcurr); //移动到下一个命令的指针节点 c->rbytes -= (cont - c->rcurr); c->rcurr = cont; } } // 处理具体的命令。将命令分解后,分发到不同的具体操作中去 static void process_command(conn *c, char *command) { token_t tokens[MAX_TOKENS]; // 拆分命令:将拆分出来的命令元素放进tokens的数组中 ntokens = tokenize_command(command, tokens, MAX_TOKENS); // 分解出来的命令的第一个参数为操作方法 1、process_get_command(c, tokens, ntokens, false); // "get"/"bget" 2、process_update_command(c, tokens, ntokens, comm, false); // "add"/"set"/... 3、process_get_command(c, tokens, ntokens, true); // "gets" ...>> 4-n }
这里以 get 命令走读下:
static inline void process_get_command(conn *c, token_t *tokens...){ it = item_get(key, nkey, c); // 内存存储快块取数据 if (it) { // 获取到了数据 /* * Construct the response. Each hit adds three elements to the * outgoing data list: * "VALUE " * key * " " + flags + " " + data length + "\r\n" + data (with \r\n) */ // 构建初始化返回出去的数据结构 add_iov(c, "VALUE ", 6); add_iov(c, ITEM_key(it), it->nkey); add_iov(c, ITEM_suffix(it), it->nsuffix - 2); add_iov(c, suffix, suffix_len); add_iov(c, "END\r\n", 5); // 最后切到 conn_mwrite 即调用 transmit 函数 conn_set_state(c, conn_mwrite); } } /* * Returns an item if it hasn't been marked as expired, * lazy-expiring as needed. */ item *item_get(const char *key, const size_t nkey, conn *c) { item *it; uint32_t hv; hv = hash(key, nkey); item_lock(hv); it = do_item_get(key, nkey, hv, c); item_unlock(hv); return it; } // 向客户端写数据。写完数据后,如果写失败,则关闭连接;如果写成功,则会将状态修改成conn_new_cmd, // 继续解析c->rbuf中剩余的命令 static enum transmit_result transmit(conn *c) { //msghdr 发送数据的结构 struct msghdr *m = &c->msglist[c->msgcurr]; //sendmsg 发送数据方法 res = sendmsg(c->sfd, m, 0); ... }
对于剩余命令的处理:
//重新设置命令handler static void reset_cmd_handler(conn *c) { c->cmd = -1; c->substate = bin_no_state; if (c->item != NULL) { item_remove(c->item); c->item = NULL; } conn_shrink(c); //这个方法是检查c->rbuf容器的大小 //如果剩余未解析的命令 > 0的话,继续跳转到conn_parse_cmd解析命令 if (c->rbytes > 0) { conn_set_state(c, conn_parse_cmd); } else { //如果命令都解析完成了,则继续等待新的数据到来 conn_set_state(c, conn_waiting); } } /* * Shrinks a connection's buffers if they're too big. This prevents * periodic large "get" requests from permanently chewing lots of server * memory. * * This should only be called in between requests since it can wipe output * buffers! */ static void conn_shrink(conn *c) { // 检查rbuf的大小 if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) { char *newbuf; if (c->rcurr != c->rbuf) memmove(c->rbuf, c->rcurr, (size_t)c->rbytes); newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE); if (newbuf) { c->rbuf = newbuf; c->rsize = DATA_BUFFER_SIZE; } c->rcurr = c->rbuf; } ... }
对于异步套接字编译就是 回调+状态机、一定要记下所有的状态。有几点要特别注意:
1、注册的事件处理函数不能堵塞或主动sleep、否则整个工作线程处于挂起状态。
2、单线程、但其内在的复杂性——将线性思维分解成一堆回调的负担(breaking up linear thought into a bucketload of callbacks)——仍然存在
3、对于每个事件的处理都需要维护一个状态、上下文是紧密相关的、代码编写时需要时刻注意小心。
4、注意epoll的工作模式:LT还是ET模式、一般是回调时尽量处理更多的数据包。
Memcached 源码分析--命令流程分析