首页 > 代码库 > Memcached 源码分析--命令流程分析

Memcached 源码分析--命令流程分析

一、执行命令

首先是启动memcached 自带参数如下:

<span style="font-size:18px;">-p <num>      设置TCP端口号(默认设置为: 11211)
-U <num>      UDP监听端口(默认: 11211, 0 时关闭) 
-l <ip_addr>  绑定地址(默认:所有都允许,无论内外网或者本机更换IP,有安全隐患,若设置为127.0.0.1就只能本机访问)
-c <num>      max simultaneous connections (default: 1024)
-d            以daemon方式运行
-u <username> 绑定使用指定用于运行进程<username>
-m <num>      允许最大内存用量,单位M (默认: 64 MB)
-P <file>     将PID写入文件<file>,这样可以使得后边进行快速进程终止, 需要与-d 一起使用</span>

#$: ./usr/local/bin/memcached -d -u root -l 192.168.10.156 -m 2048 -p 12121


客户端通过网络方式连接:

telnet 192.168.10.156 12121

然后就可以操作命令、常见命令如下:

<span style="font-size:18px;">set
add
replace
get
delete</span>

格式如下:

<span style="font-size:18px;">command <key> <flags> <expiration time> <bytes>
<value>

参数说明如下:
command set/add/replace
key     key 用于查找缓存值
flags     可以包括键值对的整型参数,客户机使用它存储关于键值对的额外信息
expiration time     在缓存中保存键值对的时间长度(以秒为单位,0 表示永远)
bytes     在缓存中存储的字节点
value     存储的值(始终位于第二行)</span>


二、命令执行流程代码分析

首先看一下工作线程中的命令数据结构:

/**
 * The structure representing a connection into memcached.
 */
typedef struct conn conn;


非常重要的几个参数:
char * rbuf:用于存储客户端数据报文中的命令。
int rsize:rbuf的大小。
char * rcurr:未解析的命令的字符指针。
int rbytes:为解析的命令的长度。


结构如下:

<span style="font-size:18px;">struct conn {
    int    sfd;
   	char   *rbuf;   /** buffer to read commands into */  
    char   *rcurr;  /** but if we parsed some already, this is where we stopped */  
    int    rsize;   /** total allocated size of rbuf */  
    int    rbytes;  /** how much data, starting from rcur, do we have unparsed */  
	
	/* data for the mwrite state */  
    struct iovec *iov;  
    int    iovsize;   /* number of elements allocated in iov[] */  
    int    iovused;   /* number of elements used in iov[] */  
  
    struct msghdr *msglist;  
    int    msgsize;   /* number of elements allocated in msglist[] */  
    int    msgused;   /* number of elements used in msglist[] */  
    int    msgcurr;   /* element in msglist[] being transmitted now */  
    int    msgbytes;  /* number of bytes in current msg */      
    LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */  
	...
};</span>


技术分享

状态机迁移: drive_machine(conn *c)

技术分享


以上图相当有水平,引用作者 http://calixwu.com/ 上的、自已就不再画了。


以文字说明一下整体状态机流程:
1. 当客户端和Memcached建立TCP连接后,Memcached会基于Libevent的event事件来监听客户端新的连接及是否有可读的数据。
2. 当客户端有命令数据报文上报的时候,就会触发drive_machine方法中的conn_read这个case状态。
3. memcached通过try_read_network方法读取客户端的报文。如果读取失败,则返回conn_closing,去关闭客户端的连接;如果没有读取到任何数据,则会返回conn_waiting,继续等待客户端的事件到来,并且退出drive_machine的循环;如果数据读取成功,则会将状态转交给conn_parse_cmd处理,读取到的数据会存储在c->rbuf容器中。
4. conn_parse_cmd主要的工作就是用来解析命令。主要通过try_read_command这个方法来读取c->rbuf中的命令数据,通过\n来分隔数据报文的命令。如果c->buf内存块中的数据匹配不到\n,则返回继续等待客户端的命令数据报文到来conn_waiting;否则就会转交给process_command方法,来处理具体的命令(命令解析会通过\0符号来分隔)。
5. process_command主要用来处理具体的命令。其中tokenize_command这个方法非常重要,将命令拆解成多个元素(KEY的最大长度250)。例如我们以get命令为例,最终会跳转到process_get_command这个命令process_*_command这一系列就是处理具体的命令逻辑的。
6. 我们进入process_get_command,当获取数据处理完毕之后,会转交到conn_mwrite这个状态。如果获取数据失败,则关闭连接。
7. 进入conn_mwrite后,主要是通过transmit方法来向客户端提交数据。如果写数据失败,则关闭连接或退出drive_machine循环;如果写入成功,则又转交到conn_new_cmd这个状态。
8. conn_new_cmd这个状态主要是处理c->rbuf中剩余的命令。主要看一下reset_cmd_handler这个方法,这个方法回去判断c->rbytes中是否还有剩余的报文没处理,如果未处理,则转交到conn_parse_cmd(第四步)继续解析剩余命令;如果已经处理了,则转交到conn_waiting,等待新的事件到来。在转交之前,每次都会执行一次conn_shrink方法。
9. conn_shrink方法主要用来处理命令报文容器c->rbuf和输出内容的容器是否数据满了?是否需要扩大buffer的大小,是否需要移动内存块。接受命令报文的初始化内存块大小2048,最大8192。


