The Classic Thread Pool: A Comparison of User-Space and Kernel-Space Implementations
The classic thread pool model is a group of threads competing for a single resource list. The program starts a group of threads and leaves them waiting for a signal (the wait queue, waitQ). It also initializes a resource list; whenever some thread adds a resource to that list, it signals the thread pool at the same time. A thread in the pool that receives the signal takes a resource off the list and processes it.
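To make the model concrete, here is a minimal user-space sketch of it (a sketch only: the names task_t, pool_t, pool_start and pool_submit are invented for this example, and error handling is largely omitted):

#include <pthread.h>
#include <stdlib.h>

/* A queued unit of work ("resource"). */
typedef struct task {
        struct task *next;
        void       (*fn)(void *arg);
        void        *arg;
} task_t;

/* Shared state that all workers compete for. */
typedef struct pool {
        pthread_mutex_t lock;   /* protects the list */
        pthread_cond_t  cond;   /* "work available" signal */
        task_t         *head;   /* the resource list (LIFO for simplicity) */
} pool_t;

/* Worker: sleep on the condition variable until a task shows up. */
static void *worker (void *data)
{
        pool_t *pool = data;

        for (;;) {
                pthread_mutex_lock (&pool->lock);
                while (pool->head == NULL)
                        pthread_cond_wait (&pool->cond, &pool->lock);

                task_t *t = pool->head;        /* take one resource */
                pool->head = t->next;
                pthread_mutex_unlock (&pool->lock);

                t->fn (t->arg);                /* process it outside the lock */
                free (t);
        }
        return NULL;
}

/* Start nthreads workers. */
static void pool_start (pool_t *pool, int nthreads)
{
        pthread_mutex_init (&pool->lock, NULL);
        pthread_cond_init  (&pool->cond, NULL);
        pool->head = NULL;

        for (int i = 0; i < nthreads; i++) {
                pthread_t tid;
                pthread_create (&tid, NULL, worker, pool);
        }
}

/* Producer: add a resource and signal the pool. */
static void pool_submit (pool_t *pool, void (*fn)(void *), void *arg)
{
        task_t *t = malloc (sizeof (*t));
        t->fn  = fn;
        t->arg = arg;

        pthread_mutex_lock (&pool->lock);
        t->next    = pool->head;
        pool->head = t;
        pthread_cond_signal (&pool->cond);     /* wake one sleeping worker */
        pthread_mutex_unlock (&pool->lock);
}

The GlusterFS and SCST code below follow this same basic pattern, adding priorities, dynamic scaling and idle-thread reaping on top of it.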
Let's first look at how a user-space thread pool is created.
int
init (xlator_t *this)
{
        iot_conf_t *conf = NULL;
        int         ret  = -1;
        int         i    = 0;

        if (!this->children || this->children->next) {
                gf_log ("io-threads", GF_LOG_ERROR,
                        "FATAL: iot not configured with exactly one child");
                goto out;
        }

        if (!this->parents) {
                gf_log (this->name, GF_LOG_WARNING,
                        "dangling volume. check volfile ");
        }

        conf = (void *) GF_CALLOC (1, sizeof (*conf),
                                   gf_iot_mt_iot_conf_t);
        if (conf == NULL) {
                gf_log (this->name, GF_LOG_ERROR,
                        "out of memory");
                goto out;
        }

        if ((ret = pthread_cond_init(&conf->cond, NULL)) != 0) {
                gf_log (this->name, GF_LOG_ERROR,
                        "pthread_cond_init failed (%d)", ret);
                goto out;
        }

        if ((ret = pthread_mutex_init(&conf->mutex, NULL)) != 0) {
                gf_log (this->name, GF_LOG_ERROR,
                        "pthread_mutex_init failed (%d)", ret);
                goto out;
        }

        set_stack_size (conf);

        GF_OPTION_INIT ("thread-count", conf->max_count, int32, out);

        GF_OPTION_INIT ("high-prio-threads",
                        conf->ac_iot_limit[IOT_PRI_HI], int32, out);

        GF_OPTION_INIT ("normal-prio-threads",
                        conf->ac_iot_limit[IOT_PRI_NORMAL], int32, out);

        GF_OPTION_INIT ("low-prio-threads",
                        conf->ac_iot_limit[IOT_PRI_LO], int32, out);

        GF_OPTION_INIT ("least-prio-threads",
                        conf->ac_iot_limit[IOT_PRI_LEAST], int32, out);

        GF_OPTION_INIT ("idle-time", conf->idle_time, int32, out);
        GF_OPTION_INIT ("enable-least-priority", conf->least_priority,
                        bool, out);

        GF_OPTION_INIT("least-rate-limit", conf->throttle.rate_limit, int32,
                       out);
        if ((ret = pthread_mutex_init(&conf->throttle.lock, NULL)) != 0) {
                gf_log (this->name, GF_LOG_ERROR,
                        "pthread_mutex_init failed (%d)", ret);
                goto out;
        }

        conf->this = this;

        for (i = 0; i < IOT_PRI_MAX; i++) {
                INIT_LIST_HEAD (&conf->reqs[i]);
        }

        ret = iot_workers_scale (conf);

        if (ret == -1) {
                gf_log (this->name, GF_LOG_ERROR,
                        "cannot initialize worker threads, exiting init");
                goto out;
        }

        this->private = conf;
        ret = 0;
out:
        if (ret)
                GF_FREE (conf);

        return ret;
}
This is the initialization part of the io-threads xlator in glusterfs. It sets up the worker threads' stack size via set_stack_size and reads the options that control how many threads to start. The threads themselves are started by calling iot_workers_scale.
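set_stack_size itself is not shown here; presumably it fills in the pthread attribute (conf->w_attr) that is later passed to pthread_create. A generic sketch of that kind of helper (the helper name and the size value are only illustrative, not GlusterFS's actual defaults):

#include <pthread.h>

/* Sketch: configure a custom stack size on the attribute used for workers. */
static int set_worker_stack_size (pthread_attr_t *attr, size_t stack_size)
{
        int ret = pthread_attr_init (attr);
        if (ret != 0)
                return ret;

        return pthread_attr_setstacksize (attr, stack_size);  /* e.g. 1024 * 1024 */
}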
int
iot_workers_scale (iot_conf_t *conf)
{
        int     ret = -1;

        if (conf == NULL) {
                ret = -EINVAL;
                goto out;
        }

        pthread_mutex_lock (&conf->mutex);
        {
                ret = __iot_workers_scale (conf);
        }
        pthread_mutex_unlock (&conf->mutex);

out:
        return ret;
}
iot_workers_scale first takes the lock and then calls __iot_workers_scale to create the threads.
int
__iot_workers_scale (iot_conf_t *conf)
{
        int       scale = 0;
        int       diff = 0;
        pthread_t thread;
        int       ret = 0;
        int       i = 0;

        for (i = 0; i < IOT_PRI_MAX; i++)
                scale += min (conf->queue_sizes[i], conf->ac_iot_limit[i]);

        if (scale < IOT_MIN_THREADS)
                scale = IOT_MIN_THREADS;

        if (scale > conf->max_count)
                scale = conf->max_count;

        if (conf->curr_count < scale) {
                diff = scale - conf->curr_count;
        }

        while (diff) {
                diff --;

                ret = pthread_create (&thread, &conf->w_attr, iot_worker, conf);
                if (ret == 0) {
                        conf->curr_count++;
                        gf_log (conf->this->name, GF_LOG_DEBUG,
                                "scaled threads to %d (queue_size=%d/%d)",
                                conf->curr_count, conf->queue_size, scale);
                } else {
                        break;
                }
        }

        return diff;
}
With that, the while loop above creates all of the worker threads, using iot_worker as the thread function. Next, let's see how each thread does its work.
 1 void *
 2 iot_worker (void *data)
 3 {
 4         iot_conf_t       *conf = NULL;
 5         xlator_t         *this = NULL;
 6         call_stub_t      *stub = NULL;
 7         struct timespec   sleep_till = {0, };
 8         int               ret = 0;
 9         int               pri = -1;
10         char              timeout = 0;
11         char              bye = 0;
12         struct timespec   sleep = {0,};
13
14         conf = data;
15         this = conf->this;
16         THIS = this;
17
18         for (;;) {
19                 sleep_till.tv_sec = time (NULL) + conf->idle_time;
20
21                 pthread_mutex_lock (&conf->mutex);
22                 {
23                         if (pri != -1) {
24                                 conf->ac_iot_count[pri]--;
25                                 pri = -1;
26                         }
27                         while (conf->queue_size == 0) {
28                                 conf->sleep_count++;
29
30                                 ret = pthread_cond_timedwait (&conf->cond,
31                                                               &conf->mutex,
32                                                               &sleep_till);
33                                 conf->sleep_count--;
34
35                                 if (ret == ETIMEDOUT) {
36                                         timeout = 1;
37                                         break;
38                                 }
39                         }
40
41                         if (timeout) {
42                                 if (conf->curr_count > IOT_MIN_THREADS) {
43                                         conf->curr_count--;
44                                         bye = 1;
45                                         gf_log (conf->this->name, GF_LOG_DEBUG,
46                                                 "timeout, terminated. conf->curr_count=%d",
47                                                 conf->curr_count);
48                                 } else {
49                                         timeout = 0;
50                                 }
51                         }
52
53                         stub = __iot_dequeue (conf, &pri, &sleep);
54                         if (!stub && (sleep.tv_sec || sleep.tv_nsec)) {
55                                 pthread_cond_timedwait(&conf->cond,
56                                                        &conf->mutex, &sleep);
57                                 pthread_mutex_unlock(&conf->mutex);
58                                 continue;
59                         }
60                 }
61                 pthread_mutex_unlock (&conf->mutex);
62
63                 if (stub) /* guard against spurious wakeups */
64                         call_resume (stub);
65
66                 if (bye)
67                         break;
68         }
69
70         if (pri != -1) {
71                 pthread_mutex_lock (&conf->mutex);
72                 {
73                         conf->ac_iot_count[pri]--;
74                 }
75                 pthread_mutex_unlock (&conf->mutex);
76         }
77         return NULL;
78 }
As line 30 of the code above shows, each worker sleeps waiting on the conf->cond condition variable; pthread_cond_timedwait releases conf->mutex while the thread sleeps and re-acquires it on wakeup, and it waits only until an absolute deadline rather than forever. On top of this, the code adds an idle-exit mechanism: if the wait times out and there is still no work, the thread terminates itself, provided the pool stays above IOT_MIN_THREADS.
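That timed wait reduces to the following generic pattern (a sketch, not the GlusterFS code itself; lock, cond, queue_len and idle_secs stand in for conf->mutex, conf->cond, conf->queue_size and conf->idle_time). pthread_cond_timedwait takes an absolute deadline, and a return value of ETIMEDOUT with a still-empty queue is the cue for an idle worker to consider exiting:

#include <errno.h>
#include <pthread.h>
#include <stdbool.h>
#include <time.h>

/* Wait up to idle_secs for work, then report whether the caller should exit. */
static bool wait_for_work_or_idle_exit (pthread_mutex_t *lock,
                                        pthread_cond_t  *cond,
                                        int             *queue_len,
                                        int              idle_secs)
{
        struct timespec deadline = { .tv_sec = time (NULL) + idle_secs };
        bool timed_out = false;

        pthread_mutex_lock (lock);
        while (*queue_len == 0) {
                /* Releases the mutex while sleeping, re-takes it on wakeup. */
                int ret = pthread_cond_timedwait (cond, lock, &deadline);
                if (ret == ETIMEDOUT) {
                        timed_out = true;
                        break;
                }
        }
        pthread_mutex_unlock (lock);

        return timed_out;   /* true: no work arrived within idle_secs */
}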
A worker takes a resource off the list by calling __iot_dequeue and then processes it; call_resume is how the dequeued call stub is actually executed.
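__iot_dequeue itself is not quoted in this article. Conceptually it scans the per-priority request lists from highest to lowest priority and pops the first entry it finds; the real function also enforces the per-priority thread limits (ac_iot_limit/ac_iot_count). A much-simplified sketch of that idea (the names below are invented for the example, and the caller is assumed to hold the pool lock):

enum { PRI_HI = 0, PRI_NORMAL, PRI_LO, PRI_LEAST, NPRI };

typedef struct req {
        struct req *next;
} req_t;

/* Pick the front entry of the highest-priority non-empty list. */
static req_t *dequeue_by_priority (req_t *heads[NPRI], int *out_pri)
{
        for (int pri = 0; pri < NPRI; pri++) {
                if (heads[pri] != NULL) {
                        req_t *r = heads[pri];
                        heads[pri] = r->next;   /* pop the front entry */
                        *out_pri = pri;
                        return r;
                }
        }
        *out_pri = -1;
        return NULL;                            /* every list is empty */
}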
Next comes the work of adding resources:
In glusterfs, every system call that reaches the io-threads layer is turned into a resource, and every resource is handed to the thread pool for processing. Below is how an open operation produces and schedules such a resource.
int
iot_open (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
          fd_t *fd, dict_t *xdata)
{
        call_stub_t     *stub = NULL;
        int              ret = -1;

        stub = fop_open_stub (frame, iot_open_wrapper, loc, flags, fd,
                              xdata);
        if (!stub) {
                gf_log (this->name, GF_LOG_ERROR,
                        "cannot create open call stub"
                        "(out of memory)");
                ret = -ENOMEM;
                goto out;
        }

        ret = iot_schedule (frame, this, stub);

out:
        if (ret < 0) {
                STACK_UNWIND_STRICT (open, frame, -1, -ret, NULL, NULL);

                if (stub != NULL) {
                        call_stub_destroy (stub);
                }
        }

        return 0;
}
fop_open_stub first produces a stub resource, which is then added to the resource list by calling iot_schedule().
int
iot_schedule (call_frame_t *frame, xlator_t *this, call_stub_t *stub)
{
        int              ret = -1;
        iot_pri_t        pri = IOT_PRI_MAX - 1;
        iot_conf_t      *conf = this->private;

        if ((frame->root->pid < GF_CLIENT_PID_MAX) && conf->least_priority) {
                pri = IOT_PRI_LEAST;
                goto out;
        }

        switch (stub->fop) {
        case GF_FOP_OPEN:
        case GF_FOP_STAT:
        case GF_FOP_FSTAT:
        case GF_FOP_LOOKUP:
        case GF_FOP_ACCESS:
        case GF_FOP_READLINK:
        case GF_FOP_OPENDIR:
        case GF_FOP_STATFS:
        case GF_FOP_READDIR:
        case GF_FOP_READDIRP:
                pri = IOT_PRI_HI;
                break;

        case GF_FOP_CREATE:
        case GF_FOP_FLUSH:
        case GF_FOP_LK:
        case GF_FOP_INODELK:
        case GF_FOP_FINODELK:
        case GF_FOP_ENTRYLK:
        case GF_FOP_FENTRYLK:
        case GF_FOP_UNLINK:
        case GF_FOP_SETATTR:
        case GF_FOP_FSETATTR:
        case GF_FOP_MKNOD:
        case GF_FOP_MKDIR:
        case GF_FOP_RMDIR:
        case GF_FOP_SYMLINK:
        case GF_FOP_RENAME:
        case GF_FOP_LINK:
        case GF_FOP_SETXATTR:
        case GF_FOP_GETXATTR:
        case GF_FOP_FGETXATTR:
        case GF_FOP_FSETXATTR:
        case GF_FOP_REMOVEXATTR:
        case GF_FOP_FREMOVEXATTR:
                pri = IOT_PRI_NORMAL;
                break;

        case GF_FOP_READ:
        case GF_FOP_WRITE:
        case GF_FOP_FSYNC:
        case GF_FOP_TRUNCATE:
        case GF_FOP_FTRUNCATE:
        case GF_FOP_FSYNCDIR:
        case GF_FOP_XATTROP:
        case GF_FOP_FXATTROP:
        case GF_FOP_RCHECKSUM:
                pri = IOT_PRI_LO;
                break;

        case GF_FOP_NULL:
        case GF_FOP_FORGET:
        case GF_FOP_RELEASE:
        case GF_FOP_RELEASEDIR:
        case GF_FOP_GETSPEC:
        case GF_FOP_MAXVALUE:
                //fail compilation on missing fop
                //new fop must choose priority.
                break;
        }
out:
        gf_log (this->name, GF_LOG_DEBUG, "%s scheduled as %s fop",
                gf_fop_list[stub->fop], iot_get_pri_meaning (pri));
        ret = do_iot_schedule (this->private, stub, pri);
        return ret;
}
The code above assigns the different operation types to resource lists of different priority levels; this kind of prioritization is something glusterfs's own workload requires.
int
do_iot_schedule (iot_conf_t *conf, call_stub_t *stub, int pri)
{
        int   ret = 0;

        pthread_mutex_lock (&conf->mutex);
        {
                __iot_enqueue (conf, stub, pri);

                pthread_cond_signal (&conf->cond);

                ret = __iot_workers_scale (conf);
        }
        pthread_mutex_unlock (&conf->mutex);

        return ret;
}
do_iot_schedule is the core of adding a resource: it calls __iot_enqueue to put the resource on the list and then signals the condition variable to notify the workers. Since pthread_cond_signal wakes up (at least) one waiting thread rather than all of them, which worker picks up the new resource is effectively arbitrary. And because this is a dynamic thread pool, the call to __iot_workers_scale can grow the number of worker threads on demand.
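If a producer ever queued several resources in one go, it could either signal once per item or wake every sleeper with pthread_cond_broadcast. A sketch of the batch variant, reusing the invented pool_t/task_t from the first sketch above:

/* Sketch: splice a whole batch of tasks in and wake every sleeping worker. */
static void pool_submit_batch (pool_t *pool, task_t *batch)
{
        task_t *tail = batch;
        while (tail->next != NULL)
                tail = tail->next;

        pthread_mutex_lock (&pool->lock);
        tail->next = pool->head;               /* splice the batch onto the list */
        pool->head = batch;
        pthread_cond_broadcast (&pool->cond);  /* wake all waiters, not just one */
        pthread_mutex_unlock (&pool->lock);
}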
Next, how a thread pool is handled inside the kernel:
Here we look at the characteristics of the thread-pool handling in the Linux SAN storage module scst.
When writing a kernel module, the number of threads in the pool is usually derived from the number of CPU cores, as shown below:
if (scst_threads == 0)
        scst_threads = scst_num_cpus;

if (scst_threads < 1) {
        PRINT_ERROR("%s", "scst_threads can not be less than 1");
        scst_threads = scst_num_cpus;
}
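scst_num_cpus is presumably derived from the number of online CPUs; in a kernel module that kind of default is typically obtained with num_online_cpus(), roughly as in this sketch (the module-parameter setup and helper name here are illustrative, not SCST's exact code):

#include <linux/cpumask.h>
#include <linux/moduleparam.h>

static int scst_threads;                /* 0 means "pick a default" */
module_param(scst_threads, int, 0444);

static int scst_num_cpus;

static void pick_default_thread_count(void)
{
        scst_num_cpus = num_online_cpus();

        if (scst_threads == 0)
                scst_threads = scst_num_cpus;
}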
The scst module calls scst_start_global_threads to create its threads:
 1 static int scst_start_global_threads(int num)
 2 {
 3         int res;
 4
 5         TRACE_ENTRY();
 6
 7         mutex_lock(&scst_mutex);
 8
 9         res = scst_add_threads(&scst_main_cmd_threads, NULL, NULL, num);
10         if (res < 0)
11                 goto out_unlock;
12
13         scst_init_cmd_thread = kthread_run(scst_init_thread,
14                 NULL, "scst_initd");
15         if (IS_ERR(scst_init_cmd_thread)) {
16                 res = PTR_ERR(scst_init_cmd_thread);
17                 PRINT_ERROR("kthread_create() for init cmd failed: %d", res);
18                 scst_init_cmd_thread = NULL;
19                 goto out_unlock;
20         }
21
22         scst_mgmt_cmd_thread = kthread_run(scst_tm_thread,
23                 NULL, "scsi_tm");
24         if (IS_ERR(scst_mgmt_cmd_thread)) {
25                 res = PTR_ERR(scst_mgmt_cmd_thread);
26                 PRINT_ERROR("kthread_create() for TM failed: %d", res);
27                 scst_mgmt_cmd_thread = NULL;
28                 goto out_unlock;
29         }
30
31         scst_mgmt_thread = kthread_run(scst_global_mgmt_thread,
32                 NULL, "scst_mgmtd");
33         if (IS_ERR(scst_mgmt_thread)) {
34                 res = PTR_ERR(scst_mgmt_thread);
35                 PRINT_ERROR("kthread_create() for mgmt failed: %d", res);
36                 scst_mgmt_thread = NULL;
37                 goto out_unlock;
38         }
39
40 out_unlock:
41         mutex_unlock(&scst_mutex);
42
43         TRACE_EXIT_RES(res);
44         return res;
45 }
Line 9 is where the worker thread pool is created.
int scst_add_threads(struct scst_cmd_threads *cmd_threads,
        struct scst_device *dev, struct scst_tgt_dev *tgt_dev, int num)
{
        int res = 0, i;
        struct scst_cmd_thread_t *thr;
        int n = 0, tgt_dev_num = 0;

        TRACE_ENTRY();

        if (num == 0) {
                res = 0;
                goto out;
        }

        list_for_each_entry(thr, &cmd_threads->threads_list, thread_list_entry) {
                n++;
        }

        TRACE_DBG("cmd_threads %p, dev %p, tgt_dev %p, num %d, n %d",
                cmd_threads, dev, tgt_dev, num, n);

        if (tgt_dev != NULL) {
                struct scst_tgt_dev *t;
                list_for_each_entry(t, &tgt_dev->dev->dev_tgt_dev_list,
                                dev_tgt_dev_list_entry) {
                        if (t == tgt_dev)
                                break;
                        tgt_dev_num++;
                }
        }

        for (i = 0; i < num; i++) {
                thr = kmalloc(sizeof(*thr), GFP_KERNEL);
                if (!thr) {
                        res = -ENOMEM;
                        PRINT_ERROR("Fail to allocate thr %d", res);
                        goto out_wait;
                }

                if (dev != NULL) {
                        char nm[14]; /* to limit the name's len */
                        strlcpy(nm, dev->virt_name, ARRAY_SIZE(nm));
                        thr->cmd_thread = kthread_create(scst_cmd_thread,
                                cmd_threads, "%s%d", nm, n++);
                } else if (tgt_dev != NULL) {
                        char nm[11]; /* to limit the name's len */
                        strlcpy(nm, tgt_dev->dev->virt_name, ARRAY_SIZE(nm));
                        thr->cmd_thread = kthread_create(scst_cmd_thread,
                                cmd_threads, "%s%d_%d", nm, tgt_dev_num, n++);
                } else
                        thr->cmd_thread = kthread_create(scst_cmd_thread,
                                cmd_threads, "scstd%d", n++);

                if (IS_ERR(thr->cmd_thread)) {
                        res = PTR_ERR(thr->cmd_thread);
                        PRINT_ERROR("kthread_create() failed: %d", res);
                        kfree(thr);
                        goto out_wait;
                }

                list_add(&thr->thread_list_entry, &cmd_threads->threads_list);
                cmd_threads->nr_threads++;

                TRACE_DBG("Added thr %p to threads list (nr_threads %d, n %d)",
                        thr, cmd_threads->nr_threads, n);

                wake_up_process(thr->cmd_thread);
        }

out_wait:
        if (i > 0 && cmd_threads != &scst_main_cmd_threads) {
                /*
                 * Wait for io_context gets initialized to avoid possible races
                 * for it from the sharing it tgt_devs.
                 */
                while (!*(volatile bool*)&cmd_threads->io_context_ready) {
                        TRACE_DBG("Waiting for io_context for cmd_threads %p "
                                "initialized", cmd_threads);
                        msleep(50);
                }
        }

        if (res != 0)
                scst_del_threads(cmd_threads, i);

out:
        TRACE_EXIT_RES(res);
        return res;
}
Creating threads in the kernel is slightly different from user space: the code first calls kthread_create to set up the kernel thread's data structure, then calls wake_up_process to actually start the thread running. Of course, it could also call kthread_run, which, much like pthread_create, creates and starts the thread in one step. The thread function here is scst_cmd_thread, listed after the short sketch below.
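A minimal generic illustration of the two styles (my_worker and the helper names are invented for this sketch; the worker body is a placeholder):

#include <linux/delay.h>
#include <linux/err.h>
#include <linux/kthread.h>

static int my_worker(void *data)
{
        while (!kthread_should_stop()) {
                /* ... wait for and process work; see the wait-queue sketch below ... */
                msleep(100);
        }
        return 0;
}

/* Style 1: create the thread in a stopped state, then wake it explicitly. */
static struct task_struct *start_worker_two_step(void *data)
{
        struct task_struct *tsk = kthread_create(my_worker, data, "my_worker");

        if (!IS_ERR(tsk))
                wake_up_process(tsk);   /* the thread starts running here */
        return tsk;
}

/* Style 2: kthread_run() == kthread_create() + wake_up_process(). */
static struct task_struct *start_worker_one_step(void *data)
{
        return kthread_run(my_worker, data, "my_worker");
}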
 1 int scst_cmd_thread(void *arg)
 2 {
 3         struct scst_cmd_threads *p_cmd_threads = arg;
 4
 5         TRACE_ENTRY();
 6
 7         PRINT_INFO("Processing thread %s (PID %d) started", current->comm,
 8                 current->pid);
 9
10 #if 0
11         set_user_nice(current, 10);
12 #endif
13         current->flags |= PF_NOFREEZE;
14
15 #if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 25)
16         mutex_lock(&p_cmd_threads->io_context_mutex);
17
18         WARN_ON(current->io_context);
19
20         if (p_cmd_threads != &scst_main_cmd_threads) {
21                 /*
22                  * For linked IO contexts io_context might be not NULL while
23                  * io_context 0.
24                  */
25                 if (p_cmd_threads->io_context == NULL) {
26                         p_cmd_threads->io_context = get_io_context(GFP_KERNEL, -1);
27                         TRACE_MGMT_DBG("Alloced new IO context %p "
28                                 "(p_cmd_threads %p)",
29                                 p_cmd_threads->io_context,
30                                 p_cmd_threads);
31                         /*
32                          * Put the extra reference created by get_io_context()
33                          * because we don't need it.
34                          */
35                         put_io_context(p_cmd_threads->io_context);
36                 } else {
37                         current->io_context = ioc_task_link(p_cmd_threads->io_context);
38                         TRACE_MGMT_DBG("Linked IO context %p "
39                                 "(p_cmd_threads %p)", p_cmd_threads->io_context,
40                                 p_cmd_threads);
41                 }
42                 p_cmd_threads->io_context_refcnt++;
43         }
44
45         mutex_unlock(&p_cmd_threads->io_context_mutex);
46 #endif
47
48         p_cmd_threads->io_context_ready = true;
49
50         spin_lock_irq(&p_cmd_threads->cmd_list_lock);
51         while (!kthread_should_stop()) {
52                 wait_queue_t wait;
53                 init_waitqueue_entry(&wait, current);
54
55                 if (!test_cmd_threads(p_cmd_threads)) {
56                         add_wait_queue_exclusive_head(
57                                 &p_cmd_threads->cmd_list_waitQ,
58                                 &wait);
59                         for (;;) {
60                                 set_current_state(TASK_INTERRUPTIBLE);
61                                 if (test_cmd_threads(p_cmd_threads))
62                                         break;
63                                 spin_unlock_irq(&p_cmd_threads->cmd_list_lock);
64                                 schedule();
65                                 spin_lock_irq(&p_cmd_threads->cmd_list_lock);
66                         }
67                         set_current_state(TASK_RUNNING);
68                         remove_wait_queue(&p_cmd_threads->cmd_list_waitQ, &wait);
69                 }
70
71                 if (tm_dbg_is_release()) {
72                         spin_unlock_irq(&p_cmd_threads->cmd_list_lock);
73                         tm_dbg_check_released_cmds();
74                         spin_lock_irq(&p_cmd_threads->cmd_list_lock);
75                 }
76
77                 scst_do_job_active(&p_cmd_threads->active_cmd_list,
78                         &p_cmd_threads->cmd_list_lock, false);
79         }
80         spin_unlock_irq(&p_cmd_threads->cmd_list_lock);
81
82 #if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 25)
83         if (p_cmd_threads != &scst_main_cmd_threads) {
84                 mutex_lock(&p_cmd_threads->io_context_mutex);
85                 if (--p_cmd_threads->io_context_refcnt == 0)
86                         p_cmd_threads->io_context = NULL;
87                 mutex_unlock(&p_cmd_threads->io_context_mutex);
88         }
89 #endif
90
91         PRINT_INFO("Processing thread %s (PID %d) finished", current->comm,
92                 current->pid);
93
94         TRACE_EXIT();
95         return 0;
96 }
There is more to worry about in the kernel, such as process context versus interrupt context and places where sleeping is not allowed, so the code is more involved. The interesting part is lines 50-80. Line 50 locks the resource list, and line 51 checks whether the thread has been asked to stop. Line 52 defines a wait_queue_t entry named wait, which plays roughly the role a condition-variable wait plays in user space; line 53 initializes it, binding it to the current thread and registering the default wake function that the scheduler uses to make this thread runnable again. Lines 56-58 add the entry to the wait queue on which new resources are announced. Line 60 marks the thread as TASK_INTERRUPTIBLE (ready to sleep), line 63 drops the spinlock (re-enabling interrupts), and line 64 calls schedule() to give up the CPU; the thread stays asleep until a new resource arrives and it is woken through the wait queue, after which it processes the work via the call at line 77.
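Stripped of the SCST specifics, lines 50-80 are the classic manual wait-queue pattern. The SCST code adds its entry with add_wait_queue_exclusive_head() so that wake_up() rouses only one worker; the generic sketch below (invented names work_list/work_lock/work_waitq, using the simpler non-exclusive prepare_to_wait()) shows the same check-sleep-recheck structure:

#include <linux/kthread.h>
#include <linux/list.h>
#include <linux/sched.h>
#include <linux/spinlock.h>
#include <linux/wait.h>

static LIST_HEAD(work_list);
static DEFINE_SPINLOCK(work_lock);
static DECLARE_WAIT_QUEUE_HEAD(work_waitq);

static int worker_thread(void *arg)
{
        spin_lock_irq(&work_lock);
        while (!kthread_should_stop()) {
                DEFINE_WAIT(wait);

                /* Nothing queued: register on the wait queue and sleep. */
                while (list_empty(&work_list) && !kthread_should_stop()) {
                        prepare_to_wait(&work_waitq, &wait, TASK_INTERRUPTIBLE);
                        spin_unlock_irq(&work_lock);
                        schedule();                     /* give up the CPU */
                        spin_lock_irq(&work_lock);
                }
                finish_wait(&work_waitq, &wait);

                /* Drain the list; real code would process each entry
                 * with the lock dropped. */
                while (!list_empty(&work_list)) {
                        struct list_head *entry = work_list.next;
                        list_del(entry);
                        /* ... handle the entry ... */
                }
        }
        spin_unlock_irq(&work_lock);
        return 0;
}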
Now let's see how such a thread gets woken up.
 1 spin_lock(&cmd->cmd_threads->cmd_list_lock);
 2 TRACE_MGMT_DBG("Adding cmd %p to active cmd list", cmd);
 3 if (unlikely(cmd->queue_type == SCST_CMD_QUEUE_HEAD_OF_QUEUE))
 4         list_add(&cmd->cmd_list_entry,
 5                  &cmd->cmd_threads->active_cmd_list);
 6 else
 7         list_add_tail(&cmd->cmd_list_entry,
 8                       &cmd->cmd_threads->active_cmd_list);
 9 wake_up(&cmd->cmd_threads->cmd_list_waitQ);
10 spin_unlock(&cmd->cmd_threads->cmd_list_lock);
Only the relevant snippet is shown here. Line 1 locks the resource list, lines 4-8 add the resource to the list (at the head for head-of-queue commands, otherwise at the tail), line 9 wakes a worker thread to process it, and line 10 releases the lock.