首页 > 代码库 > Memcached源码分析之thread.c

Memcached源码分析之thread.c

  1. /*
  2.  * 文件开头先啰嗦几句:
  3.  *
  4.  * thread.c文件代表的是线程模块。但是你会看到这个模块里面有很多其它方法,
  5.     例如关于item的各种操作函数,item_alloc,item_remove,item_link等等。
  6.     我们有个items模块,这些不都是items模块要做的事情吗?为什么thread模块也有?
  7.     你仔细看会发现,thread里面的这种函数,例如item_remove,items模块里面
  8.     都会有一个对应的do_item_remove函数,而thread中的item_remove仅仅是调用
  9.     items模块中的do_item_remove,唯一多出来的就是thread在do_item_remove前后
  10.     加了加锁和解锁的操作。
  11.     其实这是很好的一种设计。
  12.     1)因为像"删除item"这样的一个逻辑都是由某个线程,而且这里是工作线程执行,
  13.         所以这是一个线程层面的事情。就是说是“某个工作线程去删除item”这样一件事。
  14.     2)更重要的是原子性及一致性问题,某个item数据,很有可能同时多个线程在修改,
  15.         那么需要加锁,那么锁最应该加在哪个地方?既然问题是线程引起的,那么负责
  16.         解决的无疑是线程模块。
  17.     3)所以这里像这种函数,thread此时相当于是items的外壳,起调控作用,在线程层面
  18.         开放给外部调用,同时在内部加锁。而items模块里面定义的do_xxx函数都不需要多
  19.         加考虑,无条件执行对item进行修改,而由外部被调用方来控制。相信很多需要加锁
  20.         的项目都会面临这样的问题:锁应该加在哪一层?可以参考memcached这样的设计。
  21.  *
  22.  */
  23. #include "memcached.h"
  24. #include <assert.h>
  25. #include <stdio.h>
  26. #include <errno.h>
  27. #include <stdlib.h>
  28. #include <errno.h>
  29. #include <string.h>
  30. #include <pthread.h>
  31. #ifdef __sun
  32. #include <atomic.h>
  33. #endif
  34. #define ITEMS_PER_ALLOC 64
  35. /**
  36.     下面这个CQ_ITEM结构体:
  37.     可以这么理解,主线程accept连接后,把client fd
  38.     分发到worker线程的同时会顺带一些与此client连接相关的信息,
  39.     而CQ_ITEM是包装了这些信息的一个对象,有点"参数对象"的概念。
  40.     记住这货是主线程那边丢过来的。
  41.     CQ_ITEM中的CQ虽然是connection queue的缩写,
  42.     它与memcached.h中定义的conn结构体是完全不一样的概念,
  43.     但worker线程会利用这个CQ_ITEM对象去初始化conn对象
  44.  */
  45. typedef struct conn_queue_item CQ_ITEM;
  46. struct conn_queue_item {
  47.     int sfd;
  48.     enum conn_states init_state;
  49.     int event_flags;
  50.     int read_buffer_size;
  51.     enum network_transport transport;
  52.     CQ_ITEM *next;
  53. };
  54. /*
  55. 上面的CQ_ITEM的队列对象,每个worker线程对象都保存着这样一个队列,处理
  56. 主线程那边分发过来的连接请求时用到。
  57. */
  58. typedef struct conn_queue CQ;
  59. struct conn_queue {
  60.     CQ_ITEM *head;
  61.     CQ_ITEM *tail;
  62.     pthread_mutex_t lock;
  63. };
  64. //下面是各种锁
  65. /**
  66. 个人认为这个锁用于锁住全局数量不变的对象,例如slabclass,LRU链表等等
  67. 区别于item锁,由于item对象是动态增长的,数量非常多,
  68. item锁是用hash的方式分配一张大大的item锁表来控制锁的粒度
  69. */
  70. pthread_mutex_t cache_lock;
  71. pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER; //连接锁
  72. #if !defined(HAVE_GCC_ATOMICS) && !defined(__sun)
  73. pthread_mutex_t atomics_mutex = PTHREAD_MUTEX_INITIALIZER;
  74. #endif
  75.  
  76. static pthread_mutex_t stats_lock; //统计锁
  77.  
  78. static CQ_ITEM *cqi_freelist;
  79. static pthread_mutex_t cqi_freelist_lock;
  80. static pthread_mutex_t *item_locks; //item锁
  81.  
  82. static uint32_t item_lock_count; //item锁总数
  83. static unsigned int item_lock_hashpower; //item锁的hash表 指数,锁总数为2的item_lock_hashpower个,见下面的hashsize
  84. #define hashsize(n) ((unsigned long int)1<<(n))
  85. #define hashmask(n) (hashsize(n)-1)
  86.  
  87. static pthread_mutex_t item_global_lock;
  88.  
  89. static pthread_key_t item_lock_type_key;
  90. static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;
  91. static LIBEVENT_THREAD *threads;
  92. static int init_count = 0; //有多少个worker线程已经被初始化
  93. static pthread_mutex_t init_lock; //初始化锁
  94. static pthread_cond_t init_cond; //初始化条件变量
  95. static void thread_libevent_process(int fd, short which, void *arg);
  96. //引用计数加1
  97. unsigned short refcount_incr(unsigned short *refcount) {
  98. #ifdef HAVE_GCC_ATOMICS
  99.     return __sync_add_and_fetch(refcount, 1);
  100. #elif defined(__sun)
  101.     return atomic_inc_ushort_nv(refcount);
  102. #else
  103.     unsigned short res;
  104.     mutex_lock(&atomics_mutex);
  105.     (*refcount)++;
  106.     res = *refcount;
  107.     mutex_unlock(&atomics_mutex);
  108.     return res;
  109. #endif
  110. }
  111. //引用计数减1
  112. unsigned short refcount_decr(unsigned short *refcount) {
  113. #ifdef HAVE_GCC_ATOMICS
  114.     return __sync_sub_and_fetch(refcount, 1);
  115. #elif defined(__sun)
  116.     return atomic_dec_ushort_nv(refcount);
  117. #else
  118.     unsigned short res;
  119.     mutex_lock(&atomics_mutex);
  120.     (*refcount)--;
  121.     res = *refcount;
  122.     mutex_unlock(&atomics_mutex);
  123.     return res;
  124. #endif
  125. }
  126.  
  127. void item_lock_global(void) {
  128.     mutex_lock(&item_global_lock);
  129. }
  130. void item_unlock_global(void) {
  131.     mutex_unlock(&item_global_lock);
  132. }
  133. void item_lock(uint32_t hv) {
  134.     uint8_t *lock_type = pthread_getspecific(item_lock_type_key);
  135.     if (likely(*lock_type == ITEM_LOCK_GRANULAR)) {
  136.         mutex_lock(&item_locks[hv & hashmask(item_lock_hashpower)]);
  137.     } else {
  138.         mutex_lock(&item_global_lock);
  139.     }
  140. }
  141.  
  142. void *item_trylock(uint32_t hv) {
  143.     pthread_mutex_t *lock = &item_locks[hv & hashmask(item_lock_hashpower)];
  144.     if (pthread_mutex_trylock(lock) == 0) {
  145.         return lock;
  146.     }
  147.     return NULL;
  148. }
  149. void item_trylock_unlock(void *lock) {
  150.     mutex_unlock((pthread_mutex_t *) lock);
  151. }
  152. void item_unlock(uint32_t hv) {
  153.     uint8_t *lock_type = pthread_getspecific(item_lock_type_key);
  154.     if (likely(*lock_type == ITEM_LOCK_GRANULAR)) {
  155.         mutex_unlock(&item_locks[hv & hashmask(item_lock_hashpower)]);
  156.     } else {
  157.         mutex_unlock(&item_global_lock);
  158.     }
  159. }
  160. static void wait_for_thread_registration(int nthreads) {
  161.     while (init_count < nthreads) {
  162.         pthread_cond_wait(&init_cond, &init_lock); //主线程利用条件变量等待所有worker线程启动完毕
  163.     }
  164. }
  165. //worker线程注册函数,主要是统计worker线程完成初始化个数。
  166. static void register_thread_initialized(void) {
  167.     pthread_mutex_lock(&init_lock);
  168.     init_count++;
  169.     pthread_cond_signal(&init_cond);
  170.     pthread_mutex_unlock(&init_lock);
  171. }
  172. //item锁的粒度有几种,这里是切换类型
  173. void switch_item_lock_type(enum item_lock_types type) {
  174.     char buf[1];
  175.     int i;
  176.     switch (type) {
  177.         case ITEM_LOCK_GRANULAR:
  178.             buf[0] = ‘l‘;
  179.             break;
  180.         case ITEM_LOCK_GLOBAL:
  181.             buf[0] = ‘g‘;
  182.             break;
  183.         default:
  184.             fprintf(stderr, "Unknown lock type: %d\n", type);
  185.             assert(1 == 0);
  186.             break;
  187.     }
  188.     pthread_mutex_lock(&init_lock);
  189.     init_count = 0;
  190.     for (i = 0; i < settings.num_threads; i++) {
  191.         if (write(threads[i].notify_send_fd, buf, 1) != 1) {
  192.             perror("Failed writing to notify pipe");
  193.             /* TODO: This is a fatal problem. Can it ever happen temporarily? */
  194.         }
  195.     }
  196.     wait_for_thread_registration(settings.num_threads);
  197.     pthread_mutex_unlock(&init_lock);
  198. }
  199. /*
  200.  * Initializes a connection queue.
  201.     初始化一个CQ对象,CQ结构体和CQ_ITEM结构体的作用见它们定义处。
  202.  */
  203. static void cq_init(CQ *cq) {
  204.     pthread_mutex_init(&cq->lock, NULL);
  205.     cq->head = NULL;
  206.     cq->tail = NULL;
  207. }
  208.  /**
  209.  从worker线程的CQ队列里面pop出一个CQ_ITEM对象
  210.  */
  211. static CQ_ITEM *cq_pop(CQ *cq) {
  212.     CQ_ITEM *item;
  213.     pthread_mutex_lock(&cq->lock);
  214.     item = cq->head;
  215.     if (NULL != item) {
  216.         cq->head = item->next;
  217.         if (NULL == cq->head)
  218.             cq->tail = NULL;
  219.     }
  220.     pthread_mutex_unlock(&cq->lock);
  221.     return item;
  222. }
  223.   /**
  224.  push一个CQ_ITEM对象到worker线程的CQ队列中
  225.  */
  226. static void cq_push(CQ *cq, CQ_ITEM *item) {
  227.     item->next = NULL;
  228.     pthread_mutex_lock(&cq->lock);
  229.     if (NULL == cq->tail)
  230.         cq->head = item;
  231.     else
  232.         cq->tail->next = item;
  233.     cq->tail = item;
  234.     pthread_mutex_unlock(&cq->lock);
  235. }
  236. /*
  237.  * Returns a fresh connection queue item.
  238.     分配一个CQ_ITEM对象
  239.  */
  240. static CQ_ITEM *cqi_new(void) {
  241.     CQ_ITEM *item = NULL;
  242.     pthread_mutex_lock(&cqi_freelist_lock);
  243.     if (cqi_freelist) {
  244.         item = cqi_freelist;
  245.         cqi_freelist = item->next;
  246.     }
  247.     pthread_mutex_unlock(&cqi_freelist_lock);
  248.     if (NULL == item) {
  249.         int i;
  250.         /* Allocate a bunch of items at once to reduce fragmentation */
  251.         item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);
  252.         if (NULL == item) {
  253.             STATS_LOCK();
  254.             stats.malloc_fails++;
  255.             STATS_UNLOCK();
  256.             return NULL;
  257.         }
  258.         for (i = 2; i < ITEMS_PER_ALLOC; i++)
  259.             item[i - 1].next = &item[i];
  260.         pthread_mutex_lock(&cqi_freelist_lock);
  261.         item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
  262.         cqi_freelist = &item[1];
  263.         pthread_mutex_unlock(&cqi_freelist_lock);
  264.     }
  265.     return item;
  266. }
  267. /*
  268.  * Frees a connection queue item (adds it to the freelist.)
  269.  */
  270. static void cqi_free(CQ_ITEM *item) {
  271.     pthread_mutex_lock(&cqi_freelist_lock);
  272.     item->next = cqi_freelist;
  273.     cqi_freelist = item;
  274.     pthread_mutex_unlock(&cqi_freelist_lock);
  275. }
  276.  
  277. /*
  278.     创建并启动worker线程,在thread_init主线程初始化时调用
  279.  */
  280. static void create_worker(void *(*func)(void *), void *arg) {
  281.     pthread_t thread;
  282.     pthread_attr_t attr;
  283.     int ret;
  284.     pthread_attr_init(&attr);
  285.     if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
  286.         fprintf(stderr, "Can‘t create thread: %s\n",
  287.                 strerror(ret));
  288.         exit(1);
  289.     }
  290. }
  291.  
  292. void accept_new_conns(const bool do_accept) {
  293.     pthread_mutex_lock(&conn_lock);
  294.     do_accept_new_conns(do_accept);
  295.     pthread_mutex_unlock(&conn_lock);
  296. }
  297. /****************************** LIBEVENT THREADS *****************************/
  298. /*
  299.  * 装备worker线程,worker线程的event_base在此设置
  300.  */
  301. static void setup_thread(LIBEVENT_THREAD *me) {
  302.     me->base = event_init(); //为每个worker线程分配自己的event_base
  303.     if (! me->base) {
  304.         fprintf(stderr, "Can‘t allocate event base\n");
  305.         exit(1);
  306.     }
  307.     /* Listen for notifications from other threads */
  308.     event_set(&me->notify_event, me->notify_receive_fd,
  309.               EV_READ | EV_PERSIST, thread_libevent_process, me); //监听管道接收fd,这里即监听
  310.     //来自主线程的消息,事件处理函数为thread_libevent_process
  311.     event_base_set(me->base, &me->notify_event);
  312.     if (event_add(&me->notify_event, 0) == -1) {
  313.         fprintf(stderr, "Can‘t monitor libevent notify pipe\n");
  314.         exit(1);
  315.     }
  316.     me->new_conn_queue = malloc(sizeof(struct conn_queue)); //CQ_ITEM队列
  317.     if (me->new_conn_queue == NULL) {
  318.         perror("Failed to allocate memory for connection queue");
  319.         exit(EXIT_FAILURE);
  320.     }
  321.     cq_init(me->new_conn_queue); //初始化CQ_ITEM对象队列
  322.     if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
  323.         perror("Failed to initialize mutex");
  324.         exit(EXIT_FAILURE);
  325.     }
  326.     me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
  327.                                     NULL, NULL);
  328.     if (me->suffix_cache == NULL) {
  329.         fprintf(stderr, "Failed to create suffix cache\n");
  330.         exit(EXIT_FAILURE);
  331.     }
  332. }
  333.  
  334. /*
  335.  * 这里主要是让worker线程进入event_base_loop
  336.  */
  337. static void *worker_libevent(void *arg) {
  338.     LIBEVENT_THREAD *me = arg;
  339.     /* Any per-thread setup can happen here; thread_init() will block until
  340.      * all threads have finished initializing.
  341.      */
  342.     /* set an indexable thread-specific memory item for the lock type.
  343.      * this could be unnecessary if we pass the conn *c struct through
  344.      * all item_lock calls...
  345.      */
  346.     me->item_lock_type = ITEM_LOCK_GRANULAR;
  347.     pthread_setspecific(item_lock_type_key, &me->item_lock_type);
  348.     //每一个worker线程进入loop,全局init_count++操作,
  349.     //见thread_init函数后面几行代码和wait_for_thread_registration函数,
  350.     //主线程通过init_count来确认所有线程都启动完毕。
  351.     register_thread_initialized();
  352.     event_base_loop(me->base, 0);
  353.     return NULL;
  354. }
  355.  
  356.  //主线程分发client fd给worker线程后,同时往管道写入buf,唤醒worker线程调用此函数
  357. static void thread_libevent_process(int fd, short which, void *arg) {
  358.     LIBEVENT_THREAD *me = arg;
  359.     CQ_ITEM *item;
  360.     char buf[1];
  361.     if (read(fd, buf, 1) != 1)
  362.         if (settings.verbose > 0)
  363.             fprintf(stderr, "Can‘t read from libevent pipe\n");
  364.     switch (buf[0]) {
  365.     case ‘c‘:
  366.     item = cq_pop(me->new_conn_queue); //取出主线程丢过来的CQ_ITEM
  367.     if (NULL != item) {
  368.         /*
  369.         worker线程创建 conn连接对象,注意由主线程丢过来的CQ_ITEM的init_state为conn_new_cmd (TCP情况下)
  370.         */
  371.         conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
  372.                            item->read_buffer_size, item->transport, me->base);
  373.         if (c == NULL) {
  374.             if (IS_UDP(item->transport)) {
  375.                 fprintf(stderr, "Can‘t listen for events on UDP socket\n");
  376.                 exit(1);
  377.             } else {
  378.                 if (settings.verbose > 0) {
  379.                     fprintf(stderr, "Can‘t listen for events on fd %d\n",
  380.                         item->sfd);
  381.                 }
  382.                 close(item->sfd);
  383.             }
  384.         } else {
  385.             c->thread = me; //设置监听连接的线程为当前worker线程
  386.         }
  387.         cqi_free(item);
  388.     }
  389.         break;
  390.     /* we were told to flip the lock type and report in */
  391.     case ‘l‘:
  392.     me->item_lock_type = ITEM_LOCK_GRANULAR;
  393.     register_thread_initialized();
  394.         break;
  395.     case ‘g‘:
  396.     me->item_lock_type = ITEM_LOCK_GLOBAL;
  397.     register_thread_initialized();
  398.         break;
  399.     }
  400. }
  401. void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
  402.                        int read_buffer_size, enum network_transport transport) {
  403.     /**
  404.     这下面有一个CQ_ITEM结构体,可以这么理解,主线程accept连接后,把client fd
  405.     分发到worker线程的同时会顺带一些与此client连接相关的信息,例如dispatch_conn_new的形参上面列的,
  406.     而CQ_ITEM是包装了这些信息的一个对象。
  407.     CQ_ITEM中的CQ是connection queue的缩写,但它与conn结构体是完全不一样的概念,CQ_ITEM仅仅是把client连接相关的信息
  408.     打包成一个对象而已。
  409.     */
  410.     CQ_ITEM *item = cqi_new();
  411.     char buf[1];
  412.     if (item == NULL) {
  413.         close(sfd);
  414.         /* given that malloc failed this may also fail, but let‘s try */
  415.         fprintf(stderr, "Failed to allocate memory for connection object\n");
  416.         return ;
  417.     }
  418.     int tid = (last_thread + 1) % settings.num_threads;
  419.     LIBEVENT_THREAD *thread = threads + tid; //通过简单的轮叫方式选择处理当前client fd的worker线程
  420.     last_thread = tid;
  421.     //初始化CQ_ITEM对象,即把信息包装
  422.     item->sfd = sfd;
  423.     item->init_state = init_state;
  424.     item->event_flags = event_flags;
  425.     item->read_buffer_size = read_buffer_size;
  426.     item->transport = transport;
  427.     cq_push(thread->new_conn_queue, item); //每个worker线程保存着所有被分发给自己的CQ_ITEM,即new_conn_queue
  428.     MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
  429.     /*
  430.     主线程向处理当前client fd的worker线程管道中简单写进一个‘c‘字符,
  431.     由于每个worker线程都监听了管道的receive_fd,于是相应的worker进程收到事件通知,
  432.     触发注册的handler,即thread_libevent_process
  433.     */
  434.     buf[0] = ‘c‘;
  435.     if (write(thread->notify_send_fd, buf, 1) != 1) {
  436.         perror("Writing to thread notify pipe");
  437.     }
  438. }
  439.  
  440. int is_listen_thread() {
  441.     return pthread_self() == dispatcher_thread.thread_id;
  442. }
  443.  
  444. /********************************* ITEM ACCESS *******************************/
  445. /**
  446. 下面是一堆关于item操作的函数,具体逻辑代码都放在items::do_xxx相应的地方
  447. 就像本文件开头说的,这里主要是加了锁而已
  448. */
  449. /*
  450.  * Allocates a new item.
  451.     分配item空间
  452.  */
  453. item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
  454.     item *it;
  455.     /* do_item_alloc handles its own locks */
  456.     /**
  457.     这里比较特殊,与其它item_xxx函数不一样,这里把锁放在do_item_alloc里面做了。
  458.     个人猜测是因为do_item_alloc这个逻辑实在有点复杂,甚至加解锁有可能在某个if条件下要发
  459.     生,加解锁和逻辑本身代码耦合,所以外部不好加锁。因此把锁交给do_item_alloc内部进行考虑。
  460.     */
  461.     it = do_item_alloc(key, nkey, flags, exptime, nbytes, 0);
  462.     return it;
  463. }
  464. /*
  465.  * Returns an item if it hasn‘t been marked as expired,
  466.  * lazy-expiring as needed.
  467.     取得item,上面这里有句英文注释,说返回不超时的item,因为memcached并没有做实时或者定时把
  468.     超时item清掉的逻辑,而是用了延迟超时。就是当要用这个item的时候,再来针对这个item做超时处理
  469.  */
  470. item *item_get(const char *key, const size_t nkey) {
  471.     item *it;
  472.     uint32_t hv;
  473.     hv = hash(key, nkey);
  474.     item_lock(hv);
  475.     it = do_item_get(key, nkey, hv);
  476.     item_unlock(hv);
  477.     return it;
  478. }
  479. item *item_touch(const char *key, size_t nkey, uint32_t exptime) {
  480.     item *it;
  481.     uint32_t hv;
  482.     hv = hash(key, nkey);
  483.     item_lock(hv);
  484.     it = do_item_touch(key, nkey, exptime, hv);
  485.     item_unlock(hv);
  486.     return it;
  487. }
  488. /*
  489.  * Links an item into the LRU and hashtable.
  490.  */
  491. int item_link(item *item) {
  492.     int ret;
  493.     uint32_t hv;
  494.     hv = hash(ITEM_key(item), item->nkey);
  495.     item_lock(hv);
  496.     ret = do_item_link(item, hv);
  497.     item_unlock(hv);
  498.     return ret;
  499. }
  500.  
  501. void item_remove(item *item) {
  502.     uint32_t hv;
  503.     hv = hash(ITEM_key(item), item->nkey);
  504.     item_lock(hv);
  505.     do_item_remove(item);
  506.     item_unlock(hv);
  507. }
  508. int item_replace(item *old_it, item *new_it, const uint32_t hv) {
  509.     return do_item_replace(old_it, new_it, hv);
  510. }
  511.  
  512. /*
  513.  * Unlinks an item from the LRU and hashtable.
  514.  * 见items::item_unlink
  515.  */
  516. void item_unlink(item *item) {
  517.     uint32_t hv;
  518.     hv = hash(ITEM_key(item), item->nkey);
  519.     item_lock(hv);
  520.     do_item_unlink(item, hv);
  521.     item_unlock(hv);
  522. }
  523.  
  524.  /**
  525. 主要作用是重置在最近使用链表中的位置,更新最近使用时间,见items::do_item_update
  526. */
  527. void item_update(item *item) {
  528.     uint32_t hv;
  529.     hv = hash(ITEM_key(item), item->nkey);
  530.     item_lock(hv);
  531.     do_item_update(item);
  532.     item_unlock(hv);
  533. }
  534. enum delta_result_type add_delta(conn *c, const char *key,
  535.                                  const size_t nkey, int incr,
  536.                                  const int64_t delta, char *buf,
  537.                                  uint64_t *cas) {
  538.     enum delta_result_type ret;
  539.     uint32_t hv;
  540.     hv = hash(key, nkey);
  541.     item_lock(hv);
  542.     ret = do_add_delta(c, key, nkey, incr, delta, buf, cas, hv);
  543.     item_unlock(hv);
  544.     return ret;
  545. }
  546. /*
  547.  * Stores an item in the cache (high level, obeys set/add/replace semantics)
  548.  * 保存item信息,主要是调用items::do_store_item,但由于是多线程,所以需求加锁
  549.  * store_item是线程上的操作,所以写在thread模块,在此对外开放,而内部加锁。
  550.  * 除了store_item函数,其它关于item的操作均如此。
  551.  */
  552. enum store_item_type store_item(item *item, int comm, conn* c) {
  553.     enum store_item_type ret;
  554.     uint32_t hv;
  555.     hv = hash(ITEM_key(item), item->nkey); //锁住item
  556.     item_lock(hv);
  557.     ret = do_store_item(item, comm, c, hv);
  558.     item_unlock(hv);
  559.     return ret;
  560. }
  561. void item_flush_expired() {
  562.     mutex_lock(&cache_lock);
  563.     do_item_flush_expired();
  564.     mutex_unlock(&cache_lock);
  565. }
  566. char *item_cachedump(unsigned int slabs_clsid, unsigned int limit, unsigned int *bytes) {
  567.     char *ret;
  568.     mutex_lock(&cache_lock);
  569.     ret = do_item_cachedump(slabs_clsid, limit, bytes);
  570.     mutex_unlock(&cache_lock);
  571.     return ret;
  572. }
  573. void item_stats(ADD_STAT add_stats, void *c) {
  574.     mutex_lock(&cache_lock);
  575.     do_item_stats(add_stats, c);
  576.     mutex_unlock(&cache_lock);
  577. }
  578. void item_stats_totals(ADD_STAT add_stats, void *c) {
  579.     mutex_lock(&cache_lock);
  580.     do_item_stats_totals(add_stats, c);
  581.     mutex_unlock(&cache_lock);
  582. }
  583. void item_stats_sizes(ADD_STAT add_stats, void *c) {
  584.     mutex_lock(&cache_lock);
  585.     do_item_stats_sizes(add_stats, c);
  586.     mutex_unlock(&cache_lock);
  587. }
  588. /******************************* GLOBAL STATS ******************************/
  589. void STATS_LOCK() {
  590.     pthread_mutex_lock(&stats_lock);
  591. }
  592. void STATS_UNLOCK() {
  593.     pthread_mutex_unlock(&stats_lock);
  594. }
  595. void threadlocal_stats_reset(void) {
  596.     int ii, sid;
  597.     for (ii = 0; ii < settings.num_threads; ++ii) {
  598.         pthread_mutex_lock(&threads[ii].stats.mutex);
  599.         threads[ii].stats.get_cmds = 0;
  600.         threads[ii].stats.get_misses = 0;
  601.         threads[ii].stats.touch_cmds = 0;
  602.         threads[ii].stats.touch_misses = 0;
  603.         threads[ii].stats.delete_misses = 0;
  604.         threads[ii].stats.incr_misses = 0;
  605.         threads[ii].stats.decr_misses = 0;
  606.         threads[ii].stats.cas_misses = 0;
  607.         threads[ii].stats.bytes_read = 0;
  608.         threads[ii].stats.bytes_written = 0;
  609.         threads[ii].stats.flush_cmds = 0;
  610.         threads[ii].stats.conn_yields = 0;
  611.         threads[ii].stats.auth_cmds = 0;
  612.         threads[ii].stats.auth_errors = 0;
  613.         for(sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
  614.             threads[ii].stats.slab_stats[sid].set_cmds = 0;
  615.             threads[ii].stats.slab_stats[sid].get_hits = 0;
  616.             threads[ii].stats.slab_stats[sid].touch_hits = 0;
  617.             threads[ii].stats.slab_stats[sid].delete_hits = 0;
  618.             threads[ii].stats.slab_stats[sid].incr_hits = 0;
  619.             threads[ii].stats.slab_stats[sid].decr_hits = 0;
  620.             threads[ii].stats.slab_stats[sid].cas_hits = 0;
  621.             threads[ii].stats.slab_stats[sid].cas_badval = 0;
  622.         }
  623.         pthread_mutex_unlock(&threads[ii].stats.mutex);
  624.     }
  625. }
  626. void threadlocal_stats_aggregate(struct thread_stats *stats) {
  627.     int ii, sid;
  628.     /* The struct has a mutex, but we can safely set the whole thing
  629.      * to zero since it is unused when aggregating. */
  630.     memset(stats, 0, sizeof(*stats));
  631.     for (ii = 0; ii < settings.num_threads; ++ii) {
  632.         pthread_mutex_lock(&threads[ii].stats.mutex);
  633.         stats->get_cmds += threads[ii].stats.get_cmds;
  634.         stats->get_misses += threads[ii].stats.get_misses;
  635.         stats->touch_cmds += threads[ii].stats.touch_cmds;
  636.         stats->touch_misses += threads[ii].stats.touch_misses;
  637.         stats->delete_misses += threads[ii].stats.delete_misses;
  638.         stats->decr_misses += threads[ii].stats.decr_misses;
  639.         stats->incr_misses += threads[ii].stats.incr_misses;
  640.         stats->cas_misses += threads[ii].stats.cas_misses;
  641.         stats->bytes_read += threads[ii].stats.bytes_read;
  642.         stats->bytes_written += threads[ii].stats.bytes_written;
  643.         stats->flush_cmds += threads[ii].stats.flush_cmds;
  644.         stats->conn_yields += threads[ii].stats.conn_yields;
  645.         stats->auth_cmds += threads[ii].stats.auth_cmds;
  646.         stats->auth_errors += threads[ii].stats.auth_errors;
  647.         for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
  648.             stats->slab_stats[sid].set_cmds +=
  649.                 threads[ii].stats.slab_stats[sid].set_cmds;
  650.             stats->slab_stats[sid].get_hits +=
  651.                 threads[ii].stats.slab_stats[sid].get_hits;
  652.             stats->slab_stats[sid].touch_hits +=
  653.                 threads[ii].stats.slab_stats[sid].touch_hits;
  654.             stats->slab_stats[sid].delete_hits +=
  655.                 threads[ii].stats.slab_stats[sid].delete_hits;
  656.             stats->slab_stats[sid].decr_hits +=
  657.                 threads[ii].stats.slab_stats[sid].decr_hits;
  658.             stats->slab_stats[sid].incr_hits +=
  659.                 threads[ii].stats.slab_stats[sid].incr_hits;
  660.             stats->slab_stats[sid].cas_hits +=
  661.                 threads[ii].stats.slab_stats[sid].cas_hits;
  662.             stats->slab_stats[sid].cas_badval +=
  663.                 threads[ii].stats.slab_stats[sid].cas_badval;
  664.         }
  665.         pthread_mutex_unlock(&threads[ii].stats.mutex);
  666.     }
  667. }
  668. void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) {
  669.     int sid;
  670.     out->set_cmds = 0;
  671.     out->get_hits = 0;
  672.     out->touch_hits = 0;
  673.     out->delete_hits = 0;
  674.     out->incr_hits = 0;
  675.     out->decr_hits = 0;
  676.     out->cas_hits = 0;
  677.     out->cas_badval = 0;
  678.     for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
  679.         out->set_cmds += stats->slab_stats[sid].set_cmds;
  680.         out->get_hits += stats->slab_stats[sid].get_hits;
  681.         out->touch_hits += stats->slab_stats[sid].touch_hits;
  682.         out->delete_hits += stats->slab_stats[sid].delete_hits;
  683.         out->decr_hits += stats->slab_stats[sid].decr_hits;
  684.         out->incr_hits += stats->slab_stats[sid].incr_hits;
  685.         out->cas_hits += stats->slab_stats[sid].cas_hits;
  686.         out->cas_badval += stats->slab_stats[sid].cas_badval;
  687.     }
  688. }
  689.  //初始化主线程
  690. void thread_init(int nthreads, struct event_base *main_base) {
  691.     int i;
  692.     int power;
  693.     pthread_mutex_init(&cache_lock, NULL);
  694.     pthread_mutex_init(&stats_lock, NULL);
  695.     pthread_mutex_init(&init_lock, NULL);
  696.     pthread_cond_init(&init_cond, NULL);
  697.     pthread_mutex_init(&cqi_freelist_lock, NULL);
  698.     cqi_freelist = NULL;
  699.     /* Want a wide lock table, but don‘t waste memory */
  700.     /**
  701.     初始化item lock
  702.     */
  703.     //调配item锁的数量
  704.     //之所以需要锁是因为线程之间的并发,所以item锁的数量当然是根据线程的个数进行调配了。
  705.     if (nthreads < 3) {
  706.         power = 10; //这个power是指数
  707.     } else if (nthreads < 4) {
  708.         power = 11;
  709.     } else if (nthreads < 5) {
  710.         power = 12;
  711.     } else {
  712.         /* 8192 buckets, and central locks don‘t scale much past 5 threads */
  713.         power = 13;
  714.     }
  715.     item_lock_count = hashsize(power);
  716.     item_lock_hashpower = power;
  717.     item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
  718.     if (! item_locks) {
  719.         perror("Can‘t allocate item locks");
  720.         exit(1);
  721.     }
  722.     for (i = 0; i < item_lock_count; i++) {
  723.         pthread_mutex_init(&item_locks[i], NULL);
  724.     }
  725.     pthread_key_create(&item_lock_type_key, NULL);
  726.     pthread_mutex_init(&item_global_lock, NULL);
  727.     //_mark2_1
  728.     threads = calloc(nthreads, sizeof(LIBEVENT_THREAD)); //创建worker线程对象
  729.     if (! threads) {
  730.         perror("Can‘t allocate thread descriptors");
  731.         exit(1);
  732.     }
  733.     //_mark2_3
  734.     dispatcher_thread.base = main_base; //设置主线程对象的event_base
  735.     dispatcher_thread.thread_id = pthread_self(); //设置主线程对象pid
  736.     //_mark2_5
  737.     for (i = 0; i < nthreads; i++) { //为每个worker线程创建与主线程通信的管道
  738.         int fds[2];
  739.         if (pipe(fds)) {
  740.             perror("Can‘t create notify pipe");
  741.             exit(1);
  742.         }
  743.         threads[i].notify_receive_fd = fds[0]; //worker线程管道接收fd
  744.         threads[i].notify_send_fd = fds[1]; //worker线程管道写入fd
  745.         //_mark2_6
  746.         setup_thread(&threads[i]); //装载 worker线程
  747.         /* Reserve three fds for the libevent base, and two for the pipe */
  748.         stats.reserved_fds += 5;
  749.     }
  750.     /* Create threads after we‘ve done all the libevent setup. */
  751.     for (i = 0; i < nthreads; i++) {
  752.         //_mark2_7
  753.         create_worker(worker_libevent, &threads[i]); //启动worker线程,见worker_libevent
  754.     }
  755.     /* Wait for all the threads to set themselves up before returning. */
  756.     pthread_mutex_lock(&init_lock);
  757.     wait_for_thread_registration(nthreads); //等待所有worker线程启动完毕
  758.     pthread_mutex_unlock(&init_lock);
  759. }

Memcached源码分析之thread.c