三、下面以代码简要分析一下
1、读写事件回调函数:event_handler,这个方法中最终调用的是drive_machine

void event_handler(const int fd, const short which, void *arg) {
	conn* c = (conn *) arg;
	drive_machine(c); 
}

drive_machine:
drive_machine这个方法中,都是通过c->state来判断需要处理的逻辑。
conn_listening:监听状态
conn_waiting:等待状态
conn_read:读取状态
conn_parse_cmd:命令行解析
conn_mwrite:向客户端写数据
conn_new_cmd:解析新的命令
//

static void drive_machine(conn *c) { 
	bool stop = false;
	while(!stop) {
		switch (c->state) {
			case conn_waiting:
			// 通过update_event函数确认是否为读状态,如果是则切到conn_read
			if (!update_event(c, EV_READ | EV_PERSIST)) {
				conn_set_state(c, conn_closing);
			}
			conn_set_state(c, conn_read);
            stop = true;
            break;
            
           case conn_read:
           	// 读取数据并根据read的情况切到不同状态、正常情况切到conn_parse_cmd
           	res = try_read_network(c);
           	switch (res) {
            case READ_NO_DATA_RECEIVED:
                conn_set_state(c, conn_waiting);
                break;
            case READ_DATA_RECEIVED:
                conn_set_state(c, conn_parse_cmd);
                break;
            case READ_ERROR:
                conn_set_state(c, conn_closing);
                break;
            case READ_MEMORY_ERROR: /* Failed to allocate more memory */
                /* State already set by try_read_network */
                break;
            }
            break; 
            
            case conn_parse_cmd:
            // 读取命令并解析命令,如果数据不够则切到conn_waiting
            if (try_read_command(c) == 0) {  
            	/* we need more data! */  
            	conn_set_state(c, conn_waiting);  
        	}
            break;
            
            case conn_mwrite:
            res = transmit(c);
            switch(res){
            	case TRANSMIT_COMPLETE:
            	if (c->state == conn_mwrite) {
            		/* XXX:  I don't know why this wasn't the general case */
                    if(c->protocol == binary_prot) {
                        conn_set_state(c, c->write_and_go);
                    } else {
                    	// 命令回复完成后、又切换到conn_new_cmd处理剩余的命令参数
                        conn_set_state(c, conn_new_cmd);
                    }
            	}
            }
            break;
            
            ...
		}
	}
}

上面的逻辑主要反映了状态机的转换流程,下面重点看下数据处理这一块:
命令格式:set username zhuli\r\n get username \n
通过\n这个换行符来分隔数据报文中的命令。因为数据报文会有粘包和拆包的特性,所以只有等到命令行完整
才能进行解析。所有只有匹配到了\n符号,才能匹配一个完整的命令。

static int try_read_command(conn *c) { 
	if (c->protocol == binary_prot) { // 二进制模式
		dispatch_bin_command(c);
	}else{
		//查找命令中是否有\n,memcache的命令通过\n来分割
		el = memchr(c->rcurr, '\n', c->rbytes);
		
		//如果找到了\n,说明c->rcurr中有完整的命令了  
        cont = el + 1; //下一个命令开始的指针节点  
        //这边判断是否是\r\n,如果是\r\n,则el往前移一位  
        if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {  
            el--;  
        }  
        //然后将命令的最后一个字符用 \0(字符串结束符号)来分隔  
        *el = '\0';  
        
        //处理命令,c->rcurr就是命令  
        process_command(c, c->rcurr);
        
        //移动到下一个命令的指针节点	
		c->rbytes -= (cont - c->rcurr);
		c->rcurr = cont;
	}
}

