首页 > 代码库 > 线程池的简单实现

线程池的简单实现

几个基本的线程函数:

//线程操纵函数
//创建:         int pthread_create(pthread_t *tidp, const pthread_attr_t *attr, (void*)(*start_rtn)(void *), void *arg);
//终止自身        void pthread_exit(void *retval);
//终止其他:       int pthread_cancel(pthread_t tid);    发送终止信号后目标线程不一定终止,要调用join函数等待
//阻塞并等待其他线程:int pthread_join(pthread_t tid, void **retval);

//属性
//初始化:   int pthread_attr_init(pthread_attr_t *attr);
//设置分离状态:   pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate)
//销毁属性:     int pthread_attr_destroy(pthread_attr_t *attr);

//同步函数
//互斥锁
//初始化:    int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
//销毁锁:    int pthread_mutex_destroy(pthread_mutex_t *mutex);
//加锁:        pthread_mutex_lock(pthread_mutex_t *mutex);
//尝试加锁: int pthread_mutex_trylock(pthread_mutex_t *mutex);
//解锁:        int pthread_mutex_unlock(pthread_mutex_t *mutex);

//条件变量
//初始化:int pthread_cond_init(pthread_cond_t *cv, const pthread_condattr_t *cattr);
//销毁:   int pthread_cond_destroy(pthread_cond_t *cond);
//等待条件: pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
//通知:        pthread_cond_signal(pthread_cond_t *cond): 唤醒第一个调用pthread_cond_wait()而进入睡眠的线程

