首页 > 代码库 > 线程池的简单实现
线程池的简单实现
几个基本的线程函数:
//线程操纵函数
//创建: int pthread_create(pthread_t *tidp, const pthread_attr_t *attr, (void*)(*start_rtn)(void *), void *arg);
//终止自身 void pthread_exit(void *retval);
//终止其他: int pthread_cancel(pthread_t tid); 发送终止信号后目标线程不一定终止,要调用join函数等待
//阻塞并等待其他线程:int pthread_join(pthread_t tid, void **retval);
//属性
//初始化: int pthread_attr_init(pthread_attr_t *attr);
//设置分离状态: pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate)
//销毁属性: int pthread_attr_destroy(pthread_attr_t *attr);
//同步函数
//互斥锁
//初始化: int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
//销毁锁: int pthread_mutex_destroy(pthread_mutex_t *mutex);
//加锁: pthread_mutex_lock(pthread_mutex_t *mutex);
//尝试加锁: int pthread_mutex_trylock(pthread_mutex_t *mutex);
//解锁: int pthread_mutex_unlock(pthread_mutex_t *mutex);
//条件变量
//初始化:int pthread_cond_init(pthread_cond_t *cv, const pthread_condattr_t *cattr);
//销毁: int pthread_cond_destroy(pthread_cond_t *cond);
//等待条件: pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
//通知: pthread_cond_signal(pthread_cond_t *cond): 唤醒第一个调用pthread_cond_wait()而进入睡眠的线程
//工具函数
//比较线程ID: int pthread_equal(pthread_t t1, pthread_t t2);
//分离线程: pthread_detach(pthread_t tid);
//自身ID: pthread_t pthread_self(void);
1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <pthread.h> //linux环境中多线程的头文件,非C语言标准库,编译时最后要加 -lpthread 调用动态链接库 4 5 //工作链表的结构 6 typedef struct worker { 7 void *(*process) (void *arg); //工作函数 8 void *arg; //函数的参数 9 struct worker *next; 10 }CThread_worker; 11 12 //线程池的结构 13 typedef struct { 14 pthread_mutex_t queue_lock; //互斥锁 15 pthread_cond_t queue_ready; //条件变量/信号量 16 17 CThread_worker *queue_head; //指向工作链表的头结点,临界区 18 int cur_queue_size; //记录链表中工作的数量,临界区 19 20 int max_thread_num; //最大线程数 21 pthread_t *threadid; //线程ID 22 23 int shutdown; //开关 24 }CThread_pool; 25 26 static CThread_pool *pool = NULL; //一个线程池变量 27 int pool_add_worker(void *(*process)(void *arg), void *arg); //负责向工作链表中添加工作 28 void *thread_routine(void *arg); //线程例程 29 30 //线程池初始化 31 void 32 pool_init(int max_thread_num) 33 { 34 int i = 0; 35 36 pool = (CThread_pool *) malloc (sizeof (CThread_pool)); //创建线程池 37 38 pthread_mutex_init(&(pool->queue_lock), NULL); //互斥锁初始化,参数为锁的地址 39 pthread_cond_init( &(pool->queue_ready), NULL); //条件变量初始化,参数为变量地址 40 41 pool->queue_head = NULL; 42 pool->cur_queue_size = 0; 43 44 pool->max_thread_num = max_thread_num; 45 pool->threadid = (pthread_t *) malloc(max_thread_num * sizeof(pthread_t)); 46 for (i = 0; i < max_thread_num; i++) { 47 pthread_create(&(pool->threadid[i]), NULL, thread_routine, NULL); //创建线程, 参数为线程ID变量地址、属性、例程、参数 48 } 49 50 pool->shutdown = 0; 51 } 52 53 //例程,调用具体的工作函数 54 void * 55 thread_routine(void *arg) 56 { 57 printf("starting thread 0x%x\n", (int)pthread_self()); 58 while(1) { 59 pthread_mutex_lock(&(pool->queue_lock)); //从工作链表中取工作,要先加互斥锁,参数为锁地址 60 61 while(pool->cur_queue_size == 0 && !pool->shutdown) { //链表为空 62 printf("thread 0x%x is waiting\n", (int)pthread_self()); 63 pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock)); //等待资源,信号量用于通知。会释放第二个参数的锁,以供添加;函数返回时重新加锁。 64 } 65 66 if(pool->shutdown) { 67 pthread_mutex_unlock(&(pool->queue_lock)); //结束开关开启,释放锁并退出线程 68 printf("thread 0x%x will exit\n", (int)pthread_self()); 69 pthread_exit(NULL); //参数为void * 70 } 71 72 printf("thread 0x%x is starting to work\n", (int)pthread_self()); 73 74 --pool->cur_queue_size; 75 CThread_worker *worker = pool->queue_head; 76 pool->queue_head = worker->next; 77 78 pthread_mutex_unlock (&(pool->queue_lock)); //获取一个工作后释放锁 79 80 81 (*(worker->process))(worker->arg); //做工作 82 free(worker); 83 worker = NULL; 84 } 85 86 pthread_exit(NULL); 87 } 88 89 //销毁线程池 90 int 91 pool_destroy() 92 { 93 if(pool->shutdown) //检测结束开关是否开启,若开启,则所有线程会自动退出 94 return -1; 95 pool->shutdown = 1; 96 97 pthread_cond_broadcast( &(pool->queue_ready) ); //广播,唤醒所有线程,准备退出 98 99 int i;100 for(i = 0; i < pool->max_thread_num; ++i)101 pthread_join(pool->threadid[i], NULL); //主线程等待所有线程退出,只有join第一个参数不是指针,第二个参数类型是void **,接收exit的返回值,需要强制转换102 free(pool->threadid);103 104 CThread_worker *head = NULL;105 while(pool->queue_head != NULL) { //释放未执行的工作链表剩余结点106 head = pool->queue_head;107 pool->queue_head = pool->queue_head->next;108 free(head);109 }110 111 pthread_mutex_destroy(&(pool->queue_lock)); //销毁锁和条件变量112 pthread_cond_destroy(&(pool->queue_ready));113 114 free(pool);115 pool=NULL;116 return 0;117 }118 119 void *120 myprocess(void *arg)121 {122 printf("threadid is 0x%x, working on task %d\n", (int)pthread_self(), *(int*)arg);123 sleep (1);124 return NULL;125 }126 127 //添加工作128 int129 pool_add_worker(void *(*process)(void *arg), void *arg)130 {131 CThread_worker *newworker = (CThread_worker *) malloc(sizeof(CThread_worker));132 newworker->process = process; //具体的工作函数133 newworker->arg = arg;134 newworker->next = NULL;135 136 pthread_mutex_lock( &(pool->queue_lock) ); //加锁137 138 CThread_worker *member = pool->queue_head; //插入链表尾部139 if( member != NULL ) {140 while( member->next != NULL )141 member = member->next;142 member->next = newworker;143 }144 else {145 pool->queue_head = newworker;146 }147 ++pool->cur_queue_size;148 149 pthread_mutex_unlock( &(pool->queue_lock) ); //解锁150 151 pthread_cond_signal( &(pool->queue_ready) ); //通知一个等待的线程152 return 0;153 }154 155 int156 main(int argc, char **argv)157 {158 pool_init(3); //主线程创建线程池,3个线程159 160 int *workingnum = (int *) malloc(sizeof(int) * 10);161 int i;162 for(i = 0; i < 10; ++i) {163 workingnum[i] = i;164 pool_add_worker(myprocess, &workingnum[i]); //主线程负责添加工作,10个工作165 }166 167 sleep (5);168 pool_destroy(); //销毁线程池169 free (workingnum);170 171 return 0;172 }