首页 > 代码库 > Linux组件封装(七)——线程池的简单封装

Linux组件封装(七)——线程池的简单封装

线程池的封装,基础思想与生产者消费者的封装一样,只不过我们是将线程池封装为自动获取任务、执行任务,让用户调用相应的接口来添加任务。

在线程池的封装中,我们同样需要用到的是MutexLock、Condition、Thread这些基本的封装。

基础封装如下:

MutexLock:

 1 #ifndef MUTEXLOCK_H 2 #define MUTEXLOCK_H 3  4 #include "NonCopyable.h" 5 #include <pthread.h> 6 #include <stdlib.h> 7 #include <stdio.h> 8 #define TINY_CHECK(exp) 9         if(!exp)10         {    11             fprintf(stderr, "File : %s, Line : %d Exp : ["#exp"] is true, abort.\n", __FILE__, __LINE__); abort();12         }13 14 15 16 class MutexLock :  NonCopyable17 {18     friend class Condition;19 public:20     MutexLock();21     ~MutexLock();22     void lock();23     void unlock();24 25     bool isLocked() const { return _isLock; }26     pthread_mutex_t *getMutexPtr() { return &_mutex; }27 28 private:29     void restoreMutexStatus()30     { _isLock = true; }31 32     pthread_mutex_t _mutex;33     bool _isLock;34 };35 36 37 class MutexLockGuard : NonCopyable            //将锁封装到MutexLockGuard中,38 {                                            //这样只需定义一个对象,便可39 public:                                        //便可自动上锁,对象销毁时自动解锁40     MutexLockGuard(MutexLock &mutex)41         :_mutex(mutex)42     { _mutex.lock(); }43 44     ~MutexLockGuard()45     { _mutex.unlock(); }46 47 private:48     MutexLock &_mutex;49 };50 #define MutexLockGuard(m) "ERROR"51 52 #endif
View Code
 1 #include "MutexLock.h" 2 #include <assert.h> 3  4 MutexLock::MutexLock() 5     :_isLock(false) 6 { 7     TINY_CHECK(!pthread_mutex_init(&_mutex, NULL)); 8 } 9 10 MutexLock::~MutexLock()11 {12     assert(!isLocked());13     TINY_CHECK(!pthread_mutex_destroy(&_mutex));14 }15 16 void MutexLock::lock()17 {18     TINY_CHECK(!pthread_mutex_lock(&_mutex));19     _isLock = true;20 }21 22 void MutexLock::unlock()23 {24     _isLock = false;25     TINY_CHECK(!pthread_mutex_unlock(&_mutex));26 }
View Code

Condition:

 1 #ifndef CONDITION_H 2 #define CONDITION_H 3  4 #include <pthread.h> 5 #include "NonCopyable.h" 6  7 class MutexLock; 8  9 10 class Condition : NonCopyable11 {12 public:13     Condition(MutexLock &mutex);14     ~Condition();15 16     void wait();17     void notify();18     void notifyAll();19 private:20     pthread_cond_t _cond;21     MutexLock &_mutex;22 };23 24 #endif
View Code
 1 #include "Condition.h" 2 #include "MutexLock.h" 3 #include <assert.h> 4  5 Condition::Condition(MutexLock &mutex) 6     :_mutex(mutex) 7 { 8     TINY_CHECK(!pthread_cond_init(&_cond, NULL)); 9 }10 11 Condition::~Condition()12 {13     TINY_CHECK(!pthread_cond_destroy(&_cond));14 }15 16 void Condition::wait()17 {18     assert(_mutex.isLocked());19     TINY_CHECK(!pthread_cond_wait(&_cond, _mutex.getMutexPtr()));20     _mutex.restoreMutexStatus();21 }22 23 void Condition::notify()24 {25     TINY_CHECK(!pthread_cond_signal(&_cond));26 }27 28 void Condition::notifyAll()29 {30     TINY_CHECK(!pthread_cond_broadcast(&_cond));31 }
View Code

Thread:

 1 #ifndef THREAD_H 2 #define THREAD_H 3 #include <boost/noncopyable.hpp> 4 #include <functional> 5 #include <pthread.h> 6 class Thread : boost::noncopyable 7 { 8 public: 9 10     typedef std::function<void()> ThreadCallback;11 12     Thread(ThreadCallback cb);13     ~Thread();14 15     void start();16     void join();17 18 19 20 21 private:22 23     static void *runInThread(void *);24     pthread_t _threadId;25     bool _isRun;26     ThreadCallback _callback;27 };28 29 30 #endif  /*THREAD_H*/
