首页 > 代码库 > C++线程池2013年的博客迁移

C++线程池2013年的博客迁移

技术分享


线程基类头文件:


#ifndef YTHREAD_H
#define YTHREAD_H

#include <iostream>
using namespace std;
#define  CERR(str) cout<<"error:"<< str<<std::endl
#define  CINFO(str) cout<<"info:"<< str<<std::endl
enum ResCode
{
    RES_OK = 0,
    RES_ERR = -1,
};

class TAutoLock
{
public:
    TAutoLock(pthread_mutex_t & lock) : m_lock(&lock) { pthread_mutex_lock(m_lock); }
    ~TAutoLock(void) { release(); }

    void release(void)
    {
        if (m_lock)
        {
            pthread_mutex_unlock(m_lock);
            m_lock = NULL;
        }
    }
protected:
    mutable pthread_mutex_t	*	m_lock;
};

class YThreadState
{
public:
    enum typeState
    {
        TS_NONE,			
        TS_BUILD,			
        TS_WAITTING,	
        TS_RUNNING,			
        TS_TOEND,			
        TS_ENDING,			
        TS_DEF_END
    };

    //! return the state information
    static const char * getStateString(typeState nState);

    //! Constructor
    YThreadState(void) : m_nState(TS_NONE),m_nResult(RES_OK)
    {
        pthread_mutex_init(&m_LockState, NULL);
         pthread_cond_init(&m_ConditionState,NULL);
    }

    virtual ~YThreadState(void)
    {
        pthread_mutex_destroy(&m_LockState);
        pthread_cond_destroy(&m_ConditionState);
    //! Destructor
    }


    //! return the current status
    typeState		getState(void) const { return m_nState; }

protected:
    //! sub-class may decide whether to exit in function Run();
    bool			mustExit(void) const { return m_nState == TS_TOEND; }
    //! reset the object‘s state
    void			setState(typeState state) { m_nState = state; }

    volatile typeState	m_nState;

    int			m_nResult;

    pthread_mutex_t						m_LockState;
    pthread_cond_t					m_ConditionState;
};



class YThreadBase
{
public:
    YThreadBase(void);
    virtual ~YThreadBase(void) {}

    enum	THREAD_PRIORITY
    {
        PRIORITY_HIGHEST		=	7,
        PRIORITY_ABOVE_NORMAL	=	1,
        PRIORITY_NORMAL			=	0,
        PRIORITY_BELOW_NORMAL	=	-1,
        PRIORITY_LOWEST			=	-7,
    };
protected:

    #define ThreadFuncReturnCode void*
    typedef void * (* ThreadFunc)(void * arg);
    typedef pthread_t		typeThreaID;

    typeThreaID	m_nPthreadID;
    static int	threadExit(int nValue);
    static int threadStartImp(ThreadFunc func,void * arg,int nStackSize,int nPrio,typeThreaID * pThreadID,bool boDetach);
     virtual int	run(void) = 0;
protected:
    int threadStart(ThreadFunc func,void * arg,int nStackSize,int nPriority, bool boDetach)
        { return threadStartImp(func,arg,nStackSize,nPriority,&m_nPthreadID, boDetach); }
    bool	threadJoin(void ** status = NULL);
    bool	threadDetach(void);
    void threadYield(void);
};

class YThread:public YThreadBase, public YThreadState
{
public:
    YThread(void):YThreadBase(), YThreadState()  {}

    virtual ~YThread(void) {}

    virtual void	start(void);
    void	startEx(int nStackSize = 64 * 1024,int nPriority = YThreadBase::PRIORITY_NORMAL);

    virtual void stop(void);
    virtual int	stopEx(void);
    virtual void	beforeStop(void) =0;
    unsigned int	getThreadID(void) const { return (unsigned int)(m_nPthreadID); }


protected:
    //bool			suspend(int nSec = INFINITE_VALUE,int nNSec = INFINITE_VALUE);
private:
    static ThreadFuncReturnCode	threadProxy(void * arg);
};

