首页 > 代码库 > 线程池学习笔记

线程池学习笔记

记录一下学习线程池的过程,代码用到的函数归结: 
pthread_mutex_lock 
pthread_mutex_unlock 
pthread_cond_wait 
pthread_cond_signal 
pthread_cond_broadcast 
pthread_create 
pthread_join 
程序中还用到了链表, 
还有一个知识点:任何类型的数据都可以是void类型, 
但void类型在使用之前必须进行强制类型转换。

/*
*Author:Greens_Ren
*Description:线程池
*/
/*头文件*/
#include<pthread.h>
#include<stdio.h>
/* begin : add */
#include<stdlib.h> 
#include<unistd.h>
#include<assert.h>
#include<sys/types.h>
/* end : add */

/*数据结构*/
typedef struct Thread_worker
{
    void *(*worker)(void *arg);
    void * arg;
    struct Thread_worker *next;
}CThread_worker;

typedef struct Thread_pool
{
    pthread_mutex_t queue_lock;
    pthread_cond_t queue_ready;

    int max_thread_num;
    pthread_t *phead_threadid;

    int cur_queue_size;
    CThread_worker *phead;

    int shutdown;

}CThread_pool;

/*全局区*/
static CThread_pool *pool = NULL;

/*函数*/

void pthread_init(int max_thread_num);
void *thread_roution(void * arg);
void pthread_add_worker(void *(*worker)(void *arg), void * arg);
void pthread_destroy(void);
void* my_process(void *arg);

/*main*/
int main(void)
{
    int max_thread_num = 3;
    int worker_num = 10;

    /*初始化*/
    pthread_init(max_thread_num);

    /*投入任务*/
    int *workernum = (int *)malloc(sizeof(int) * worker_num);
    int i;
    for(i = 0; i < worker_num; i++)
    {
        workernum[i] = i;
        pthread_add_worker(my_process, &workernum[i]);
    }

    /*等待处理任务*/
    sleep(8);
    pthread_destroy();

    return 0;
}

