首页 > 代码库 > 根据相关资料,自己实现的线程池

根据相关资料,自己实现的线程池

  /* Fixed-size pthread worker pool with a FIFO task queue, plus a small demo main(). */
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/types.h>

/* Demo parameters used by main(). */
enum { DEMO_THREAD_COUNT = 5, DEMO_TASK_COUNT = 10, DEMO_WAIT_SECONDS = 12 };

/* One queued task: the callback to run, its argument, and the next task in the list. */
typedef struct CThread_worker
{
    void *(*process)(void *arg);
    void *arg;
    struct CThread_worker *next;
}CThread_worker;

/* Shared pool state. The queue, the task counter and the shutdown flag are
 * only read or written while queue_head is locked. */
typedef struct
{
    pthread_mutex_t queue_head;          /* mutex protecting the fields below */
    pthread_cond_t queue_ready;          /* signalled when a task is queued or shutdown starts */

    struct CThread_worker *queue_worker; /* head of the singly linked list of pending tasks */
    pthread_t *pthread_id;               /* ids of the worker threads that were started */

    int max_task_num;                    /* number of worker threads (despite the name) */
    int cur_task_num;                    /* number of tasks currently queued */
    int shutdown;                        /* non-zero once pool_destroy() has begun */

}CThread_pool;

/* The single, file-wide pool instance; NULL while no pool exists. */
static CThread_pool *pool = NULL;

void pool_init(int num);
void add_task_to_pool(void *(*process)(void *), void *arg);
void *pthread_fun(void *arg);
void pool_destroy(void);
void *my_process(void *task_id);

/*
 * Demo driver: starts the pool, queues DEMO_TASK_COUNT tasks that each print
 * their id, waits for them to run, then tears the pool down.
 */
int main(int argc, char *argv[])
{
    int i = 0;
    int *worker_num = NULL;

    (void)argc;
    (void)argv;

    pool_init(DEMO_THREAD_COUNT);

    /* Each task receives a pointer into this array, so it must stay alive
     * until pool_destroy() has joined every worker. */
    worker_num = malloc(DEMO_TASK_COUNT * sizeof *worker_num);
    if (worker_num == NULL)
    {
        fprintf(stderr, "main: out of memory\n");
        pool_destroy();
        return EXIT_FAILURE;
    }

    for (i = 0; i < DEMO_TASK_COUNT; i++)
    {
        worker_num[i] = i;
        add_task_to_pool(my_process, &worker_num[i]);
    }

    /* pool_destroy() discards tasks that are still queued, so give the
     * workers time to drain the queue first. */
    sleep(DEMO_WAIT_SECONDS);
    pool_destroy();

    free(worker_num);
    worker_num = NULL;

    return 0;
}

/*
 * Creates the global pool and starts up to `num` worker threads.
 *
 * num: requested number of worker threads; negative values are treated as 0.
 *
 * Allocation failure is fatal (the function has no way to report it). If
 * pthread_create() fails part-way, the pool keeps the threads that did start
 * and records only those, so pool_destroy() never joins an unset pthread_t.
 */
void pool_init(int num)
{
    int i = 0;
    int created = 0;

    if (num < 0)
    {
        num = 0;
    }

    pool = malloc(sizeof *pool);
    if (pool == NULL)
    {
        fprintf(stderr, "pool_init: out of memory\n");
        exit(EXIT_FAILURE);
    }

    pthread_mutex_init(&(pool->queue_head), NULL);
    pthread_cond_init(&(pool->queue_ready), NULL);

    pool->queue_worker = NULL;
    pool->max_task_num = 0;
    pool->cur_task_num = 0;
    pool->shutdown = 0;

    pool->pthread_id = calloc((size_t)num, sizeof *pool->pthread_id);
    if (pool->pthread_id == NULL && num > 0)
    {
        fprintf(stderr, "pool_init: out of memory\n");
        exit(EXIT_FAILURE);
    }

    for (i = 0; i < num; ++i)
    {
        int rc = pthread_create(&(pool->pthread_id[created]), NULL, pthread_fun, NULL);
        if (rc != 0)
        {
            /* pthread_create reports failure through its return value, not errno. */
            fprintf(stderr, "pool_init: pthread_create failed (error %d)\n", rc);
            break;
        }
        ++created;
    }

    pool->max_task_num = created;
}

/*
 * Worker thread body: repeatedly takes the task at the head of the queue and
 * runs it, sleeping on the condition variable while the queue is empty.
 * Returns NULL once the shutdown flag is observed; tasks still queued at that
 * point are left for pool_destroy() to free.
 *
 * arg: unused.
 */