#endif // YTHREAD_H




线程基类源文件


#include "ythread.h"
//#include "ttmutex.h"

#include <assert.h>


#include <pthread.h>


const char * g_lpszThreadState[] = {
    "None, runnable object not build",	//TS_NONE,			线程还没有建立
    "Just Build the runnable object",	//TS_BUILD,			线程刚刚建立完
    "Waiting for task",					//TS_WAITTING,		线程正在等待任务
    "Running",							//TS_RUNNING,		线程正在执行
    "Will be end",						//TS_TOEND,			线程准备要结束
    "Closing the runnable object",		//TS_ENDING,		线程正在结束
    "Runnable object is Closed",			//TS_END,			线程已经结束
    ""									//TS_DEF_END
};

//-------------------------------------------------------------------

const char * YThreadState::getStateString(typeState nState)
{
    return g_lpszThreadState[static_cast<int>(nState)];
}



YThreadBase::YThreadBase(void) : m_nPthreadID(0) {}

int YThreadBase::threadStartImp(ThreadFunc func,void * arg,int nStackSize,int nPriority,typeThreaID * pThreadID,bool boDetach)
{
    pthread_attr_t attr;
    pthread_attr_init(&attr); // initialize attr with default attributes
    pthread_attr_setstacksize (&attr,nStackSize);

    if (nPriority != 0)
    {
        pthread_attr_setinheritsched(&attr, PTHREAD_INHERIT_SCHED);
    }
    int nErrno = pthread_create(pThreadID, &attr, func, arg);
    pthread_attr_destroy(&attr);
    if (nErrno == 0 && boDetach)
        pthread_detach( *pThreadID );
    return nErrno;
}

bool YThreadBase::threadDetach(void)
{
    return pthread_detach(m_nPthreadID) == 0;
}

void YThreadBase::threadYield(void)
{
    pthread_yield();
}

bool YThreadBase::threadJoin(void ** pStatus)
{
    return pthread_join(m_nPthreadID, pStatus) == 0;
}

int YThreadBase::threadExit(int nValue)
{
    pthread_exit(reinterpret_cast<void *>(nValue));
    return nValue;
}

//-------------------------------------------------------------------

void YThread::start(void)
{
    startEx();
}

void YThread::startEx(int nStackSize,int nPriority)
{
    TAutoLock _au(m_LockState);
    int res = YThreadBase::threadStart(threadProxy,this,nStackSize,nPriority,true);
    if (res<0)
       CERR("Start Thread Error");
    pthread_cond_wait(&m_ConditionState, &m_LockState);
}

int YThread::stopEx(void)
{
    TAutoLock _au(m_LockState);
    if (m_nState == TS_NONE || m_nState == TS_ENDING || m_nState == TS_TOEND)
        return m_nResult;
    m_nState = TS_TOEND;

    beforeStop();

    while (m_nState != TS_NONE)
        pthread_cond_wait(&m_ConditionState, &m_LockState);
    return m_nResult;
}

void YThread::stop(void)
{
    TAutoLock _au(m_LockState);
    if (m_nState == TS_NONE || m_nState == TS_ENDING || m_nState == TS_TOEND)
        return;
    m_nState = TS_TOEND;

//        beforeStop();
}

ThreadFuncReturnCode YThread::threadProxy(void * arg)
{
    YThread * pThread = reinterpret_cast<YThread *>(arg);

   pthread_mutex_lock(&pThread->m_LockState);
    pThread->m_nState = TS_BUILD;
    pthread_cond_signal(&pThread->m_ConditionState);
   pthread_mutex_unlock(&pThread->m_LockState);

    pThread->m_nResult= 0;

    try
    {
        pThread->m_nResult = pThread->run();
    }

    catch(std::exception & se)
    {
        CERR(se.what());
    }
    catch(...)
    {
        CERR("unkown exp in thread run");
    }

     TAutoLock _au(pThread->m_LockState);
     pThread->m_nState = TS_NONE;
     pthread_cond_signal(&pThread->m_ConditionState);
     threadExit(pThread->m_nResult);
     return 0;
}