View Code
 1 #include "Thread.h" 2 #include <assert.h> 3  4 Thread::Thread(ThreadCallback cb) 5     :_threadId(0), 6      _isRun(false), 7      _callback(cb) 8 { 9 10 }11 12 Thread::~Thread()13 {14     if(_isRun)15         pthread_detach(_threadId);16 }17 18 19 void Thread::start()20 {21     pthread_create(&_threadId, NULL, runInThread, this);22     _isRun = true;23 }24 25 void Thread::join()26 {27     assert(_isRun);28     pthread_join(_threadId, NULL);29     _isRun = false;30 }31 32 void *Thread::runInThread(void *arg)33 {34     Thread *p = static_cast<Thread *>(arg);35     p->_callback();36     return NULL;37 }
View Code

NonCopyable:

 1 #ifndef NONCOPYABLE_H 2 #define NONCOPYABLE_H 3  4 class NonCopyable             //禁用值语意 5 { 6     public: 7         NonCopyable() { } 8         ~NonCopyable() { } 9     private:10         NonCopyable(const NonCopyable &);11         void operator= (const NonCopyable &);12 };13 14 #endif  /*NON_COPYABLE_H*/
View Code

 

在线程池的封装中,我们需要的数据结构有一个互斥锁,两个条件变量,任务队列以及线程池的队列。

然后,我们需要提供给用户添加任务的接口addTask,在线程池中,我们需要相应的获取任务函数getTask,执行任务的函数runInThread。

头文件代码如下:

 1 #ifndef THREAD_POOL_H 2 #define THREAD_POOL_H 3  4 #include <boost/noncopyable.hpp> 5 #include "MutexLock.h" 6 #include "Condition.h" 7 #include <queue> 8 #include <memory> 9 #include <functional>10 11 class Thread;12 13 class ThreadPool : boost::noncopyable14 {15 public:16     typedef std::function<void()> Task;17 18     ThreadPool(size_t queueSize, size_t poolSize);19     ~ThreadPool();20 21     void start();22     void stop();23 24     void addTask(Task task);25     bool isRunning() const26     { return _isStart; }27 28 private:29     Task getTask();30     void runInThread();31 32     mutable MutexLock _mutex;33     Condition _full;34     Condition _empty;35     size_t _queueSize;36     std::queue<Task> _queue;37     const size_t _poolSize;38     std::vector<std::unique_ptr<Thread> > _threads;39     bool _isStart;40 };41 42 43 #endif  /*THREAD_POOL_H*/
View Code

在构造函数中,我们用一把锁去初始化两个条件变量,用相应的长度来初始化任务队列的长度与线程池中线程的个数:

ThreadPool::ThreadPool(size_t queueSize, size_t poolSize)    :_full(_mutex),     _empty(_mutex),     _queueSize(queueSize),     _poolSize(poolSize),     _isStart(false){}

addTask函数中,我们首先要判断线程池是否开启,然后加锁,判断任务队列是否已满,进行等待。等待后将相应的任务加入到任务队列,通知getTask来获取任务:

void ThreadPool::addTask(Task task){    if(!_isStart)        return;    MutexLockGuard lock(_mutex);    while(_queue.size() >= _queueSize)        _empty.wait();    _queue.push(std::move(task));    _full.notify();}

getTask函数中,我们首先判断线程池是否开启,然后对应加锁,判断任务队列是否为空,进行等待。等待后,取出任务队列中的第一个任务,通知addTask可以继续添加任务:

ThreadPool::Task ThreadPool::getTask(){    if(!_isStart)        return Task();        MutexLockGuard lock(_mutex);    while(_queue.empty() && _isStart)        _full.wait();    assert(!_queue.empty());    Task task = _queue.front();    _queue.pop();    _empty.notify();    return task;}

runInThread函数相对简单,只是获取相应任务,执行该任务即可:

void ThreadPool::runInThread(){    while(_isStart)    {        Task task(getTask());        if(task)            task();    }}

start函数,相对来说就比较复杂,我们首先需要new出线程,将线程添加到线程队列,然后将线程队列中的线程开启。

void ThreadPool::start(){    if(_isStart)        return ;    _isStart = true;    for(size_t i = 0; i != _poolSize; ++ i)        _threads.push_back(std::unique_ptr<Thread>(new Thread(std::bind(&ThreadPool::runInThread, this))));    for(size_t i = 0; i != _poolSize; ++ i)        _threads[i]->start();}