void pthread_init(int max_thread_num)
{
    pool = (CThread_pool*)malloc(sizeof(CThread_pool)); /*free(pool)*/

    /* begin : modified 初始化互斥锁和条件变量*/   
    pthread_mutex_init(&(pool->queue_lock), NULL);
    pthread_cond_init(&(pool->queue_ready), NULL);
    /* end : modified */

    /*初始化线程*/
    pool->max_thread_num = max_thread_num;
    pool->phead_threadid = (pthread_t *)malloc(sizeof(pthread_t) * max_thread_num); /*free*/
    int i;
    for (i = 0; i < max_thread_num; i ++)
    {
        pthread_create(&(pool->phead_threadid[i]), NULL, thread_roution, NULL);
    }

    /*初始化任务等待队列*/
    pool->cur_queue_size = 0;
    //pool->phead = (CThread_worker *)malloc(sizeof(CThread_worker));
    //pool->phead->next = NULL;
    /*罪过,这里考虑多了。在pool中就是存了一个指针,只不过它是指定了一个链表*/
    pool->phead = NULL;

    /*线程池销毁标记*/
    pool->shutdown = 0;

    return;
}
void *thread_roution(void * arg)
{
    printf("starting thread 0x%x\n", pthread_self());
    while(1)  /*Added 线程要持续运行,这里考虑使用while(1)来实现*/
    {
        pthread_mutex_lock(&(pool->queue_lock));
        /*如果目前没有任务处理且线程池未销毁就睡眠等待添加任务*/
        //if(pool->cur_queue_size == 0 && pool->shutdown != 1)
        while(pool->cur_queue_size == 0 && pool->shutdown != 1) /*modified by*/
        {
            //pthread_cond_wait( &(pool->queue_lock), &(pool->queue_ready) );
            printf("thread 0x%x is waiting\n", pthread_self());
            pthread_cond_wait( &(pool->queue_ready), &(pool->queue_lock) );
        }
        /*如果线程池已被标记销毁,那就退出线程*/
        if(pool->shutdown == 1)
        {

            pthread_mutex_unlock(&(pool->queue_lock));
            printf("thread 0x%x will exit\n", pthread_self());
            pthread_exit(NULL);
        }

        printf("thread 0x%x is starting to work\n", pthread_self());

        /*assert是调试用的好助手,assert如果为家,它就会通过stderr打印错误信息,并终止程序*/
        assert(pool->cur_queue_size != 0);
        assert(pool->shutdown != 1);

        /*开始处理等待队列中的任务*/
        /*回调函数*/

        pool->cur_queue_size--;
        CThread_worker * worker_waiting = pool->phead;
        pool->phead = worker_waiting->next;
        /*这里加锁的目的就是为了处理任务链表,处理完后就 可以解锁,让其它线程再次去处理任务链表*/
        pthread_mutex_unlock(&(pool->queue_lock));  /*modified by 开始的时候讲解锁放在了回调函数后面,这里做修正*/

        /*调用回调函数,执行任务*/
        (*(worker_waiting->worker))(worker_waiting->arg);
        /*删除链表中已经执行过的任务节点*/
        free(worker_waiting);
        worker_waiting = NULL;
    }
}
void pthread_add_worker(void *(*worker)(void *arg), void * arg)
{
    assert(worker != NULL);
    assert(pool->shutdown != 1);

    /*构建新任务插入到任务链表尾部*/
    CThread_worker * worker_insert = 
        (CThread_worker *)malloc(sizeof(CThread_worker));/*free*/
    worker_insert->worker = worker;
    worker_insert->arg = arg;
    worker_insert->next = NULL;

    /*下面要处理任务链表了,这里要加锁,保护链表。第一次写的时候就忘记了加锁*/
    pthread_mutex_lock(&(pool->queue_lock));
    CThread_worker* phead_worker = pool->phead;

    /*链表的头指针一开始其实就是空指针,所以如果为NULL,就直接将构建的节点地址赋值给它就可以了*/
    if(phead_worker != NULL)
    {
        while( phead_worker->next != NULL )
        {
            phead_worker = phead_worker->next;
        }
        phead_worker->next = worker_insert; /*这里将新构建的任务节点添加到了等待任务链表的结尾*/
    }
    else
    {
        pool->phead = worker_insert;
    }

    assert(pool->phead != NULL);/*这里检查一下等待链表不为空*/
    pool->cur_queue_size++; /*同步修改等待队列的长度信息*/
    pthread_mutex_unlock(&(pool->queue_lock));
    /*等待队列中添加了新任务,这里就要唤醒线程去处理,如果无线程睡眠,这条语句就无效*/
    pthread_cond_signal(&(pool->queue_ready));

    return;
}
void pthread_destroy(void)
{
    if(pool->shutdown)
    {
        return; /*这里防止多次调用*/
    }
    /*这里先将销毁线程池标记置位*/
    pool->shutdown = 1;

    /*唤醒所以等待线程,线程池要销毁了*/
    pthread_cond_broadcast(&(pool->queue_ready));

    /*阻塞等待线程线程退出,否则就成了僵尸了*/
    int i;
    for (i = 0; i < pool->max_thread_num; i ++)
    {
        pthread_join(pool->phead_threadid[i], NULL);
    }
    /*释放线程号存储占用资源*/
    free(pool->phead_threadid);
    pool->phead_threadid = NULL;

    /*释放任务等待队列*/
    CThread_worker *pworker_del = NULL;
    while(pool->phead != NULL)
    {
        pworker_del = pool->phead;
        pool->phead = pool->phead->next;
        free(pworker_del);
        pworker_del = NULL;
    }

    /*销毁条件变量和互斥锁,刚开始也忘记了*/
    pthread_mutex_destroy(&(pool->queue_lock));
    pthread_cond_destroy(&(pool->queue_ready));

    /*释放线程池*/
    free(pool);
    pool = NULL;

    return;
}
void *my_process(void * arg)
{
    printf("thread is 0x%x, working ont task %d\n", pthread_self(), *(int *)arg);
    sleep(1);
    return;
}

线程池学习笔记