void *pthread_fun(void *arg)
{
    CThread_worker *worker = NULL;

    (void)arg;

    printf("pthread %u is starting\n", (unsigned int)pthread_self());

    for (;;)
    {
        pthread_mutex_lock(&(pool->queue_head));
        /* Re-check the predicate in a loop: pthread_cond_wait may wake spuriously. */
        while (pool->cur_task_num == 0 && !pool->shutdown)
        {
            printf("pthread %u is waiting task...\n\n", (unsigned int)(pthread_self()));
            pthread_cond_wait(&(pool->queue_ready), &(pool->queue_head));
        }
        if (pool->shutdown)
        {
            break;
        }

        pool->cur_task_num--;
        worker = pool->queue_worker;
        pool->queue_worker = worker->next;

        pthread_mutex_unlock(&(pool->queue_head));

        /* Run the task without holding the lock so other workers can proceed. */
        (*(worker->process))(worker->arg);

        /* The node was allocated by add_task_to_pool(); the worker that
         * dequeued it owns it and must release it. */
        free(worker);
        worker = NULL;
    }

    /* The lock is still held when the loop exits; release it before leaving
     * so the other workers can observe the shutdown flag too. */
    pthread_mutex_unlock(&(pool->queue_head));
    printf("pthread %u is exiting\n", (unsigned int)pthread_self());
    return NULL;
}

/*
 * Appends a task to the tail of the queue and wakes one waiting worker.
 *
 * process: callback to run on a worker thread.
 * arg:     opaque pointer handed to the callback; the caller keeps ownership
 *          and must keep it valid until the task has run.
 *
 * The task is dropped (with a message on stderr for allocation failure) if
 * no pool exists, `process` is NULL, or the node cannot be allocated.
 */
void add_task_to_pool(void *(*process)(void *), void *arg)
{
    CThread_worker *worker = NULL;
    CThread_worker *p = NULL;

    if (pool == NULL || process == NULL)
    {
        return;
    }

    worker = malloc(sizeof *worker);
    if (worker == NULL)
    {
        fprintf(stderr, "add_task_to_pool: out of memory\n");
        return;
    }
    worker->process = process;
    worker->arg = arg;
    worker->next = NULL;

    pthread_mutex_lock(&(pool->queue_head));

    p = pool->queue_worker;
    if (p == NULL)
    {
        pool->queue_worker = worker;
    }
    else
    {
        while (p->next != NULL)
        {
            p = p->next;
        }
        p->next = worker;
    }

    pool->cur_task_num++;

    pthread_mutex_unlock(&(pool->queue_head));

    pthread_cond_signal(&(pool->queue_ready));
}

/*
 * Stops every worker, frees tasks that never ran, and releases the pool.
 * Safe to call when no pool exists (it does nothing in that case).
 */
void pool_destroy(void)
{
    int i = 0;
    CThread_worker *p = NULL;

    if (pool == NULL)
    {
        return;
    }

    /* Set the flag under the mutex: otherwise a worker could test the flag,
     * miss the broadcast below, and then block in pthread_cond_wait forever. */
    pthread_mutex_lock(&(pool->queue_head));
    pool->shutdown = 1;
    pthread_mutex_unlock(&(pool->queue_head));

    /* Wake every worker waiting on the condition so each can see the flag. */
    pthread_cond_broadcast(&(pool->queue_ready));

    /* Wait for the workers to finish so their resources are reclaimed. */
    for (i = 0; i < pool->max_task_num; ++i)
    {
        pthread_join(pool->pthread_id[i], NULL);
    }
    free(pool->pthread_id);

    /* Release the tasks that were still queued when shutdown began. */
    while (pool->queue_worker != NULL)
    {
        p = pool->queue_worker;
        pool->queue_worker = p->next;
        free(p);
    }

    /* The mutex and the condition variable must be destroyed as well. */
    pthread_mutex_destroy(&(pool->queue_head));
    pthread_cond_destroy(&(pool->queue_ready));

    free(pool);
    /* Reset the global so it does not dangle. */
    pool = NULL;
}

/*
 * Sample task: prints the executing thread and the task id, then sleeps for
 * one second to simulate work.
 *
 * task_id: pointer to an int holding the task number.
 * Returns NULL.
 */
void *my_process(void *task_id)
{
    printf("thread %u is doing task %d\n",(unsigned int)pthread_self(), *(int *)task_id);
    sleep(1);
    return NULL;
}

 

根据相关资料,自己实现的线程池