首页 > 代码库 > memcached学习笔记——存储命令源码分析上
memcached学习笔记——存储命令源码分析上
原创文章,转载请标明,谢谢。
上一篇分析过memcached的连接模型,了解memcached是如何高效处理客户端连接,这一篇分析memcached源码中的process_update_command函数,探究memcached客户端的set命令,解读memcached是如何解析客户端文本命令,剖析memcached的内存管理,LRU算法是如何工作等等。
解析客户端文本命令
客户端向memcached server发出set操作,memcached server读取客户端的命令,客户端的连接状态由 conn_read > conn_parse_cmd 转换,这时候,memcached server开始解析命令。memcached server调用try_read_command函数解析命令,memcached接收两种格式的命令,一种是二进制格式,另一种是文本格式(本文只讲文本格式的命令)。
1 static int try_read_command(conn *c) { 2 3 // .......... 4 5 if (c->protocol == binary_prot) { 6 7 // 二进制格式 8 // .... 9 10 } else {11 char *el, *cont;12 13 // 没有接收到客户端的命令,返回进入conn_waiting状态,等待更多的客户端数据14 if (c->rbytes == 0)15 return 0;16 17 el = memchr(c->rcurr, ‘\n‘, c->rbytes);18 if (!el) {19 if (c->rbytes > 1024) {20 /*21 * We didn‘t have a ‘\n‘ in the first k. This _has_ to be a22 * large multiget, if not we should just nuke the connection.23 */24 char *ptr = c->rcurr;25 while (*ptr == ‘ ‘) { /* ignore leading whitespaces */26 ++ptr;27 }28 29 if (ptr - c->rcurr > 100 ||30 (strncmp(ptr, "get ", 4) && strncmp(ptr, "gets ", 5))) {31 32 conn_set_state(c, conn_closing);33 return 1;34 }35 }36 37 return 0;38 }39 40 // 客户端报文以‘\r\n‘结尾41 cont = el + 1;42 if ((el - c->rcurr) > 1 && *(el - 1) == ‘\r‘) {43 el--;44 }45 *el = ‘\0‘;46 47 assert(cont <= (c->rcurr + c->rbytes));48 49 // 真正解析命令的地方50 process_command(c, c->rcurr);51 52 c->rbytes -= (cont - c->rcurr);53 c->rcurr = cont;54 55 assert(c->rcurr <= (c->rbuf + c->rsize));56 }57 58 return 1;59 }
在分析process_command函数前,我们先看看memcached的命令格式:
1 <command name> <key> <flags> <exptime> <bytes> [noreply]\r\n 2 3 cas <key> <flags> <exptime> <bytes> <cas unique> [noreply]\r\n 4 5 // 例如 set 命令 : 6 set key 0 60 2 7 12 8 STORED 9 10 // 空格对应着空格11 set => <command name>12 key => <key>13 0 => <flags>14 60 => <exptime>15 2 => <bytes>
memcached在process_command中调用tokenize_command函数根据上面的命令格式处理命令,把相应位置的字段保存在 token_t *tokens 的相应位置。
1 // 参数1:命令的字符串 2 // 参数2:解析命令后,存放命令各个字段的结构体数组 3 // 参数3:命令字段的最大数量 4 /* 5 * tokens[0] => <command name> 的信息 6 * tokens[1] => <key> 的信息 7 * tokens[2] => <flags> 的信息 8 */ 9 static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {10 char *s, *e;11 size_t ntokens = 0;12 size_t len = strlen(command);13 unsigned int i = 0;14 15 assert(command != NULL && tokens != NULL && max_tokens > 1);16 17 s = e = command;18 for (i = 0; i < len; i++) {19 if (*e == ‘ ‘) {20 if (s != e) {21 tokens[ntokens].value = http://www.mamicode.com/s; // value存放各个字段的字符串值,例如:‘set‘22 tokens[ntokens].length = e - s; // length表示各个字段相应的长度,例如:‘set‘的长度为323 ntokens++;24 *e = ‘\0‘;25 if (ntokens == max_tokens - 1) {26 e++;27 s = e; /* so we don‘t add an extra token */28 break;29 }30 }31 s = e + 1;32 }33 e++;34 }35 36 if (s != e) {37 tokens[ntokens].value =http://www.mamicode.com/ s;38 tokens[ntokens].length = e - s;39 ntokens++;40 }41 42 /*43 * If we scanned the whole string, the terminal value pointer is null,44 * otherwise it is the first unprocessed character.45 */46 tokens[ntokens].value = http://www.mamicode.com/*e == ‘\0‘ ? NULL : e;47 tokens[ntokens].length = 0;48 ntokens++;49 50 return ntokens;51 }
解析完文本命令后,回到process_command函数中,我们可以看到很熟悉的命令,是的,接下来,在一个if-else的多分支判断中,memcached根据tokens[COMMAND_TOKEN].value决定调用那一个函数处理相应的命令:
1 static void process_command(conn *c, char *command) { 2 3 // .... 4 5 ntokens = tokenize_command(command, tokens, MAX_TOKENS); 6 if (ntokens >= 3 && 7 ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) || 8 (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) { 9 10 // 这里就是执行get命令的分支11 process_get_command(c, tokens, ntokens, false);12 13 } else if ((ntokens == 6 || ntokens == 7) &&14 ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||15 (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||16 (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||17 (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||18 (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {19 20 // 这里就是执行set、add、replace等命令的分支21 process_update_command(c, tokens, ntokens, comm, false);22 23 } else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = NREAD_CAS))) {24 25 // 这里也执行process_update_command函数,也是对相应的key执行写操作,与上面一个分支不同的是最后一个参数是true,意思是写的过程使用CAS协议,这里不侧重讲,26 // CAS目的是保证在并发写的时候保证一致性27 process_update_command(c, tokens, ntokens, comm, true);28 29 } else if .............
memcached存储命令分析
memcached把内存分割成各种尺寸的块(chunk),并把尺寸相同的块分成组(chunk的集合),每个chunk集合被称为slab。Memcached的内存分配以Page为单位,Page默认值为1M,可以在启动时通过-I参数来指定。Slab是由多个Page组成的,Page按照指定大小切割成多个chunk。
每一对[key,value]的数据被封装到item的结构体里,每种类型的slab用一个item链表来维护它的所有item。例如,一个item项的数据大小加上item的头部信息(为了方便描述,下面把这两项的和统称[key,value]大小吧)是90KB,slab[i]的chunk块大小是136KB,slab[i-1]的chunk块大小是88KB,那么item会被分配slab[i]的一个chunk块(并保存到slab[i]维护的一个item链表),这样做的目的是为了尽量减少内存碎片。更多关于Slab Allocation的原理可以查找其他的资料。这里不详解
memcached的存储命令:add、set、replace、append、prepend等,上面简单地说了memcached slab机制,知道memcached是根据相应的[key,value]大小找到相应的slab,那么,我们再次调用set命令某个已存在的key的value的时候,memcached是怎么工作的呢?
起初,我的直觉思维是,找到key相应的item,修改item的value就好了。那么,问题来了,假如先前[key,value]大小是90KB,被分配到slab[i]的,现在我们修改了key对应的value,[key,value]大小也改变了,变成了80KB,应该分配到slab[i-1]的,如果只是修改原来item的数据,那么就不符合Slab Allocation的原理,会造成很大的内存碎片浪费。
memcached对存储命令:add、set、replace、append、prepend处理方法大体都相似的,从上面的源码可以看出,都是通过执行process_update_command函数来处理。
memcached的处理存储命令思路是这样的:例如,客户端的一个set命令,memcached都会重新根据[key,value]大小找到合适slab,并把相应的数据封装到新的item里面【源码的注1】(不会直接修改旧的item项),如果对应的slab没有内存空间不足,就调用LRU算法把该slab的一个最近最少使用项的空间分配给新的item【源码的注2,出现在do_item_alloc函数】(如果关闭LRU移除项的功能,那么就会报“SERVER_ERROR out of memory storing object”错误,是set命令的话,还会把key对应的旧的item项移除【源码的注3】,即我们这时候不能通过get key来获取到旧的数据了),分配空间成功,那就是对add、set、replace、append、prepend这几个存储命令做差异化处理。
1 static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) { 2 3 // .... 4 5 set_noreply_maybe(c, tokens, ntokens); // 设置命令可选字段的[noreply] 6 7 if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) { 8 out_string(c, "CLIENT_ERROR bad command line format"); 9 return;10 }11 12 key = tokens[KEY_TOKEN].value;13 nkey = tokens[KEY_TOKEN].length;14 15 // 把命令相应字段的字符串安全转换成整数16 if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags)17 && safe_strtol(tokens[3].value, &exptime_int)18 && safe_strtol(tokens[4].value, (int32_t *)&vlen))) {19 out_string(c, "CLIENT_ERROR bad command line format");20 return;21 }22 23 exptime = exptime_int;24 25 // #define REALTIME_MAXDELTA 60*60*24*3026 if (exptime < 0)27 exptime = REALTIME_MAXDELTA + 1;28 29 // CAS协议,防止并发写不一致30 if (handle_cas) {31 if (!safe_strtoull(tokens[5].value, &req_cas_id)) {32 out_string(c, "CLIENT_ERROR bad command line format");33 return;34 }35 }36 37 // ........38 39 // 注1:无论是add、set或者是replace命令,都会从新分配一个新的item40 it = item_alloc(key, nkey, flags, realtime(exptime), vlen);41 42 // 如果新的item分配失败43 if (it == 0) {44 if (! item_size_ok(nkey, flags, vlen))45 out_string(c, "SERVER_ERROR object too large for cache"); // 一种错误情况:数据太大,没有合适slab,不能缓存数据46 else47 out_string(c, "SERVER_ERROR out of memory storing object"); // 另一种是:没有了内存空间缓存数据,通常这种事在关闭LRU功能的情况下出现48 /* swallow the data line */49 c->write_and_go = conn_swallow;50 c->sbytes = vlen;51 52 // 注3:新的item分配失败,如果是set命令,并且key对应着存在旧的item,那么就把旧的item删除53 if (comm == NREAD_SET) {54 it = item_get(key, nkey);55 if (it) {56 item_unlink(it);57 item_remove(it);58 }59 }60 61 return;62 }63 ITEM_set_cas(it, req_cas_id);64 65 c->item = it;66 c->ritem = ITEM_data(it);67 c->rlbytes = it->nbytes;68 c->cmd = comm;69 70 // 会在这一步进行add、set、replace等存储命令的差异化处理71 conn_set_state(c, conn_nread);72 }
do_item_alloc函数:
1 item *do_item_alloc(char *key, const size_t nkey, const int flags, 2 const rel_time_t exptime, const int nbytes, 3 const uint32_t cur_hv) { 4 uint8_t nsuffix; 5 item *it = NULL; 6 char suffix[40]; 7 size_t ntotal = item_make_header(nkey + 1, flags, nbytes, suffix, &nsuffix); //接收到的item数据长度+item头部长度 8 if (settings.use_cas) { 9 ntotal += sizeof(uint64_t); 10 } 11 12 unsigned int id = slabs_clsid(ntotal); 13 if (id == 0) 14 return 0; 15 16 mutex_lock(&cache_lock); 17 /* do a quick check if we have any expired items in the tail.. */ 18 int tries = 5; 19 int tried_alloc = 0; 20 item *search; 21 void *hold_lock = NULL; 22 rel_time_t oldest_live = settings.oldest_live; 23 24 search = tails[id]; 25 26 // tries = 5 ,循环查找过期的item,最多循环5次 27 for (; tries > 0 && search != NULL; tries--, search=search->prev) { 28 uint32_t hv = hash(ITEM_key(search), search->nkey, 0); 29 30 // 如果当前item被上锁,那么就跳过 31 if (hv != cur_hv && (hold_lock = item_trylock(hv)) == NULL) 32 continue; 33 /* Now see if the item is refcount locked */ 34 if (refcount_incr(&search->refcount) != 2) { 35 refcount_decr(&search->refcount); 36 /* Old rare bug could cause a refcount leak. We haven‘t seen 37 * it in years, but we leave this code in to prevent failures 38 * just in case */ 39 if (search->time + TAIL_REPAIR_TIME < current_time) { 40 itemstats[id].tailrepairs++; 41 search->refcount = 1; 42 do_item_unlink_nolock(search, hv); 43 } 44 if (hold_lock) 45 item_trylock_unlock(hold_lock); 46 continue; 47 } 48 49 // item过期,如果没有设置过期时间,那么就使用系统设置的默认过期时间 50 if ((search->exptime != 0 && search->exptime < current_time) 51 || (search->time <= oldest_live && oldest_live <= current_time)) { 52 itemstats[id].reclaimed++; 53 if ((search->it_flags & ITEM_FETCHED) == 0) { 54 itemstats[id].expired_unfetched++; 55 } 56 it = search; 57 slabs_adjust_mem_requested(it->slabs_clsid, ITEM_ntotal(it), ntotal); // 当前搜索的item过期,重新计算slab已经分配的字节 58 do_item_unlink_nolock(it, hv); // 把当前搜索的item从链表中移除 59 /* Initialize the item block: */ 60 it->slabs_clsid = 0; 61 } else if ((it = slabs_alloc(ntotal, id)) == NULL) { // 没有找到过期的item,新分配一个item,分配失败就执行else if里面的代码 62 tried_alloc = 1; 63 if (settings.evict_to_free == 0) { // 注2:内存耗尽,如果evict_to_free = 1(默认)LRU算法启动,移除最近最少使用的item 64 itemstats[id].outofmemory++; 65 } else { 66 itemstats[id].evicted++; 67 itemstats[id].evicted_time = current_time - search->time; 68 if (search->exptime != 0) 69 itemstats[id].evicted_nonzero++; 70 if ((search->it_flags & ITEM_FETCHED) == 0) { 71 itemstats[id].evicted_unfetched++; 72 } 73 it = search; 74 slabs_adjust_mem_requested(it->slabs_clsid, ITEM_ntotal(it), ntotal); // 当前搜索的item过期,重新计算slab已经分配的字节 75 do_item_unlink_nolock(it, hv); // 把当前需要移除的item从链表中移除 76 /* Initialize the item block: */ 77 it->slabs_clsid = 0; 78 79 if (settings.slab_automove == 2) 80 slabs_reassign(-1, id); 81 } 82 } 83 84 refcount_decr(&search->refcount); 85 /* If hash values were equal, we don‘t grab a second lock */ 86 if (hold_lock) 87 item_trylock_unlock(hold_lock); 88 break; 89 } 90 91 if (!tried_alloc && (tries == 0 || search == NULL)) 92 it = slabs_alloc(ntotal, id); 93 94 if (it == NULL) { 95 itemstats[id].outofmemory++; 96 mutex_unlock(&cache_lock); 97 return NULL; 98 } 99 100 assert(it->slabs_clsid == 0);101 assert(it != heads[id]);102 103 104 it->refcount = 1; 105 mutex_unlock(&cache_lock);106 it->next = it->prev = it->h_next = 0;107 it->slabs_clsid = id;108 109 DEBUG_REFCNT(it, ‘*‘);110 it->it_flags = settings.use_cas ? ITEM_CAS : 0;111 it->nkey = nkey;112 it->nbytes = nbytes;113 memcpy(ITEM_key(it), key, nkey);114 it->exptime = exptime;115 memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix);116 it->nsuffix = nsuffix;117 return it;118 }
以上memcached只是为[key,value]找到了新的slab,分配了新的item,并把命令相关的头部信息保存到,但是,还有一个重要的步奏没有说的,那就是[key,value]中的value怎么和item关联起来的,add和set的区别又是怎样区分的,由于还有很长的一段代码,所以我还是分篇记录,预告一下,下一篇《memcached学习笔记——存储命令源码分析下》会讲遗留的这两个问题。
未完,待续。
更多阅读查看:JC&hcoding
memcached学习笔记——存储命令源码分析上