首页 > 代码库 > 经典的线程池--用户空间与内核空间实现的对比

经典的线程池--用户空间与内核空间实现的对比

  经典的线程池模型是一组线程抢一个资源链表的模型,程序启动了一组线程,让它们等待信号waitQ的到来。同时又初始化一个资源链表,当某个线程往资源链表中添加一个资源时,它同时使用信号通知线程池。线程池中的线程接到信号后,就从资源链表中取出资源进行处理。

  接下来,我们先来观察一下用户空间线程池的创建过程吧!

 1 int 2 init (xlator_t *this) 3 { 4         iot_conf_t *conf = NULL; 5         int         ret  = -1; 6         int         i    = 0; 7  8     if (!this->children || this->children->next) { 9         gf_log ("io-threads", GF_LOG_ERROR,10             "FATAL: iot not configured with exactly one child");11                 goto out;12     }13 14     if (!this->parents) {15         gf_log (this->name, GF_LOG_WARNING,16             "dangling volume. check volfile ");17     }18 19     conf = (void *) GF_CALLOC (1, sizeof (*conf),20                                    gf_iot_mt_iot_conf_t);21         if (conf == NULL) {22                 gf_log (this->name, GF_LOG_ERROR,23                         "out of memory");24                 goto out;25         }26 27         if ((ret = pthread_cond_init(&conf->cond, NULL)) != 0) {28                 gf_log (this->name, GF_LOG_ERROR,29                         "pthread_cond_init failed (%d)", ret);30                 goto out;31         }32 33         if ((ret = pthread_mutex_init(&conf->mutex, NULL)) != 0) {34                 gf_log (this->name, GF_LOG_ERROR,35                         "pthread_mutex_init failed (%d)", ret);36                 goto out;37         }38 39         set_stack_size (conf);40 41         GF_OPTION_INIT ("thread-count", conf->max_count, int32, out);42 43         GF_OPTION_INIT ("high-prio-threads",44                         conf->ac_iot_limit[IOT_PRI_HI], int32, out);45 46         GF_OPTION_INIT ("normal-prio-threads",47                         conf->ac_iot_limit[IOT_PRI_NORMAL], int32, out);48 49         GF_OPTION_INIT ("low-prio-threads",50                         conf->ac_iot_limit[IOT_PRI_LO], int32, out);51 52         GF_OPTION_INIT ("least-prio-threads",53                         conf->ac_iot_limit[IOT_PRI_LEAST], int32, out);54 55         GF_OPTION_INIT ("idle-time", conf->idle_time, int32, out);56         GF_OPTION_INIT ("enable-least-priority", conf->least_priority,57                         bool, 
out);58 59     GF_OPTION_INIT("least-rate-limit", conf->throttle.rate_limit, int32,60                out);61         if ((ret = pthread_mutex_init(&conf->throttle.lock, NULL)) != 0) {62                 gf_log (this->name, GF_LOG_ERROR,63                         "pthread_mutex_init failed (%d)", ret);64                 goto out;65         }66 67         conf->this = this;68 69         for (i = 0; i < IOT_PRI_MAX; i++) {70                 INIT_LIST_HEAD (&conf->reqs[i]);71         }72 73     ret = iot_workers_scale (conf);74 75         if (ret == -1) {76                 gf_log (this->name, GF_LOG_ERROR,77                         "cannot initialize worker threads, exiting init");78                 goto out;79         }80 81     this->private = conf;82         ret = 0;83 out:84         if (ret)85                 GF_FREE (conf);86 87     return ret;88 }

这是glusterfs中的io-threads xlator中的初始化部分,其中包含了线程的栈空间的初始化set_stack_size,启动的线程数量参数设定。通过调用iot_workers_scale启动线程。

 

 1 int 2 iot_workers_scale (iot_conf_t *conf) 3 { 4         int     ret = -1; 5  6         if (conf == NULL) { 7                 ret = -EINVAL; 8                 goto out; 9         }10 11         pthread_mutex_lock (&conf->mutex);12         {13                 ret = __iot_workers_scale (conf);14         }15         pthread_mutex_unlock (&conf->mutex);16 17 out:18         return ret;19 }

 