线程池类头文件(包括了工作线程)


#ifndef YTHREADPOOL_H
#define YTHREADPOOL_H



#include "ythread.h"
#include <list>


struct MYData
{
    std::string strData;
    int  nNum;
};


class YMYThreadPool;
class YMYThread : public YThread
{
public:
    YMYThread(size_t nThreadNum, YMYThreadPool *pParent) :YThread(),m_nThreadNum(nThreadNum) ,m_pParent(pParent)
    {
        pthread_mutex_init(&m_lockForDataListAndCond, NULL);
         pthread_cond_init(&m_condForDataList,NULL);
    }

    ~YMYThread()
    {
        pthread_mutex_destroy(&m_lockForDataListAndCond);
        pthread_cond_destroy(&m_condForDataList);
    }
    static void initGobal(size_t nParentThreadCount, size_t nDataListLimits);


    virtual void	beforeStop(void) ;
    virtual int run(void);


    bool	addDataWithoutSigna(const MYData & mydata);
    bool	addData(const MYData & mydata);


protected:

    void setCurrentState(const std::string & strState)
    {
        m_tBeginTime = time(NULL);
        m_strCurrentState = strState;
    }

    virtual int handleDataList(void);
    void dealWithEachData(const MYData & itemdata);

    typedef std::list<MYData> typeDataList;
    typeDataList m_datalist;
    pthread_mutex_t					m_lockForDataListAndCond;
    pthread_cond_t				m_condForDataList;


    YMYThreadPool * m_pParent;
    size_t					m_nThreadNum;		//Thread number

    time_t m_tBeginTime;
    std::string m_strCurrentState;

    static size_t			g_nDataListSizeLimits;
    static size_t          g_nParentPoolThreadCount;
};

class YMYThreadPool
{
public:
    YMYThreadPool(void);
    ~YMYThreadPool(void);

    void			init(size_t nThreadCount,size_t nSunDataLimits);
    void			start(void);
    void			stop(void);
    YMYThread &	getThread(const std::string & strSid);

    bool pushMission(const MYData & data);

    bool				m_boToExit;

private:
    YMYThread	**	m_ppThreads;
    size_t				m_nThreadCount;
//    TMutexEvent 		m_AllReadyEvent;

    bool				m_boInit;

};



#endif // YTHREADPOOL_H




线程池类源文件(包括工作线程的具体实现)


#include "ythreadpool.h"


const int nLimitsDefault = 1000;
const int nThreadCountDefault = 1;
size_t YMYThread::g_nDataListSizeLimits = static_cast<size_t>(nLimitsDefault);
size_t YMYThread::g_nParentPoolThreadCount = static_cast<size_t>(nThreadCountDefault);

void YMYThread::initGobal(size_t nPoolThreadCount, size_t nDataListLimits)
{
    g_nParentPoolThreadCount = nPoolThreadCount;
    g_nDataListSizeLimits = nDataListLimits;
}

void YMYThread::beforeStop(void)
{
    TAutoLock au(m_lockForDataListAndCond);
    pthread_cond_signal(&m_condForDataList);
}

int YMYThread::run(void)
{
    while(!m_pParent->m_boToExit)
    {
        try
        {
            setCurrentState( "Wait for mission" );
            pthread_mutex_lock(&m_lockForDataListAndCond);
            if (m_datalist.empty())
                pthread_cond_wait(&m_condForDataList, &m_lockForDataListAndCond);
            pthread_mutex_unlock(&m_lockForDataListAndCond);

            if ( m_pParent->m_boToExit )
                break;

            setCurrentState( "Begin handle search missions" );
            handleDataList();
        }
        catch(...)
        {
        }
    }
    return int();
}
bool	YMYThread::addDataWithoutSigna(const MYData & data)
{
    if (m_datalist.size() >= g_nDataListSizeLimits)
        return false;
    m_datalist.push_back(data);
    return true;
}

