首页 > 代码库 > 0726------Linux基础----------线程池

0726------Linux基础----------线程池

#ifndef __DEF_H__#define __DEF_H__#include <stddef.h>#include <pthread.h>#include <stdio.h>#define TRUE 1#define FALSE 0//任务结构体typedef struct{    void (*thread_function_ptr_) (void*);    void *arg_;}task_t;//队列结构体typedef struct node_t{    task_t data_;  //队列结点内存放任务    struct node_t *next_;}node_t, *pnode_t;typedef struct{    pnode_t head_;    pnode_t tail_;    size_t size_;}queue_t;//线程池结构体typedef struct{    size_t size_; //线程池大小    pthread_t *threads_; //线程id数组    queue_t queue_; //任务队列    int is_started_; // 线程池状态 是否开启 c 没有bool    pthread_mutex_t mutex_; //互斥锁 各线程互斥访问任务队列    pthread_cond_t cond_; //条件队列 同步主线程和各子线程}pool_t;// 线程池的接口void thread_pool_init(pool_t *pool, size_t size);    //初始化线程池void thread_pool_start(pool_t *pool); // 开启线程池void thread_pool_add_task(pool_t *pool, task_t task); //往线程池中加入新的任务int  thread_pool_get_task(pool_t *pool, task_t *task);//从线程池中取出任务void thread_pool_stop(pool_t *pool); // 关闭线程池void thread_pool_deatroy(pool_t *pool); //销毁 线程池int  thread_pool_is_started(pool_t *pool);// 线程池是否开启size_t thread_pool_get_size_of_queue(pool_t *pool); //返回线程池任务队列的大小void* thread_pool_thread_func(void *); //每个线程执行的函数//队列的接口void queue_init(queue_t  *queue); //初始化队列 这里动态创建队列void queue_push(queue_t *queue, task_t task); // 入队列void queue_pop(queue_t *queue); //出队列void queue_destroy(queue_t *queue); //销毁队列void queue_clear(queue_t *queue); //清空队列int  queue_is_empty(queue_t *queue); //判断队列是否为空size_t queue_size(queue_t *queue); //返回队列的大小task_t queue_top(queue_t *queue); // 返回队首元素#endif#include "def.h"#include <stdlib.h>#include <assert.h>void queue_init(queue_t *queue){    queue->head_ = NULL;    queue->tail_ = NULL;    queue->size_ = 0;}void queue_push(queue_t *queue, task_t task){    pnode_t pCur = (pnode_t )malloc(sizeof(node_t));    pCur->data_ = task;    pCur->next_ = NULL;    if(queue_is_empty(queue)){        queue->head_= queue->tail_ = pCur;    }    else{        queue->tail_->next_ = pCur;        queue->tail_ = pCur;    }    queue->size_++;}void queue_pop(queue_t *queue){    assert(!queue_is_empty(queue));    pnode_t pCur = queue->head_;    queue->head_ = queue->head_->next_;    free(pCur);    queue->size_ --;}void queue_destroy(queue_t *queue){    queue_clear(queue);}void queue_clear(queue_t *queue){    while(!queue_is_empty(queue)){        queue_pop(queue);    }}int queue_is_empty(queue_t *queue){    return queue->size_ == 0;}size_t queue_size(queue_t *queue){    return queue->size_;}task_t queue_top(queue_t *queue){    return queue->head_->data_;}#include "def.h"#include <stdio.h>#include <stdlib.h>#define POOL_SIZE 3void quare(void *arg){    int num = (int)arg;    printf("%d * %d = %d\n", num, num, num * num);}int main(int argc, const char *argv[]){    pool_t pool;    task_t task;    srand(10000);    thread_pool_init(&pool, POOL_SIZE);    thread_pool_start(&pool);    while(1){        task.thread_function_ptr_ = quare;        task.arg_ = (void *)(rand()%100);        thread_pool_add_task(&pool, task);        sleep(1);    }    thread_pool_stop(&pool);    thread_pool_destroy(&pool);    return 0;}#include "def.h"#include <stdio.h>/* * 测试队列 */void func_ptr(void *arg){    printf("arg = %d\n", (int)arg);}int main(int argc, const char *argv[]){    queue_t queue;    queue_init(&queue);    task_t task, task_2;    task.thread_function_ptr_ = func_ptr;    task.arg_ = (void *)10;    queue_push(&queue, task);    task_2 = queue_top(&queue);    printf("task_2.arg = %d\n",(int)task_2.arg_);    queue_pop(&queue);    printf("queue_is_empty = %d\n", queue_is_empty(&queue));    return 0;}#include "def.h"#include <stdlib.h>#include <assert.h>void thread_pool_init(pool_t *pool, size_t size){    pool->size_ = size;    pool->threads_ = (pthread_t  *)malloc(pool->size_ * sizeof(pthread_t));    queue_init(&pool->queue_);    pool->is_started_ = FALSE;    pthread_mutex_init(&pool->mutex_, NULL);    pthread_cond_init(&pool->cond_, NULL);}void *thread_pool_thread_func(void * arg){    pool_t *pool = (pool_t *)arg;    task_t task;    while(1){        int ret = thread_pool_get_task(pool, &task);        if(ret == TRUE)            task.thread_function_ptr_(task.arg_);        else //此时说明线程池关闭            break;    }}void thread_pool_start(pool_t *pool){    if(pool->is_started_ == FALSE){        pool->is_started_ = TRUE;        int i;        for(i = 0; i < pool->size_; i++){            pthread_create(&pool->threads_[i], NULL,thread_pool_thread_func, (void*)pool);        }    }}void thread_pool_add_task(pool_t *pool, task_t task){    assert(pool->is_started_);    pthread_mutex_lock(&pool->mutex_);    queue_push(&pool->queue_, task); //将新任务加入任务队列中去    pthread_cond_signal(&pool->cond_);    pthread_mutex_unlock(&pool->mutex_);}int thread_pool_get_task(pool_t *pool, task_t *task){ // 根据返回值判断是否成功取出任务    pthread_mutex_lock(&pool->mutex_);    while(queue_is_empty(&pool->queue_) && pool->is_started_ == TRUE){        pthread_cond_wait(&pool->cond_, &pool->mutex_);    }    if(pool->is_started_ == FALSE){//有可能是关闭线程池时 被唤醒的        pthread_mutex_unlock(&pool->mutex_);        return FALSE;    }    *task = queue_top(&pool->queue_);    queue_pop(&pool->queue_);    pthread_mutex_unlock(&pool->mutex_);    return TRUE;}void thread_pool_stop(pool_t *pool){    if(pool->is_started_ == FALSE)        return;    pool->is_started_ = FALSE;    pthread_cond_broadcast(&pool->cond_); //唤醒所有睡眠线程 结束回收资源    int i;    for(i = 0; i < pool->size_; i++){        pthread_join(pool->threads_[i], NULL);    }    queue_clear(&pool->queue_); // 清空任务队列}void thread_pool_destroy(pool_t *pool){ // 销毁线程池    thread_pool_stop(pool);    pthread_mutex_destroy(&pool->mutex_); // 销毁互斥锁和条件变量    pthread_cond_destroy(&pool->cond_);    free(pool->threads_); //释放动态分配的内存 线程数组和任务队列    queue_destroy(&pool->queue_);}int thread_pool_is_started(pool_t *pool){    return pool->is_started_ == TRUE;}size_t thread_pool_get_size_of_queue(pool_t *pool){    return pool->queue_.size_;}