iot_workers_scale先加锁,再调用__iot_workers_scale创建线程。

 1 int 2 __iot_workers_scale (iot_conf_t *conf) 3 { 4         int       scale = 0; 5         int       diff = 0; 6         pthread_t thread; 7         int       ret = 0; 8         int       i = 0; 9 10         for (i = 0; i < IOT_PRI_MAX; i++)11                 scale += min (conf->queue_sizes[i], conf->ac_iot_limit[i]);12 13         if (scale < IOT_MIN_THREADS)14                 scale = IOT_MIN_THREADS;15 16         if (scale > conf->max_count)17                 scale = conf->max_count;18 19         if (conf->curr_count < scale) {20                 diff = scale - conf->curr_count;21         }22 23         while (diff) {24                 diff --;25 26                 ret = pthread_create (&thread, &conf->w_attr, iot_worker, conf);27                 if (ret == 0) {28                         conf->curr_count++;29                         gf_log (conf->this->name, GF_LOG_DEBUG,30                                 "scaled threads to %d (queue_size=%d/%d)",31                                 conf->curr_count, conf->queue_size, scale);32                 } else {33                         break;34                 }35         }36 37         return diff;38 }

就这样,上面的一个while循环创建了所有线程,线程函数为iot_worker。接下来看看每个线程里面都是怎么工作的呢?

 1 void * 2 iot_worker (void *data) 3 { 4         iot_conf_t       *conf = NULL; 5         xlator_t         *this = NULL; 6         call_stub_t      *stub = NULL; 7         struct timespec   sleep_till = {0, }; 8         int               ret = 0; 9         int               pri = -1;10         char              timeout = 0;11         char              bye = 0;12     struct timespec      sleep = {0,};13 14         conf = data;15         this = conf->this;16         THIS = this;17 18         for (;;) {19                 sleep_till.tv_sec = time (NULL) + conf->idle_time;20 21                 pthread_mutex_lock (&conf->mutex);22                 {23                         if (pri != -1) {24                                 conf->ac_iot_count[pri]--;25                                 pri = -1;26                         }27                         while (conf->queue_size == 0) {28                                 conf->sleep_count++;29 30                                 ret = pthread_cond_timedwait (&conf->cond,31                                                               &conf->mutex,32                                                               &sleep_till);33                                 conf->sleep_count--;34 35                                 if (ret == ETIMEDOUT) {36                                         timeout = 1;37                                         break;38                                 }39                         }40 41                         if (timeout) {42                                 if (conf->curr_count > IOT_MIN_THREADS) {43                                         conf->curr_count--;44                                         bye = 1;45                                         gf_log (conf->this->name, GF_LOG_DEBUG,46                                                 "timeout, terminated. 
conf->curr_count=%d",47                                                 conf->curr_count);48                                 } else {49                                         timeout = 0;50                                 }51                         }52 53                         stub = __iot_dequeue (conf, &pri, &sleep);54             if (!stub && (sleep.tv_sec || sleep.tv_nsec)) {55                 pthread_cond_timedwait(&conf->cond,56                                &conf->mutex, &sleep);57                 pthread_mutex_unlock(&conf->mutex);58                 continue;59             }60                 }61                 pthread_mutex_unlock (&conf->mutex);62 63                 if (stub) /* guard against spurious wakeups */64                         call_resume (stub);65 66                 if (bye)67                         break;68         }69 70         if (pri != -1) {71                 pthread_mutex_lock (&conf->mutex);72                 {73                         conf->ac_iot_count[pri]--;74                 }75                 pthread_mutex_unlock (&conf->mutex);76         }77         return NULL;78 }

从上面代码30行可以看出,每个线程都在等待conf->cond条件变量;pthread_cond_timedwait在进入睡眠前会释放conf->mutex互斥锁,被唤醒后再重新持锁,并且采用超时等待的机制。这里还加入了空闲超时后(线程数高于下限时)线程自行退出的机制。

线程通过从调用__iot_dequeue从资源链表中取出一个资源进行处理,call_resume是线程的处理方式。

接下来是资源添加的工作了:

glusterfs中的每个系统调用到了io-thread层都会转换成一个资源,每个资源都将交由线程池来处理。下面是open操作产生资源的处理过程。

 

 1 int 2 iot_open (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, 3           fd_t *fd, dict_t *xdata) 4 { 5         call_stub_t    *stub = NULL; 6         int             ret = -1; 7  8         stub = fop_open_stub (frame, iot_open_wrapper, loc, flags, fd, 9                               xdata);10         if (!stub) {11                 gf_log (this->name, GF_LOG_ERROR,12                         "cannot create open call stub"13                         "(out of memory)");14                 ret = -ENOMEM;15                 goto out;16         }17 18     ret = iot_schedule (frame, this, stub);19 20 out:21         if (ret < 0) {22                 STACK_UNWIND_STRICT (open, frame, -1, -ret, NULL, NULL);23 24                 if (stub != NULL) {25                         call_stub_destroy (stub);26                 }27         }28 29     return 0;30 }

 

先由fop_open_stub产生一个stub资源,通过调用iot_schedule()加入到资源链表中。

 1 int 2 iot_schedule (call_frame_t *frame, xlator_t *this, call_stub_t *stub) 3 { 4         int             ret = -1; 5         iot_pri_t       pri = IOT_PRI_MAX - 1; 6         iot_conf_t      *conf = this->private; 7  8         if ((frame->root->pid < GF_CLIENT_PID_MAX) && conf->least_priority) { 9                 pri = IOT_PRI_LEAST;10                 goto out;11         }12 13         switch (stub->fop) {14         case GF_FOP_OPEN:15         case GF_FOP_STAT:16         case GF_FOP_FSTAT:17         case GF_FOP_LOOKUP:18         case GF_FOP_ACCESS:19         case GF_FOP_READLINK:20         case GF_FOP_OPENDIR:21         case GF_FOP_STATFS:22         case GF_FOP_READDIR:23         case GF_FOP_READDIRP:24                 pri = IOT_PRI_HI;25                 break;26 27         case GF_FOP_CREATE:28         case GF_FOP_FLUSH:29         case GF_FOP_LK:30         case GF_FOP_INODELK:31         case GF_FOP_FINODELK:32         case GF_FOP_ENTRYLK:33         case GF_FOP_FENTRYLK:34         case GF_FOP_UNLINK:35         case GF_FOP_SETATTR:36         case GF_FOP_FSETATTR:37         case GF_FOP_MKNOD:38         case GF_FOP_MKDIR:39         case GF_FOP_RMDIR:40         case GF_FOP_SYMLINK:41         case GF_FOP_RENAME:42         case GF_FOP_LINK:43         case GF_FOP_SETXATTR:44         case GF_FOP_GETXATTR:45         case GF_FOP_FGETXATTR:46         case GF_FOP_FSETXATTR:47         case GF_FOP_REMOVEXATTR:48         case GF_FOP_FREMOVEXATTR:49                 pri = IOT_PRI_NORMAL;50                 break;51 52         case GF_FOP_READ:53         case GF_FOP_WRITE:54         case GF_FOP_FSYNC:55         case GF_FOP_TRUNCATE:56         case GF_FOP_FTRUNCATE:57         case GF_FOP_FSYNCDIR:58         case GF_FOP_XATTROP:59         case GF_FOP_FXATTROP:60         case GF_FOP_RCHECKSUM:61                 pri = IOT_PRI_LO;62                 break;63 64         case GF_FOP_NULL:65         case GF_FOP_FORGET:66         case GF_FOP_RELEASE:67         case GF_FOP_RELEASEDIR:68      
   case GF_FOP_GETSPEC:69         case GF_FOP_MAXVALUE:70                 //fail compilation on missing fop71                 //new fop must choose priority.72                 break;73         }74 out:75         gf_log (this->name, GF_LOG_DEBUG, "%s scheduled as %s fop",76                 gf_fop_list[stub->fop], iot_get_pri_meaning (pri));77         ret = do_iot_schedule (this->private, stub, pri);78         return ret;79 }

上面对不同的操作接口分配给不同级别的资源链表。这里给资源分级是glusterfs业务处理所需要的。

 1 int 2 do_iot_schedule (iot_conf_t *conf, call_stub_t *stub, int pri) 3 { 4         int   ret = 0; 5  6         pthread_mutex_lock (&conf->mutex); 7         { 8                 __iot_enqueue (conf, stub, pri); 9 10                 pthread_cond_signal (&conf->cond);11 12                 ret = __iot_workers_scale (conf);13         }14         pthread_mutex_unlock (&conf->mutex);15 16         return ret;17 }

do_iot_schedule是添加资源的核心操作,它调用__iot_enqueue添加一个资源,通过发送信号通知线程过来取资源。由于pthread_cond_signal一次最多只能唤醒一个线程,所以这里有类似随机取一个线程来处理的性质。由于使用了动态线程池处理的方式,调用__iot_workers_scale函数可以按需动态增加线程池中线程的数量。

 

下面是内核中线程池的处理方式:

这里介绍LINUX SAN存储scst模块中的线程池处理的特点。

编写内核模块时,在创建线程池时,线程池中的线程数量一般是由CPU的核数来决定的。如下所示:

    if (scst_threads == 0)        scst_threads = scst_num_cpus;    if (scst_threads < 1) {        PRINT_ERROR("%s", "scst_threads can not be less than 1");        scst_threads = scst_num_cpus;    }

scst模块中调用scst_start_global_threads来创建线程:

 1 static int scst_start_global_threads(int num) 2 { 3     int res; 4  5     TRACE_ENTRY(); 6  7     mutex_lock(&scst_mutex); 8  9     res = scst_add_threads(&scst_main_cmd_threads, NULL, NULL, num);10     if (res < 0)11         goto out_unlock;12 13     scst_init_cmd_thread = kthread_run(scst_init_thread,14         NULL, "scst_initd");15     if (IS_ERR(scst_init_cmd_thread)) {16         res = PTR_ERR(scst_init_cmd_thread);17         PRINT_ERROR("kthread_create() for init cmd failed: %d", res);18         scst_init_cmd_thread = NULL;19         goto out_unlock;20     }21 22     scst_mgmt_cmd_thread = kthread_run(scst_tm_thread,23         NULL, "scsi_tm");24     if (IS_ERR(scst_mgmt_cmd_thread)) {25         res = PTR_ERR(scst_mgmt_cmd_thread);26         PRINT_ERROR("kthread_create() for TM failed: %d", res);27         scst_mgmt_cmd_thread = NULL;28         goto out_unlock;29     }30 31     scst_mgmt_thread = kthread_run(scst_global_mgmt_thread,32         NULL, "scst_mgmtd");33     if (IS_ERR(scst_mgmt_thread)) {34         res = PTR_ERR(scst_mgmt_thread);35         PRINT_ERROR("kthread_create() for mgmt failed: %d", res);36         scst_mgmt_thread = NULL;37         goto out_unlock;38     }39 40 out_unlock:41     mutex_unlock(&scst_mutex);42 43     TRACE_EXIT_RES(res);44     return res;45 }

第9行是创建线程池处理的地方。

 1 int scst_add_threads(struct scst_cmd_threads *cmd_threads, 2     struct scst_device *dev, struct scst_tgt_dev *tgt_dev, int num) 3 { 4     int res = 0, i; 5     struct scst_cmd_thread_t *thr; 6     int n = 0, tgt_dev_num = 0; 7  8     TRACE_ENTRY(); 9 10     if (num == 0) {11         res = 0;12         goto out;13     }14 15     list_for_each_entry(thr, &cmd_threads->threads_list, thread_list_entry) {16         n++;17     }18 19     TRACE_DBG("cmd_threads %p, dev %p, tgt_dev %p, num %d, n %d",20         cmd_threads, dev, tgt_dev, num, n);21 22     if (tgt_dev != NULL) {23         struct scst_tgt_dev *t;24         list_for_each_entry(t, &tgt_dev->dev->dev_tgt_dev_list,25                 dev_tgt_dev_list_entry) {26             if (t == tgt_dev)27                 break;28             tgt_dev_num++;29         }30     }31 32     for (i = 0; i < num; i++) {33         thr = kmalloc(sizeof(*thr), GFP_KERNEL);34         if (!thr) {35             res = -ENOMEM;36             PRINT_ERROR("Fail to allocate thr %d", res);37             goto out_wait;38         }39 40         if (dev != NULL) {41             char nm[14]; /* to limit the name‘s len */42             strlcpy(nm, dev->virt_name, ARRAY_SIZE(nm));43             thr->cmd_thread = kthread_create(scst_cmd_thread,44                 cmd_threads, "%s%d", nm, n++);45         } else if (tgt_dev != NULL) {46             char nm[11]; /* to limit the name‘s len */47             strlcpy(nm, tgt_dev->dev->virt_name, ARRAY_SIZE(nm));48             thr->cmd_thread = kthread_create(scst_cmd_thread,49                 cmd_threads, "%s%d_%d", nm, tgt_dev_num, n++);50         } else51             thr->cmd_thread = kthread_create(scst_cmd_thread,52                 cmd_threads, "scstd%d", n++);53 54         if (IS_ERR(thr->cmd_thread)) {55             res = PTR_ERR(thr->cmd_thread);56             PRINT_ERROR("kthread_create() failed: %d", res);57             kfree(thr);58             goto out_wait;59         }60 61         
list_add(&thr->thread_list_entry, &cmd_threads->threads_list);62         cmd_threads->nr_threads++;63 64         TRACE_DBG("Added thr %p to threads list (nr_threads %d, n %d)",65             thr, cmd_threads->nr_threads, n);66 67         wake_up_process(thr->cmd_thread);68     }69 70 out_wait:71     if (i > 0 && cmd_threads != &scst_main_cmd_threads) {72         /*73          * Wait for io_context gets initialized to avoid possible races74          * for it from the sharing it tgt_devs.75          */76         while (!*(volatile bool*)&cmd_threads->io_context_ready) {77             TRACE_DBG("Waiting for io_context for cmd_threads %p "78                 "initialized", cmd_threads);79             msleep(50);80         }81     }82 83     if (res != 0)84         scst_del_threads(cmd_threads, i);85 86 out:87     TRACE_EXIT_RES(res);88     return res;89 }

内核中创建线程的方法与用户空间稍有不同,它先调用kthread_create创建一个内核线程数据结构,再调用wake_up_process来唤醒线程。当然,也可以像pthread_create一样直接调用kthread_run来创建并执行线程。线程的执行函数为scst_cmd_thread。

 

 1 int scst_cmd_thread(void *arg) 2 { 3     struct scst_cmd_threads *p_cmd_threads = arg; 4  5     TRACE_ENTRY(); 6  7     PRINT_INFO("Processing thread %s (PID %d) started", current->comm, 8         current->pid); 9 10 #if 011     set_user_nice(current, 10);12 #endif13     current->flags |= PF_NOFREEZE;14 15 #if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 25)16     mutex_lock(&p_cmd_threads->io_context_mutex);17 18     WARN_ON(current->io_context);19 20     if (p_cmd_threads != &scst_main_cmd_threads) {21         /*22          * For linked IO contexts io_context might be not NULL while23          * io_context 0.24          */25         if (p_cmd_threads->io_context == NULL) {26             p_cmd_threads->io_context = get_io_context(GFP_KERNEL, -1);27             TRACE_MGMT_DBG("Alloced new IO context %p "28                 "(p_cmd_threads %p)",29                 p_cmd_threads->io_context,30                 p_cmd_threads);31             /*32              * Put the extra reference created by get_io_context()33              * because we don‘t need it.34              */35             put_io_context(p_cmd_threads->io_context);36         } else {37             current->io_context = ioc_task_link(p_cmd_threads->io_context);38             TRACE_MGMT_DBG("Linked IO context %p "39                 "(p_cmd_threads %p)", p_cmd_threads->io_context,40                 p_cmd_threads);41         }42         p_cmd_threads->io_context_refcnt++;43     }44 45     mutex_unlock(&p_cmd_threads->io_context_mutex);46 #endif47 48     p_cmd_threads->io_context_ready = true;49 50     spin_lock_irq(&p_cmd_threads->cmd_list_lock);51     while (!kthread_should_stop()) {52         wait_queue_t wait;53         init_waitqueue_entry(&wait, current);54 55         if (!test_cmd_threads(p_cmd_threads)) {56             add_wait_queue_exclusive_head(57                 &p_cmd_threads->cmd_list_waitQ,58                 &wait);59             for (;;) {60                 
set_current_state(TASK_INTERRUPTIBLE);61                 if (test_cmd_threads(p_cmd_threads))62                     break;63                 spin_unlock_irq(&p_cmd_threads->cmd_list_lock);64                 schedule();65                 spin_lock_irq(&p_cmd_threads->cmd_list_lock);66             }67             set_current_state(TASK_RUNNING);68             remove_wait_queue(&p_cmd_threads->cmd_list_waitQ, &wait);69         }70 71         if (tm_dbg_is_release()) {72             spin_unlock_irq(&p_cmd_threads->cmd_list_lock);73             tm_dbg_check_released_cmds();74             spin_lock_irq(&p_cmd_threads->cmd_list_lock);75         }76 77         scst_do_job_active(&p_cmd_threads->active_cmd_list,78             &p_cmd_threads->cmd_list_lock, false);79     }80     spin_unlock_irq(&p_cmd_threads->cmd_list_lock);81 82 #if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 25)83     if (p_cmd_threads != &scst_main_cmd_threads) {84         mutex_lock(&p_cmd_threads->io_context_mutex);85         if (--p_cmd_threads->io_context_refcnt == 0)86             p_cmd_threads->io_context = NULL;87         mutex_unlock(&p_cmd_threads->io_context_mutex);88     }89 #endif90 91     PRINT_INFO("Processing thread %s (PID %d) finished", current->comm,92         current->pid);93 94     TRACE_EXIT();95     return 0;96 }

 

内核中要处理的东西比较多,如线程上下文,中断上下文,以及要考虑一些地方不能休眠的问题,代码写起来比较复杂。这里主要看的是50-80行,50行对资源链表加锁,51行检测是否停止线程,52行定义一个wait_queue_t类型的变量wait,这个结构的作用类似于用户空间线程等待的条件变量,53行对它进行了初始化,初始化的工作是与本线程绑定(唤醒时由内核调度器将本线程置为可运行),56-58行把wait加入到资源等待队列中。60行将线程状态设为TASK_INTERRUPTIBLE,63行解锁资源并重新打开中断,64行调用schedule()将CPU让出来,等资源到来被唤醒并再次拿到锁后,才会执行77行的处理函数。

再看一下这个线程是如何被唤醒的呢?

 1         spin_lock(&cmd->cmd_threads->cmd_list_lock); 2         TRACE_MGMT_DBG("Adding cmd %p to active cmd list", cmd); 3         if (unlikely(cmd->queue_type == SCST_CMD_QUEUE_HEAD_OF_QUEUE)) 4             list_add(&cmd->cmd_list_entry, 5                 &cmd->cmd_threads->active_cmd_list); 6         else 7             list_add_tail(&cmd->cmd_list_entry, 8                 &cmd->cmd_threads->active_cmd_list); 9         wake_up(&cmd->cmd_threads->cmd_list_waitQ);10         spin_unlock(&cmd->cmd_threads->cmd_list_lock);

此处只列出了相关的代码片段。第1行对资源链表加锁,第4-8行将资源加入链表,第9行唤醒工作线程去处理,第10行解锁资源链表。