// 处理具体的命令。将命令分解后,分发到不同的具体操作中去
static void process_command(conn *c, char *command) {
	token_t tokens[MAX_TOKENS];
	// 拆分命令:将拆分出来的命令元素放进tokens的数组中
	ntokens = tokenize_command(command, tokens, MAX_TOKENS);
	
	// 分解出来的命令的第一个参数为操作方法
	1、process_get_command(c, tokens, ntokens, false); // "get"/"bget"
	
	2、process_update_command(c, tokens, ntokens, comm, false); // "add"/"set"/...
	
	3、process_get_command(c, tokens, ntokens, true); // "gets"
	
	...>> 4-n 
}

这里以 get 命令走读下:

static inline void process_get_command(conn *c, token_t *tokens...){
	it = item_get(key, nkey, c); // 内存存储快块取数据 
	if (it) { // 获取到了数据
        /*
         * Construct the response. Each hit adds three elements to the
         * outgoing data list:
         *   "VALUE "
         *   key
         *   " " + flags + " " + data length + "\r\n" + data (with \r\n)
         */
		// 构建初始化返回出去的数据结构
		add_iov(c, "VALUE ", 6);
		add_iov(c, ITEM_key(it), it->nkey);
		add_iov(c, ITEM_suffix(it), it->nsuffix - 2);
		add_iov(c, suffix, suffix_len);
		add_iov(c, "END\r\n", 5);
		
		// 最后切到 conn_mwrite 即调用 transmit 函数
		conn_set_state(c, conn_mwrite);
	}
}
/*
 * Returns an item if it hasn't been marked as expired,
 * lazy-expiring as needed.
 */
item *item_get(const char *key, const size_t nkey, conn *c) {
    item *it;
    uint32_t hv;
    hv = hash(key, nkey);
    item_lock(hv);
    it = do_item_get(key, nkey, hv, c);
    item_unlock(hv);
    return it;
}

// 向客户端写数据。写完数据后,如果写失败,则关闭连接;如果写成功,则会将状态修改成conn_new_cmd,
// 继续解析c->rbuf中剩余的命令
static enum transmit_result transmit(conn *c) {
	//msghdr 发送数据的结构  
    struct msghdr *m = &c->msglist[c->msgcurr];  
    
    //sendmsg 发送数据方法  
    res = sendmsg(c->sfd, m, 0); 

	...
}

对于剩余命令的处理:

//重新设置命令handler  
static void reset_cmd_handler(conn *c) {  
    c->cmd = -1;  
    c->substate = bin_no_state;  
    if (c->item != NULL) {  
        item_remove(c->item);  
        c->item = NULL;  
    }  
    conn_shrink(c); //这个方法是检查c->rbuf容器的大小  
    //如果剩余未解析的命令 > 0的话,继续跳转到conn_parse_cmd解析命令  
    if (c->rbytes > 0) {  
        conn_set_state(c, conn_parse_cmd);  
    } else {  
        //如果命令都解析完成了,则继续等待新的数据到来  
        conn_set_state(c, conn_waiting);  
    }  
}

/*
 * Shrinks a connection's buffers if they're too big.  This prevents
 * periodic large "get" requests from permanently chewing lots of server
 * memory.
 *
 * This should only be called in between requests since it can wipe output
 * buffers!
 */  
static void conn_shrink(conn *c) { // 检查rbuf的大小
    if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
        char *newbuf;

        if (c->rcurr != c->rbuf)
            memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);

        newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
        if (newbuf) {
            c->rbuf = newbuf;
            c->rsize = DATA_BUFFER_SIZE;
        }
        c->rcurr = c->rbuf;
    }	
	...
}

对于异步套接字编译就是 回调+状态机、一定要记下所有的状态。有几点要特别注意:
1、注册的事件处理函数不能堵塞或主动sleep、否则整个工作线程处于挂起状态。
2、单线程、但其内在的复杂性——将线性思维分解成一堆回调的负担(breaking up linear thought into a bucketload of callbacks)——仍然存在
3、对于每个事件的处理都需要维护一个状态、上下文是紧密相关的、代码编写时需要时刻注意小心。
4、注意epoll的工作模式:LT还是ET模式、一般是回调时尽量处理更多的数据包。


Memcached 源码分析--命令流程分析