bool	YMYThread::addData(const MYData & data)
{
    TAutoLock au(m_lockForDataListAndCond);
    if (!addDataWithoutSigna(data))
        return false;
    pthread_cond_signal(&m_condForDataList);
    return true;
}


void YMYThread::dealWithEachData(const MYData & data)
{
    std::cout <<"deal each data;"<<std::endl;
    std::cout<<"thread num:"<<m_nThreadNum<<", data str:"<<data.strData<<", data num:"<<data.nNum <<std::endl;

}

int YMYThread::handleDataList(void)
{
    if ( m_pParent->m_boToExit )
        return int();

    typeDataList		datalist;		//list of mission must to do
    pthread_mutex_lock(&m_lockForDataListAndCond);		//Condition for the mission
    datalist.swap( m_datalist );
    pthread_mutex_unlock(&m_lockForDataListAndCond);

    setCurrentState( "Handling mission." );

    typeDataList::iterator it = datalist.begin();
    while (it != datalist.end())
    {
        dealWithEachData(*it);
        it++;
    }
}
///////////////////////////////////////////////////////////////////////

inline size_t	getHashIndex(const std::string & strSID,size_t nThreadCount)
{
    std::hash<std::string> hash_fn;  //c++11的新特性
    size_t nHashValue = hash_fn(strSID);
    return nHashValue % nThreadCount;
}

YMYThreadPool::YMYThreadPool():m_ppThreads(NULL),m_nThreadCount(0),
    m_boInit(false),m_boToExit(false)
{
}

YMYThreadPool::~YMYThreadPool(void)
{
    stop();
}

void YMYThreadPool::start(void)
{
    if (!m_boInit)
        return;
    for (size_t i = 0; i < m_nThreadCount; ++i)
        if ( m_ppThreads[i] != NULL )
            m_ppThreads[i]->startEx( 2 * 1024 * 1024 );
}

void YMYThreadPool::stop(void)
{
    if (m_ppThreads == NULL)
        return;

    m_boToExit = true;

    for (size_t i = 0; i < m_nThreadCount; ++i)
    {
        if ( m_ppThreads[i] != NULL )
        {
            m_ppThreads[i]->stopEx();
            delete m_ppThreads[i];
        }
    }
    delete []m_ppThreads;
    m_ppThreads = NULL;
    m_nThreadCount = 0;
}
void YMYThreadPool::init(size_t nThreadCount,size_t nSunDataLimits)
{
    YMYThread::initGobal(nThreadCount, nSunDataLimits);

    m_nThreadCount = nThreadCount;
    m_ppThreads = new YMYThread *[nThreadCount];

    for (size_t i = 0 ; i < m_nThreadCount; ++i)
    {
        m_ppThreads[i] = NULL;
        m_ppThreads[i] = new YMYThread(i,this/*,&m_AllReadyEvent*/);
        if (m_ppThreads[i] == NULL)
            CERR( "Memory not enough for Sub-Thread" );
    }
    m_boInit = true;
}


bool YMYThreadPool::pushMission(const MYData & data)
{
    if (!m_boInit)
        return false;
    unsigned nIndex = getHashIndex(data.strData, m_nThreadCount);
    return m_ppThreads[nIndex]->addData(data);
}

YMYThread &	YMYThreadPool::getThread(const std::string & strSid)
{
    std::hash<std::string> hash_fn;

    size_t nHash = hash_fn(strSid);
    return *(m_ppThreads[ nHash % m_nThreadCount ]);
}





main函数,注意请用单例模式去创建线程池


#include "ythreadpool.h"
#include <unistd.h>
int main()
{
    YMYThreadPool p;
    p.init(6, 100);
    p.start();
    MYData da1, da2;
    da1.strData = "ds1";
    da1.nNum = 11;
    da2.strData = "ds2";
    da2.nNum = 22;

    sleep(2);
    p.pushMission(da1);
    p.pushMission(da2);


     sleep(2);

    p.stop();


    return 0;
}






C++线程池2013年的博客迁移