//工具函数
//比较线程ID: int pthread_equal(pthread_t t1, pthread_t t2);
//分离线程:     pthread_detach(pthread_t tid);
//自身ID:      pthread_t pthread_self(void);

 

  1 #include <stdio.h>  2 #include <stdlib.h>  3 #include <pthread.h>    //linux环境中多线程的头文件,非C语言标准库,编译时最后要加 -lpthread 调用动态链接库  4   5 //工作链表的结构  6 typedef struct worker {  7     void *(*process) (void *arg);   //工作函数  8     void *arg;                      //函数的参数  9     struct worker *next; 10 }CThread_worker; 11  12 //线程池的结构 13 typedef struct { 14     pthread_mutex_t queue_lock;     //互斥锁 15     pthread_cond_t  queue_ready;    //条件变量/信号量 16  17     CThread_worker *queue_head;     //指向工作链表的头结点,临界区 18     int cur_queue_size;             //记录链表中工作的数量,临界区 19  20     int max_thread_num;             //最大线程数 21     pthread_t *threadid;            //线程ID 22  23     int shutdown;                   //开关 24 }CThread_pool; 25  26 static CThread_pool *pool = NULL;   //一个线程池变量 27 int pool_add_worker(void *(*process)(void *arg), void *arg);    //负责向工作链表中添加工作 28 void *thread_routine(void *arg);    //线程例程 29  30 //线程池初始化 31 void 32 pool_init(int max_thread_num) 33 { 34     int i = 0; 35  36     pool = (CThread_pool *) malloc (sizeof (CThread_pool));    //创建线程池 37  38     pthread_mutex_init(&(pool->queue_lock),  NULL);     //互斥锁初始化,参数为锁的地址 39     pthread_cond_init( &(pool->queue_ready), NULL);     //条件变量初始化,参数为变量地址 40  41     pool->queue_head = NULL; 42     pool->cur_queue_size = 0; 43  44     pool->max_thread_num = max_thread_num; 45     pool->threadid = (pthread_t *) malloc(max_thread_num * sizeof(pthread_t)); 46     for (i = 0; i < max_thread_num; i++) { 47         pthread_create(&(pool->threadid[i]), NULL, thread_routine, NULL);  //创建线程, 参数为线程ID变量地址、属性、例程、参数 48     } 49  50     pool->shutdown = 0; 51 } 52  53 //例程,调用具体的工作函数 54 void * 55 thread_routine(void *arg) 56 { 57     printf("starting thread 0x%x\n", (int)pthread_self()); 58     while(1) { 59         pthread_mutex_lock(&(pool->queue_lock));    //从工作链表中取工作,要先加互斥锁,参数为锁地址 60  61         while(pool->cur_queue_size == 0 && !pool->shutdown) {       //链表为空 62             printf("thread 0x%x is waiting\n", (int)pthread_self()); 63             pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));   //等待资源,信号量用于通知。会释放第二个参数的锁,以供添加;函数返回时重新加锁。 64         } 65  66         if(pool->shutdown) { 67             pthread_mutex_unlock(&(pool->queue_lock));          //结束开关开启,释放锁并退出线程 68             printf("thread 0x%x will exit\n", (int)pthread_self()); 69             pthread_exit(NULL);     //参数为void * 70         } 71  72         printf("thread 0x%x is starting to work\n", (int)pthread_self()); 73  74         --pool->cur_queue_size; 75         CThread_worker *worker = pool->queue_head; 76         pool->queue_head = worker->next; 77  78         pthread_mutex_unlock (&(pool->queue_lock));     //获取一个工作后释放锁 79  80  81         (*(worker->process))(worker->arg);      //做工作 82         free(worker); 83         worker = NULL; 84     } 85  86     pthread_exit(NULL); 87 } 88  89 //销毁线程池 90 int 91 pool_destroy() 92 { 93     if(pool->shutdown)      //检测结束开关是否开启,若开启,则所有线程会自动退出 94         return -1; 95     pool->shutdown = 1; 96  97     pthread_cond_broadcast( &(pool->queue_ready) );     //广播,唤醒所有线程,准备退出 98  99     int i;100     for(i = 0; i < pool->max_thread_num; ++i)101         pthread_join(pool->threadid[i], NULL);      //主线程等待所有线程退出,只有join第一个参数不是指针,第二个参数类型是void **,接收exit的返回值,需要强制转换102     free(pool->threadid);103 104     CThread_worker *head = NULL;105     while(pool->queue_head != NULL) {           //释放未执行的工作链表剩余结点106         head = pool->queue_head;107         pool->queue_head = pool->queue_head->next;108         free(head);109     }110 111     pthread_mutex_destroy(&(pool->queue_lock));     //销毁锁和条件变量112     pthread_cond_destroy(&(pool->queue_ready));113 114     free(pool);115     pool=NULL;116     return 0;117 }118 119 void *120 myprocess(void *arg)121 {122     printf("threadid is 0x%x, working on task %d\n", (int)pthread_self(), *(int*)arg);123     sleep (1);124     return NULL;125 }126 127 //添加工作128 int129 pool_add_worker(void *(*process)(void *arg), void *arg)130 {131     CThread_worker *newworker = (CThread_worker *) malloc(sizeof(CThread_worker));132     newworker->process = process;   //具体的工作函数133     newworker->arg = arg;134     newworker->next = NULL;135 136     pthread_mutex_lock( &(pool->queue_lock) );      //加锁137 138     CThread_worker *member = pool->queue_head;      //插入链表尾部139     if( member != NULL ) {140         while( member->next != NULL )141             member = member->next;142         member->next = newworker;143     }144     else {145         pool->queue_head = newworker;146     }147     ++pool->cur_queue_size;148 149     pthread_mutex_unlock( &(pool->queue_lock) );   //解锁150 151     pthread_cond_signal( &(pool->queue_ready) );   //通知一个等待的线程152     return 0;153 }154 155 int156 main(int argc, char **argv)157 {158     pool_init(3);   //主线程创建线程池,3个线程159 160     int *workingnum = (int *) malloc(sizeof(int) * 10);161     int i;162     for(i = 0; i < 10; ++i) {163         workingnum[i] = i;164         pool_add_worker(myprocess, &workingnum[i]);     //主线程负责添加工作,10个工作165     }166 167     sleep (5);168     pool_destroy();     //销毁线程池169     free (workingnum);170 171     return 0;172 }