首页 > 代码库 > Linux组件封装(七)——线程池的简单封装
Linux组件封装(七)——线程池的简单封装
线程池的封装,基础思想与生产者消费者的封装一样,只不过我们是将线程池封装为自动获取任务、执行任务,让用户调用相应的接口来添加任务。
在线程池的封装中,我们同样需要用到的是MutexLock、Condition、Thread这些基本的封装。
基础封装如下:
MutexLock:
1 #ifndef MUTEXLOCK_H 2 #define MUTEXLOCK_H 3 4 #include "NonCopyable.h" 5 #include <pthread.h> 6 #include <stdlib.h> 7 #include <stdio.h> 8 #define TINY_CHECK(exp) 9 if(!exp)10 { 11 fprintf(stderr, "File : %s, Line : %d Exp : ["#exp"] is true, abort.\n", __FILE__, __LINE__); abort();12 }13 14 15 16 class MutexLock : NonCopyable17 {18 friend class Condition;19 public:20 MutexLock();21 ~MutexLock();22 void lock();23 void unlock();24 25 bool isLocked() const { return _isLock; }26 pthread_mutex_t *getMutexPtr() { return &_mutex; }27 28 private:29 void restoreMutexStatus()30 { _isLock = true; }31 32 pthread_mutex_t _mutex;33 bool _isLock;34 };35 36 37 class MutexLockGuard : NonCopyable //将锁封装到MutexLockGuard中,38 { //这样只需定义一个对象,便可39 public: //便可自动上锁,对象销毁时自动解锁40 MutexLockGuard(MutexLock &mutex)41 :_mutex(mutex)42 { _mutex.lock(); }43 44 ~MutexLockGuard()45 { _mutex.unlock(); }46 47 private:48 MutexLock &_mutex;49 };50 #define MutexLockGuard(m) "ERROR"51 52 #endif
1 #include "MutexLock.h" 2 #include <assert.h> 3 4 MutexLock::MutexLock() 5 :_isLock(false) 6 { 7 TINY_CHECK(!pthread_mutex_init(&_mutex, NULL)); 8 } 9 10 MutexLock::~MutexLock()11 {12 assert(!isLocked());13 TINY_CHECK(!pthread_mutex_destroy(&_mutex));14 }15 16 void MutexLock::lock()17 {18 TINY_CHECK(!pthread_mutex_lock(&_mutex));19 _isLock = true;20 }21 22 void MutexLock::unlock()23 {24 _isLock = false;25 TINY_CHECK(!pthread_mutex_unlock(&_mutex));26 }
Condition:
1 #ifndef CONDITION_H 2 #define CONDITION_H 3 4 #include <pthread.h> 5 #include "NonCopyable.h" 6 7 class MutexLock; 8 9 10 class Condition : NonCopyable11 {12 public:13 Condition(MutexLock &mutex);14 ~Condition();15 16 void wait();17 void notify();18 void notifyAll();19 private:20 pthread_cond_t _cond;21 MutexLock &_mutex;22 };23 24 #endif
1 #include "Condition.h" 2 #include "MutexLock.h" 3 #include <assert.h> 4 5 Condition::Condition(MutexLock &mutex) 6 :_mutex(mutex) 7 { 8 TINY_CHECK(!pthread_cond_init(&_cond, NULL)); 9 }10 11 Condition::~Condition()12 {13 TINY_CHECK(!pthread_cond_destroy(&_cond));14 }15 16 void Condition::wait()17 {18 assert(_mutex.isLocked());19 TINY_CHECK(!pthread_cond_wait(&_cond, _mutex.getMutexPtr()));20 _mutex.restoreMutexStatus();21 }22 23 void Condition::notify()24 {25 TINY_CHECK(!pthread_cond_signal(&_cond));26 }27 28 void Condition::notifyAll()29 {30 TINY_CHECK(!pthread_cond_broadcast(&_cond));31 }
Thread:
1 #ifndef THREAD_H 2 #define THREAD_H 3 #include <boost/noncopyable.hpp> 4 #include <functional> 5 #include <pthread.h> 6 class Thread : boost::noncopyable 7 { 8 public: 9 10 typedef std::function<void()> ThreadCallback;11 12 Thread(ThreadCallback cb);13 ~Thread();14 15 void start();16 void join();17 18 19 20 21 private:22 23 static void *runInThread(void *);24 pthread_t _threadId;25 bool _isRun;26 ThreadCallback _callback;27 };28 29 30 #endif /*THREAD_H*/
1 #include "Thread.h" 2 #include <assert.h> 3 4 Thread::Thread(ThreadCallback cb) 5 :_threadId(0), 6 _isRun(false), 7 _callback(cb) 8 { 9 10 }11 12 Thread::~Thread()13 {14 if(_isRun)15 pthread_detach(_threadId);16 }17 18 19 void Thread::start()20 {21 pthread_create(&_threadId, NULL, runInThread, this);22 _isRun = true;23 }24 25 void Thread::join()26 {27 assert(_isRun);28 pthread_join(_threadId, NULL);29 _isRun = false;30 }31 32 void *Thread::runInThread(void *arg)33 {34 Thread *p = static_cast<Thread *>(arg);35 p->_callback();36 return NULL;37 }
NonCopyable:
1 #ifndef NONCOPYABLE_H 2 #define NONCOPYABLE_H 3 4 class NonCopyable //禁用值语意 5 { 6 public: 7 NonCopyable() { } 8 ~NonCopyable() { } 9 private:10 NonCopyable(const NonCopyable &);11 void operator= (const NonCopyable &);12 };13 14 #endif /*NON_COPYABLE_H*/
在线程池的封装中,我们需要的数据结构有一个互斥锁,两个条件变量,任务队列以及线程池的队列。
然后,我们需要提供给用户添加任务的接口addTask,在线程池中,我们需要相应的获取任务函数getTask,执行任务的函数runInThread。
头文件代码如下:
1 #ifndef THREAD_POOL_H 2 #define THREAD_POOL_H 3 4 #include <boost/noncopyable.hpp> 5 #include "MutexLock.h" 6 #include "Condition.h" 7 #include <queue> 8 #include <memory> 9 #include <functional>10 11 class Thread;12 13 class ThreadPool : boost::noncopyable14 {15 public:16 typedef std::function<void()> Task;17 18 ThreadPool(size_t queueSize, size_t poolSize);19 ~ThreadPool();20 21 void start();22 void stop();23 24 void addTask(Task task);25 bool isRunning() const26 { return _isStart; }27 28 private:29 Task getTask();30 void runInThread();31 32 mutable MutexLock _mutex;33 Condition _full;34 Condition _empty;35 size_t _queueSize;36 std::queue<Task> _queue;37 const size_t _poolSize;38 std::vector<std::unique_ptr<Thread> > _threads;39 bool _isStart;40 };41 42 43 #endif /*THREAD_POOL_H*/
在构造函数中,我们用一把锁去初始化两个条件变量,用相应的长度来初始化任务队列的长度与线程池中线程的个数:
ThreadPool::ThreadPool(size_t queueSize, size_t poolSize) :_full(_mutex), _empty(_mutex), _queueSize(queueSize), _poolSize(poolSize), _isStart(false){}
addTask函数中,我们首先要判断线程池是否开启,然后加锁,判断任务队列是否已满,进行等待。等待后将相应的任务加入到任务队列,通知getTask来获取任务:
void ThreadPool::addTask(Task task){ if(!_isStart) return; MutexLockGuard lock(_mutex); while(_queue.size() >= _queueSize) _empty.wait(); _queue.push(std::move(task)); _full.notify();}
getTask函数中,我们首先判断线程池是否开启,然后对应加锁,判断任务队列是否为空,进行等待。等待后,取出任务队列中的第一个任务,通知addTask可以继续添加任务:
ThreadPool::Task ThreadPool::getTask(){ if(!_isStart) return Task(); MutexLockGuard lock(_mutex); while(_queue.empty() && _isStart) _full.wait(); assert(!_queue.empty()); Task task = _queue.front(); _queue.pop(); _empty.notify(); return task;}
runInThread函数相对简单,只是获取相应任务,执行该任务即可:
void ThreadPool::runInThread(){ while(_isStart) { Task task(getTask()); if(task) task(); }}
start函数,相对来说就比较复杂,我们首先需要new出线程,将线程添加到线程队列,然后将线程队列中的线程开启。
void ThreadPool::start(){ if(_isStart) return ; _isStart = true; for(size_t i = 0; i != _poolSize; ++ i) _threads.push_back(std::unique_ptr<Thread>(new Thread(std::bind(&ThreadPool::runInThread, this)))); for(size_t i = 0; i != _poolSize; ++ i) _threads[i]->start();}
这里需要注意:由于线程不可复制和赋值,我们将Thread相应的unique_ptr添加到线程队列,才可以达到相应的效果。而每个线程的回调函数,我们运用C++11的特性——function和bind来实现,我们将Thread的回调函数设置为一个function模板,通过bind,将ThreaPool::runInThread函数中的一个隐式参数转化为无参数,即将this指针绑定给ThreaPool::runInThread,这样,ThreaPool::runInThread就不再需要this指针了。
stop函数,我们需要考虑的因素较多,由于线程池已经开启,但不是每个线程都在运行,有些还在沉睡中,所以我们需要通过notifyAll来通知所有的线程获取任务,这样线程池中的线程已经唤醒,然后我们对每个线程进行join,而任务队列中未执行的任务,我们需要将其清空,通过pop函数来弹出任务队列中的任务:
void ThreadPool::stop(){ if(!_isStart) return ; { MutexLockGuard lock(_mutex); _isStart = false; _full.notifyAll(); } for(size_t i = 0; i != _poolSize; ++ i) _threads[i]->join(); while(!_queue.empty()) _queue.pop(); _threads.clear();}
具体实现代码如下:
1 #include "Thread.h" 2 #include "ThreadPool.h" 3 #include <assert.h> 4 using namespace std; 5 6 ThreadPool::ThreadPool(size_t queueSize, size_t poolSize) 7 :_full(_mutex), 8 _empty(_mutex), 9 _queueSize(queueSize),10 _poolSize(poolSize),11 _isStart(false)12 {13 14 }15 ThreadPool::~ThreadPool()16 {17 if(_isStart)18 stop();19 }20 21 void ThreadPool::addTask(Task task)22 {23 if(!_isStart)24 return;25 MutexLockGuard lock(_mutex);26 while(_queue.size() >= _queueSize)27 _empty.wait();28 29 _queue.push(std::move(task));30 _full.notify();31 }32 33 ThreadPool::Task ThreadPool::getTask()34 {35 MutexLockGuard lock(_mutex);36 while(_queue.empty() && _isStart)37 _full.wait();38 39 if(!_isStart)40 return Task();41 42 assert(!_queue.empty());43 Task task = _queue.front();44 _queue.pop();45 _empty.notify();46 47 return task;48 }49 50 void ThreadPool::runInThread()51 {52 while(_isStart)53 {54 Task task(getTask());55 if(task)56 task();57 }58 }59 void ThreadPool::start()60 {61 if(_isStart)62 return ;63 _isStart = true;64 65 for(size_t i = 0; i != _poolSize; ++ i)66 _threads.push_back(std::unique_ptr<Thread>(new Thread(std::bind(&ThreadPool::runInThread, this))));67 68 for(size_t i = 0; i != _poolSize; ++ i)69 _threads[i]->start();70 }71 72 void ThreadPool::stop()73 {74 if(!_isStart)75 return ;76 {77 MutexLockGuard lock(_mutex);78 _isStart = false;79 _full.notifyAll();80 }81 82 for(size_t i = 0; i != _poolSize; ++ i)83 _threads[i]->join();84 85 while(!_queue.empty())86 _queue.pop();87 88 _threads.clear();89 }
在测试中,我们可以通过前面的定时器来测试,生成一个定时器线程,定时器到期时,对线程池执行stop。
测试代码如下:
#include "ThreadPool.h"#include "nano_sleep.hpp"#include <iostream>#include <time.h>#include "TimerThread.h"using namespace std;void foo(){ cout << rand() % 100 << endl;}void stopPool(ThreadPool *a){ a->stop();}void nano_sleep(double val);int main(int argc, const char *argv[]){ ThreadPool b(120, 4); TimerThread a(4, 0, std::bind(&stopPool, &b)); b.start(); a.start(); while(b.isRunning()) { b.addTask(&foo); nano_sleep(0.5); } a.stop(); return 0;}void nano_sleep(double val){ struct timespec tv; tv.tv_sec = val; //取整 tv.tv_nsec = (val - tv.tv_sec) * 1000 * 1000 * 1000; int ret; do { ret = nanosleep(&tv, &tv); }while(ret == -1 && errno == EINTR);}
在测试代码中,nano_sleep是一个相对精确地睡眠函数,我们可以将睡眠的精度限制到double。
当定时器到时后,会显示timeout,程序会自动退出。
注意:由于运用了C++11的一些特性,如function、bind和右值引用等,编译时需加上-std=c++0x
Linux组件封装(七)——线程池的简单封装