
Notes on the main logic of memcached's hash table operations

All of the annotated source code below comes from the assoc.c file of the memcached project.
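Two helper macros appear throughout the listing but are not part of the excerpt itself. For reference, assoc.c defines them roughly as follows (the exact typedef may differ slightly between memcached versions): hashsize(n) is the number of buckets at power level n, and hashmask(n) turns a hash value into a bucket index.

typedef unsigned long int ub4;          /* unsigned 4-byte quantities */

#define hashsize(n) ((ub4)1 << (n))     /* number of buckets at power level n */
#define hashmask(n) (hashsize(n) - 1)   /* maps a hash value to a bucket index */

HASHPOWER_DEFAULT, used to initialize hashpower below, is 16 in memcached.h, so the table starts out with 65536 buckets.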

/* how many powers of 2's worth of buckets we use */
unsigned int hashpower = HASHPOWER_DEFAULT; /* power-of-two level of the table: (1 << hashpower) == number of buckets */

/* Main hash table. This is where we look except during expansion. */
/*
 * The primary hash table, which stores memcached's key-value items.
 * During expansion the existing table is parked behind old_hashtable, a new
 * (larger) bucket array is allocated here, and the existing data is then
 * migrated back bucket by bucket, so this table always ends up holding the
 * current data.
 */
static item** primary_hashtable = 0;

/*
 * Previous hash table. During expansion, we look here for keys that haven't
 * been moved over to the primary yet.
 */
static item** old_hashtable = 0; /* the previous table; only used while expanding, holds buckets not yet migrated */

/* Number of items in the hash table. */
static unsigned int hash_items = 0; /* total number of items in the whole table */

/* Flag: Are we in the middle of expanding now? */
static bool expanding = false;         /* true while buckets are being migrated */
static bool started_expanding = false; /* true once an expansion has been requested */

/*
 * During expansion we migrate values with bucket granularity; this is how
 * far we've gotten so far. Ranges from 0 .. hashsize(hashpower - 1) - 1.
 */
/* Migration is done bucket by bucket; expand_bucket is the index of the next old bucket to migrate. */
static unsigned int expand_bucket = 0;

/*
 * Hash table initialization.
 * The table behaves like a two-dimensional structure: this allocates the array
 * of bucket head pointers, while each item is allocated separately and linked
 * onto its bucket's chain (separate chaining resolves key collisions).
 */
void assoc_init(const int hashtable_init) {
    if (hashtable_init) {
        hashpower = hashtable_init;
    }
    /* allocate the bucket array */
    primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));
    if (! primary_hashtable) {
        fprintf(stderr, "Failed to init hashtable.\n");
        exit(EXIT_FAILURE);
    }
    STATS_LOCK();
    stats.hash_power_level = hashpower;
    stats.hash_bytes = hashsize(hashpower) * sizeof(void *);
    STATS_UNLOCK();
}

/*
 * Look up an item by key.
 * hv is the hash value of the key.
 */
item *assoc_find(const char *key, const size_t nkey, const uint32_t hv) {
    item *it; /* head of the chain in the bucket this key maps to */
    unsigned int oldbucket;

    /* First check whether an expansion is in progress. */
    if (expanding && /* while expanding, check whether the key's old bucket is still waiting in old_hashtable */
        (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
    {
        it = old_hashtable[oldbucket]; /* bucket not migrated yet, so search the old table */
    } else {
        it = primary_hashtable[hv & hashmask(hashpower)]; /* not expanding, or this bucket was already migrated */
    }

    item *ret = NULL;
    int depth = 0;
    /* Walk the chain and compare each item's key; on a match, return a reference to that item. */
    while (it) {
        if ((nkey == it->nkey) && (memcmp(key, ITEM_key(it), nkey) == 0)) {
            ret = it;
            break;
        }
        it = it->h_next;
        ++depth;
    }
    MEMCACHED_ASSOC_FIND(key, nkey, depth);
    return ret;
}

/* returns the address of the item pointer before the key.  if *item == 0,
   the item wasn't found */
/*
 * Find the address of the pointer that points at the item (the "previous" link);
 * assoc_delete uses this to unlink an item from its chain.
 */
static item** _hashitem_before (const char *key, const size_t nkey, const uint32_t hv) {
    item **pos;
    unsigned int oldbucket;

    /* As in assoc_find, decide which table currently holds the key's bucket. */
    if (expanding &&
        (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
    {
        pos = &old_hashtable[oldbucket];
    } else {
        pos = &primary_hashtable[hv & hashmask(hashpower)];
    }

    /* Walk the chain until pos is the pointer that points at the matching item. */
    while (*pos && ((nkey != (*pos)->nkey) || memcmp(key, ITEM_key(*pos), nkey))) {
        pos = &(*pos)->h_next;
    }
    return pos;
}
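
/*
 * Aside (not part of assoc.c): assoc_find() and _hashitem_before() above share
 * the same "which table holds this key right now?" test. A minimal standalone
 * restatement of that test is sketched below; the function name and parameters
 * are illustrative only, not memcached's API.
 */
#include <stdbool.h>
#include <stdint.h>

/* Returns true when the key hashing to hv must still be looked up in
 * old_hashtable, i.e. its old bucket has not been migrated yet. */
bool lookup_in_old_table(uint32_t hv, unsigned int hashpower,
                         bool expanding, unsigned int expand_bucket) {
    if (!expanding)
        return false;
    /* old_hashtable has (1 << (hashpower - 1)) buckets, because hashpower was
     * already incremented when the expansion started. */
    uint32_t old_bucket = hv & ((1u << (hashpower - 1)) - 1);
    return old_bucket >= expand_bucket; /* buckets below expand_bucket were already migrated */
}
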
/* grows the hashtable to the next power of 2. */
/* Expand the table to twice the current number of buckets. */
static void assoc_expand(void) {
    /* park the current primary_hashtable behind old_hashtable */
    old_hashtable = primary_hashtable;

    /* allocate a new, twice-as-large bucket array for primary_hashtable */
    primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
    if (primary_hashtable) { /* allocation succeeded */
        if (settings.verbose > 1)
            fprintf(stderr, "Hash table expansion starting\n");
        hashpower++;
        expanding = true;   /* mark the expansion as in progress */
        expand_bucket = 0;  /* index of the next old bucket to migrate */
        STATS_LOCK();
        stats.hash_power_level = hashpower;
        stats.hash_bytes += hashsize(hashpower) * sizeof(void *);
        stats.hash_is_expanding = 1;
        STATS_UNLOCK();
    } else {
        primary_hashtable = old_hashtable; /* allocation failed, keep using the old table */
        /* Bad news, but we can keep running. */
    }
}

static void assoc_start_expand(void) {
    if (started_expanding)
        return;
    started_expanding = true;
    pthread_cond_signal(&maintenance_cond);
}

/* Note: this isn't an assoc_update.  The key must not already exist to call this */
/* Insert an item into the hash table; the caller must guarantee the key is not already present. */
int assoc_insert(item *it, const uint32_t hv) {
    unsigned int oldbucket;

//  assert(assoc_find(ITEM_key(it), it->nkey) == 0);  /* shouldn't have duplicately named things defined */

    if (expanding &&
        (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
    {   /* expanding, and the target bucket has not been migrated to primary_hashtable yet */
        it->h_next = old_hashtable[oldbucket];
        old_hashtable[oldbucket] = it;
    } else { /* not expanding, or the target bucket has already been migrated to primary_hashtable */
        it->h_next = primary_hashtable[hv & hashmask(hashpower)];
        primary_hashtable[hv & hashmask(hashpower)] = it;
    }

    /* update the item count */
    hash_items++;

    /* Once the item count exceeds 1.5 times the number of buckets, request an
     * expansion. This is the only place where growth in the number of stored
     * items triggers an expansion. */
    if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
        assoc_start_expand(); /* signal the maintenance thread's condition variable */
    }

    MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey, hash_items);
    return 1;
}
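
/*
 * Aside (not from assoc.c): a quick sanity check of the expansion threshold used
 * in assoc_insert() above. With the default hashpower of 16 the table starts
 * with 65536 buckets, so the first expansion is requested once hash_items
 * exceeds 98304, i.e. an average chain length of 1.5. This tiny program only
 * prints that arithmetic; it is an illustration, not memcached code.
 */
#include <stdio.h>

int main(void) {
    unsigned int power = 16;                     /* HASHPOWER_DEFAULT */
    unsigned long buckets = 1UL << power;        /* hashsize(power) */
    unsigned long threshold = buckets * 3 / 2;   /* trigger used in assoc_insert */
    printf("buckets=%lu, expand when hash_items > %lu\n", buckets, threshold);
    return 0;
}
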
/* Remove an item from the hash table. */
void assoc_delete(const char *key, const size_t nkey, const uint32_t hv) {
    /* find the pointer that points at the item */
    item **before = _hashitem_before(key, nkey, hv);

    if (*before) {
        item *nxt;
        hash_items--; /* one item fewer */
        /* The DTrace probe cannot be triggered as the last instruction
         * due to possible tail-optimization by the compiler
         */
        MEMCACHED_ASSOC_DELETE(key, nkey, hash_items);
        /* ordinary singly-linked-list removal */
        nxt = (*before)->h_next;
        (*before)->h_next = 0;   /* probably pointless, but whatever. */
        *before = nxt;
        return;
    }
    /* Note:  we never actually get here.  the callers don't delete things
       they can't find. */
    assert(*before != 0);
}

/* Flag controlling whether the maintenance thread keeps running its main loop. */
static volatile int do_run_maintenance_thread = 1;

#define DEFAULT_HASH_BULK_MOVE 1
int hash_bulk_move = DEFAULT_HASH_BULK_MOVE;

/* Main loop of the hash table maintenance (expansion) thread. */
static void *assoc_maintenance_thread(void *arg) {

    /* Until the main thread asks us to stop, this is effectively an infinite loop. */
    while (do_run_maintenance_thread) {
        int ii = 0;

        /* Lock the cache, and bulk move multiple buckets to the new
         * hash table. */
        /* Take the global item lock shared with the worker threads, then migrate buckets in bulk. */
        item_lock_global();
        mutex_lock(&cache_lock);

        /* By default this loop runs only once per pass (hash_bulk_move == 1), so the
         * global item lock is not held for long and worker threads are not starved. */
        for (ii = 0; ii < hash_bulk_move && expanding; ++ii) {
            item *it, *next;
            int bucket;

            /* rehash every item on this old bucket's chain into primary_hashtable */
            for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
                next = it->h_next;

                /* recompute the item's bucket in the larger table */
                bucket = hash(ITEM_key(it), it->nkey) & hashmask(hashpower);
                /* push it onto the head of that bucket's chain in primary_hashtable */
                it->h_next = primary_hashtable[bucket];
                primary_hashtable[bucket] = it;
            }

            /* clear the migrated bucket in old_hashtable */
            old_hashtable[expand_bucket] = NULL;

            /* advance to the next bucket */
            expand_bucket++;
            /* check whether every old bucket has been migrated */
            if (expand_bucket == hashsize(hashpower - 1)) {
                expanding = false;
                free(old_hashtable); /* release the old bucket array */
                STATS_LOCK();
                stats.hash_bytes -= hashsize(hashpower - 1) * sizeof(void *);
                stats.hash_is_expanding = 0;
                STATS_UNLOCK();
                if (settings.verbose > 1)
                    fprintf(stderr, "Hash table expansion done\n");
            }
        }

        /* release the cache lock */
        mutex_unlock(&cache_lock);
        /* release the global item lock so worker threads get a chance to serve requests again */
        item_unlock_global();

        /* not expanding, or the expansion just finished */
        if (!expanding) {
            /* finished expanding. tell all threads to use fine-grained locks */
            /* notify the worker threads (via their notification pipes) to switch back to per-bucket item locks */
            switch_item_lock_type(ITEM_LOCK_GRANULAR);

            /* resume the slab rebalancer; hash expansion and slab reassignment must not run concurrently */
            slabs_rebalancer_resume();

            /* We are done expanding.. just wait for next invocation */
            mutex_lock(&cache_lock);
            started_expanding = false;
            /* Right after startup no expansion is needed yet, so the thread blocks here
             * until assoc_start_expand() signals the condition variable. */
            pthread_cond_wait(&maintenance_cond, &cache_lock);
            /* Before doing anything, tell threads to use a global lock */
            mutex_unlock(&cache_lock);

            /* make sure no slab reassignment is in progress */
            slabs_rebalancer_pause();

            /* tell the worker threads (again via the pipes) to switch to the global item lock */
            switch_item_lock_type(ITEM_LOCK_GLOBAL);
            mutex_lock(&cache_lock);
            /* start the expansion */
            assoc_expand();
            mutex_unlock(&cache_lock);
        }
    }
    return NULL;
}

static pthread_t maintenance_tid;

/* Start the hash table maintenance (expansion) thread. */
int start_assoc_maintenance_thread() {
    int ret;
    char *env = getenv("MEMCACHED_HASH_BULK_MOVE");
    if (env != NULL) {
        hash_bulk_move = atoi(env);
        if (hash_bulk_move == 0) {
            hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
        }
    }
    /* create the thread */
    if ((ret = pthread_create(&maintenance_tid, NULL,
                              assoc_maintenance_thread, NULL)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n", strerror(ret));
        return -1;
    }
    return 0;
}

/* Stop the maintenance thread; normally only called when the main thread is shutting down. */
void stop_assoc_maintenance_thread() {
    mutex_lock(&cache_lock);
    do_run_maintenance_thread = 0;
    pthread_cond_signal(&maintenance_cond);
    mutex_unlock(&cache_lock);

    /* Wait for the maintenance thread to stop */
    pthread_join(maintenance_tid, NULL);
}
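
The listing above cannot be compiled on its own, because it leans on memcached internals (the item struct, cache_lock and the item locks, the stats counters, the DTrace probes). To experiment with just the data-structure idea, here is a small self-contained sketch of the separate-chaining pattern that assoc_insert() and assoc_find() implement: head insertion into a bucket's chain and a linear walk on lookup. All names, the toy hash function, and the fixed table size are stand-ins of my own; there is no locking and no expansion, and memcached itself uses the Jenkins hash by default rather than the simple string hash shown here.

#include <stdio.h>
#include <string.h>

#define TOY_HASHPOWER 4
#define toy_hashsize(n) (1u << (n))
#define toy_hashmask(n) (toy_hashsize(n) - 1)

typedef struct toy_item {
    struct toy_item *h_next;            /* next item on the bucket's chain */
    char key[32];
} toy_item;

/* bucket array: 16 chains, all initially empty */
static toy_item *table[toy_hashsize(TOY_HASHPOWER)];

/* simple djb2-style string hash, good enough for the illustration */
static unsigned int toy_hash(const char *key) {
    unsigned int h = 5381;
    while (*key)
        h = h * 33 + (unsigned char)*key++;
    return h;
}

/* head insertion into the bucket's chain, as in assoc_insert() */
static void toy_insert(toy_item *it) {
    unsigned int b = toy_hash(it->key) & toy_hashmask(TOY_HASHPOWER);
    it->h_next = table[b];
    table[b] = it;
}

/* walk the chain comparing keys, as in assoc_find() */
static toy_item *toy_find(const char *key) {
    unsigned int b = toy_hash(key) & toy_hashmask(TOY_HASHPOWER);
    for (toy_item *it = table[b]; it != NULL; it = it->h_next)
        if (strcmp(it->key, key) == 0)
            return it;
    return NULL;
}

int main(void) {
    toy_item a = { .key = "foo" }, b = { .key = "bar" };
    toy_insert(&a);
    toy_insert(&b);
    printf("foo: %s, baz: %s\n",
           toy_find("foo") ? "found" : "missing",
           toy_find("baz") ? "found" : "missing");
    return 0;
}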