这里需要注意:由于线程不可复制和赋值,我们将Thread相应的unique_ptr添加到线程队列,才可以达到相应的效果。而每个线程的回调函数,我们运用C++11的特性——function和bind来实现,我们将Thread的回调函数设置为一个function模板,通过bind,将ThreaPool::runInThread函数中的一个隐式参数转化为无参数,即将this指针绑定给ThreaPool::runInThread,这样,ThreaPool::runInThread就不再需要this指针了。

stop函数,我们需要考虑的因素较多,由于线程池已经开启,但不是每个线程都在运行,有些还在沉睡中,所以我们需要通过notifyAll来通知所有的线程获取任务,这样线程池中的线程已经唤醒,然后我们对每个线程进行join,而任务队列中未执行的任务,我们需要将其清空,通过pop函数来弹出任务队列中的任务:

void ThreadPool::stop(){    if(!_isStart)        return ;    {        MutexLockGuard lock(_mutex);        _isStart = false;        _full.notifyAll();    }    for(size_t i = 0; i != _poolSize; ++ i)        _threads[i]->join();    while(!_queue.empty())        _queue.pop();    _threads.clear();}

 

 

具体实现代码如下:

 1 #include "Thread.h" 2 #include "ThreadPool.h" 3 #include <assert.h> 4 using namespace std; 5  6 ThreadPool::ThreadPool(size_t queueSize, size_t poolSize) 7     :_full(_mutex), 8      _empty(_mutex), 9      _queueSize(queueSize),10      _poolSize(poolSize),11      _isStart(false)12 {13 14 }15 ThreadPool::~ThreadPool()16 {17     if(_isStart)18         stop();19 }20 21 void ThreadPool::addTask(Task task)22 {23     if(!_isStart)24         return;25     MutexLockGuard lock(_mutex);26     while(_queue.size() >= _queueSize)27         _empty.wait();28 29     _queue.push(std::move(task));30     _full.notify();31 }32 33 ThreadPool::Task ThreadPool::getTask()34 {35     MutexLockGuard lock(_mutex);36     while(_queue.empty() && _isStart)37         _full.wait();38 39     if(!_isStart)40         return Task();41 42     assert(!_queue.empty());43     Task task = _queue.front();44     _queue.pop();45     _empty.notify();46 47     return task;48 }49 50 void ThreadPool::runInThread()51 {52     while(_isStart)53     {54         Task task(getTask());55         if(task)56             task();57     }58 }59 void ThreadPool::start()60 {61     if(_isStart)62         return ;63     _isStart = true;64 65     for(size_t i = 0; i != _poolSize; ++ i)66         _threads.push_back(std::unique_ptr<Thread>(new Thread(std::bind(&ThreadPool::runInThread, this))));67 68     for(size_t i = 0; i != _poolSize; ++ i)69         _threads[i]->start();70 }71 72 void ThreadPool::stop()73 {74     if(!_isStart)75         return ;76     {77         MutexLockGuard lock(_mutex);78         _isStart = false;79         _full.notifyAll();80     }81 82     for(size_t i = 0; i != _poolSize; ++ i)83         _threads[i]->join();84 85     while(!_queue.empty())86         _queue.pop();87 88     _threads.clear();89 }
View Code

 

 

在测试中,我们可以通过前面的定时器来测试,生成一个定时器线程,定时器到期时,对线程池执行stop。

测试代码如下:

#include "ThreadPool.h"#include "nano_sleep.hpp"#include <iostream>#include <time.h>#include "TimerThread.h"using namespace std;void foo(){    cout << rand() % 100 << endl;}void stopPool(ThreadPool *a){    a->stop();}void nano_sleep(double val);int main(int argc, const char *argv[]){    ThreadPool b(120, 4);    TimerThread a(4, 0, std::bind(&stopPool, &b));    b.start();    a.start();    while(b.isRunning())    {        b.addTask(&foo);        nano_sleep(0.5);    }    a.stop();    return 0;}void nano_sleep(double val){    struct timespec tv;    tv.tv_sec = val; //取整    tv.tv_nsec = (val - tv.tv_sec) * 1000 * 1000 * 1000;    int ret;    do    {        ret = nanosleep(&tv, &tv);    }while(ret == -1 && errno == EINTR);}

在测试代码中,nano_sleep是一个相对精确地睡眠函数,我们可以将睡眠的精度限制到double。

当定时器到时后,会显示timeout,程序会自动退出。

注意:由于运用了C++11的一些特性,如function、bind和右值引用等,编译时需加上-std=c++0x

Linux组件封装(七)——线程池的简单封装