首页 > 代码库 > 简单易用的线程池实现

简单易用的线程池实现

0 前言

最近在写MySQL冷备server的一个模块,稍微接触到一点线程池的东西,自己也就想尝试写一个简单的线程池练练手。

这个线程池在创建时,即按照最大的线程数生成线程。

然后作业任务通过add_task接口往线程池中加入需要运行的任务,再调用线程池的run函数开始运行所有任务,每个线程从任务队列中读取任务,处理完一个任务后再读取新的任务,直到最终任务队列为空。

1 线程池设计

简单描述如下(假设任务类名为CTasklet):

1、CThreadPool<CTasklet> thread_pool(MAX_THREAD_NUM);

2、创建任务,并把任务加入到线程池

    CTasklet *pTask1 = new CTasklet();

    CTasklet *pTask2 = new CTasklet();

    ...

    thread_pool.add_task(pTask1);

    thread_pool.add_task(pTask2);

    ...

3、调用线程池的run方法开始执行任务

    thread_pool.run();

4、等待任务执行完成

    thread_pool.join_thread();

2 源码

下面给出完整的线程池代码

 1 /* 2  * file: thread_pool.h 3  * desc: 简单的线程池,一次性初始化任务队列和线程池。 4  *  5  */ 6  7 #ifndef _THREAD_POOL_H_ 8 #define _THREAD_POOL_H_ 9 10 #include <pthread.h>11 #include <vector>12 13 using namespace std;14 15 template<typename workType>16 class CThreadPool17 {18     public:19         typedef void * (thread_func)(void *);20 21         CThreadPool(int thread_num, size_t stack_size = 10485760);22         ~CThreadPool();23 24         // 向任务队列中添加任务25         int add_task(workType *pTask);26 27         // 创建新线程并执行28         int run();29 30         // 等待所有的线程执行结束31         int join_thread();32 33     private:34         int init_thread_attr();35         int destroy_thread_attr();36 37         int set_thread_stacksize(size_t stack_size);38         int set_thread_joinable();39 40     protected:41         // 线程池执行函数,必须为static42         static void start_routine(void *para);43 44     private:45         pthread_attr_t attr_;46         static pthread_mutex_t mutex_lock_;47         static list<workType *> list_task_;48 49         int thread_num_; // 最大线程数50         vector<pthread_t> thread_id_vec_;51 };52 #endif
View Code

 

  1 #include "pthread_pool.h"  2   3 template<typename workType>  4 pthread_mutex_t CThreadPool<workType>::mutex_lock_;  5   6 template<typename workType>  7 list<workType*> CThreadPool<workType*>::list_task_;  8   9 template<typename workType> 10 CThreadPool<workType>::CThreadPool(int thread_num, size_t stack_size) 11 { 12     thread_num_ = thread_num; 13     pthread_mutex_init(&mutex_lock_, NULL); 14  15     init_thread_attr(); 16     set_thread_stacksize(stack_size); 17     set_thread_joinable(); 18 } 19  20 template<typename workType> 21 CThreadPool<workType>::~CthreadPool() 22 { 23     destroy_thread_attr(); 24 } 25  26 template <typename workType> 27 int init_thread_attr() 28 { 29     return pthread_attr_init(&m_attr);  30 } 31  32 template <typename workType> 33 int CThreadPool<workType>::destroy_thread_attr() 34 { 35     return pthread_attr_destroy(&attr_); 36 } 37  38 template <typename workType> 39 int CThreadPool<workType>::set_thread_stacksize(size_t stack_size) 40 { 41     return pthread_attr_setstacksize(&attr_, stack_size); 42 } 43  44 template <typename workType> 45 int CThreadPool<workType>::set_thread_joinable() 46 { 47     return pthread_attr_setdetachstate(&attr_, PTHREAD_CREATE_JOINABLE); 48 } 49  50 template <typename workType> 51 void CThreadPool<workType>::start_routine(void *para) 52 { 53     workType *pWorkType = NULL; 54  55     while (1) { 56         pthread_mutex_lock(&mutex_lock_); 57  58         if (list_task_.empty()) { 59             pthread_mutex_unlock(mutex_lock_); 60             return; 61         } 62  63         pWorkType = *(list_task_.begin()); 64         list_task_.pop_front(); 65         pthread_mutex_unlock(&mutex_lock_); 66  67         pWorkType->run(); 68         delete pWorkType; 69         pWorkType = NULL; 70     } 71 } 72  73 template <typename workType> 74 int CThreadPool<workType>::add_task(workType *pTask) 75 { 76     pthread_mutex_lock(&mutex_lock_); 77     list_task_.push_back(pTask); 78     pthread_mutex_unlock(&mutex_lock_); 79     return 0; 80 } 81  82 template <typename workType> 83 int CThreadWork<workType>::run() 84 { 85     int rc; 86     pthread_t tid; 87     for (int i = 0; i < thread_num_; ++i) { 88         rc = pthread_create(&tid, &attr_, (thread_func)start_routine, NULL); 89         thread_id_vec_.push_back(tid); 90     } 91     return rc; 92 } 93  94 template <typename workType> 95 int CThreadWork<workType>::join_thread() 96 { 97     int rc = 0; 98     vector<pthread_t>::iterator iter; 99     for (iter = thread_id_vec_.begin(); iter != thread_id_vec_.end(); ++iter) {100         rc = pthread_join((*iter), NULL);101     }102     thread_id_vec_.clear();103     return rc;104 }
View Code

 

测试代码如下:

 1 #include <unistd.h> 2  3 #include <iostream> 4 #include <list> 5  6 using namespace std; 7  8 class CTasklet 9 {10     public:11         CTasklet(int num) {12             num_ = num;13             cout << "CTasklet ctor create num: " << num_ << endl;14         }15 16         ~CTasklet() {17             cout << "CTasklet dtor delete num: " << num_ << endl;18         }19 20         int run() {21             cout << "CTasklet sleep begin: " << num_ << endl;22             sleep(num_);23             cout << "CTasklet sleep end: " << num_ << endl;24         }25 26     private:27         int num_;28 };29 30 #define MAX_THREAD_NUM 331 32 33 void add_task()34 {35     for (int i = 0; i < 6; ++i) {36         CTasklet *pTask = new CTasklet(i);37         thread_pool.add_task(pTask);38     }39 }40 41 int main(int argc, char **argv)42 {43     // Step1. 创建线程池44     CThreadPool<CTasklet> thread_pool(MAX_THREAD_NUM);45 46     // Step2. 创建任务,并加入到线程池中47     add_task();48     // Step3. 开始执行任务49     thread_pool.run();50     // Step4. 等待任务结束51     thread_pool.join_thread();52 53     return 0;54 55 }
View Code

3 总结

上面的线程池属于最简单的一类线程池,即相当于程序运行时候就开启n个线程来执行任务。真正的线程池需要考虑的方面比较多,比如1、线程池中的线程数应该能动态变化;2、线程池能动态调度线程来运行任务,以达到均衡;3、线程池还应该能记录任务的运行时间,防止超时等等。

不过,起码我们已经开了个头,实现了一个简单的线程池,接下来,让我们在这个基础上一步步调整、完善。

PS:对于线程池的考虑,我能想到的有动态增减线程数、超时机制、负载均衡。不知道大家理解线程池还需要考虑什么场景。

简单易用的线程池实现