首页 > 代码库 > 0726------Linux基础----------线程池
0726------Linux基础----------线程池
#ifndef __DEF_H__#define __DEF_H__#include <stddef.h>#include <pthread.h>#include <stdio.h>#define TRUE 1#define FALSE 0//任务结构体typedef struct{ void (*thread_function_ptr_) (void*); void *arg_;}task_t;//队列结构体typedef struct node_t{ task_t data_; //队列结点内存放任务 struct node_t *next_;}node_t, *pnode_t;typedef struct{ pnode_t head_; pnode_t tail_; size_t size_;}queue_t;//线程池结构体typedef struct{ size_t size_; //线程池大小 pthread_t *threads_; //线程id数组 queue_t queue_; //任务队列 int is_started_; // 线程池状态 是否开启 c 没有bool pthread_mutex_t mutex_; //互斥锁 各线程互斥访问任务队列 pthread_cond_t cond_; //条件队列 同步主线程和各子线程}pool_t;// 线程池的接口void thread_pool_init(pool_t *pool, size_t size); //初始化线程池void thread_pool_start(pool_t *pool); // 开启线程池void thread_pool_add_task(pool_t *pool, task_t task); //往线程池中加入新的任务int thread_pool_get_task(pool_t *pool, task_t *task);//从线程池中取出任务void thread_pool_stop(pool_t *pool); // 关闭线程池void thread_pool_deatroy(pool_t *pool); //销毁 线程池int thread_pool_is_started(pool_t *pool);// 线程池是否开启size_t thread_pool_get_size_of_queue(pool_t *pool); //返回线程池任务队列的大小void* thread_pool_thread_func(void *); //每个线程执行的函数//队列的接口void queue_init(queue_t *queue); //初始化队列 这里动态创建队列void queue_push(queue_t *queue, task_t task); // 入队列void queue_pop(queue_t *queue); //出队列void queue_destroy(queue_t *queue); //销毁队列void queue_clear(queue_t *queue); //清空队列int queue_is_empty(queue_t *queue); //判断队列是否为空size_t queue_size(queue_t *queue); //返回队列的大小task_t queue_top(queue_t *queue); // 返回队首元素#endif#include "def.h"#include <stdlib.h>#include <assert.h>void queue_init(queue_t *queue){ queue->head_ = NULL; queue->tail_ = NULL; queue->size_ = 0;}void queue_push(queue_t *queue, task_t task){ pnode_t pCur = (pnode_t )malloc(sizeof(node_t)); pCur->data_ = task; pCur->next_ = NULL; if(queue_is_empty(queue)){ queue->head_= queue->tail_ = pCur; } else{ queue->tail_->next_ = pCur; queue->tail_ = pCur; } queue->size_++;}void queue_pop(queue_t *queue){ assert(!queue_is_empty(queue)); pnode_t pCur = queue->head_; queue->head_ = queue->head_->next_; free(pCur); queue->size_ --;}void queue_destroy(queue_t *queue){ queue_clear(queue);}void queue_clear(queue_t *queue){ while(!queue_is_empty(queue)){ queue_pop(queue); }}int queue_is_empty(queue_t *queue){ return queue->size_ == 0;}size_t queue_size(queue_t *queue){ return queue->size_;}task_t queue_top(queue_t *queue){ return queue->head_->data_;}#include "def.h"#include <stdio.h>#include <stdlib.h>#define POOL_SIZE 3void quare(void *arg){ int num = (int)arg; printf("%d * %d = %d\n", num, num, num * num);}int main(int argc, const char *argv[]){ pool_t pool; task_t task; srand(10000); thread_pool_init(&pool, POOL_SIZE); thread_pool_start(&pool); while(1){ task.thread_function_ptr_ = quare; task.arg_ = (void *)(rand()%100); thread_pool_add_task(&pool, task); sleep(1); } thread_pool_stop(&pool); thread_pool_destroy(&pool); return 0;}#include "def.h"#include <stdio.h>/* * 测试队列 */void func_ptr(void *arg){ printf("arg = %d\n", (int)arg);}int main(int argc, const char *argv[]){ queue_t queue; queue_init(&queue); task_t task, task_2; task.thread_function_ptr_ = func_ptr; task.arg_ = (void *)10; queue_push(&queue, task); task_2 = queue_top(&queue); printf("task_2.arg = %d\n",(int)task_2.arg_); queue_pop(&queue); printf("queue_is_empty = %d\n", queue_is_empty(&queue)); return 0;}#include "def.h"#include <stdlib.h>#include <assert.h>void thread_pool_init(pool_t *pool, size_t size){ pool->size_ = size; pool->threads_ = (pthread_t *)malloc(pool->size_ * sizeof(pthread_t)); queue_init(&pool->queue_); pool->is_started_ = FALSE; pthread_mutex_init(&pool->mutex_, NULL); pthread_cond_init(&pool->cond_, NULL);}void *thread_pool_thread_func(void * arg){ pool_t *pool = (pool_t *)arg; task_t task; while(1){ int ret = thread_pool_get_task(pool, &task); if(ret == TRUE) task.thread_function_ptr_(task.arg_); else //此时说明线程池关闭 break; }}void thread_pool_start(pool_t *pool){ if(pool->is_started_ == FALSE){ pool->is_started_ = TRUE; int i; for(i = 0; i < pool->size_; i++){ pthread_create(&pool->threads_[i], NULL,thread_pool_thread_func, (void*)pool); } }}void thread_pool_add_task(pool_t *pool, task_t task){ assert(pool->is_started_); pthread_mutex_lock(&pool->mutex_); queue_push(&pool->queue_, task); //将新任务加入任务队列中去 pthread_cond_signal(&pool->cond_); pthread_mutex_unlock(&pool->mutex_);}int thread_pool_get_task(pool_t *pool, task_t *task){ // 根据返回值判断是否成功取出任务 pthread_mutex_lock(&pool->mutex_); while(queue_is_empty(&pool->queue_) && pool->is_started_ == TRUE){ pthread_cond_wait(&pool->cond_, &pool->mutex_); } if(pool->is_started_ == FALSE){//有可能是关闭线程池时 被唤醒的 pthread_mutex_unlock(&pool->mutex_); return FALSE; } *task = queue_top(&pool->queue_); queue_pop(&pool->queue_); pthread_mutex_unlock(&pool->mutex_); return TRUE;}void thread_pool_stop(pool_t *pool){ if(pool->is_started_ == FALSE) return; pool->is_started_ = FALSE; pthread_cond_broadcast(&pool->cond_); //唤醒所有睡眠线程 结束回收资源 int i; for(i = 0; i < pool->size_; i++){ pthread_join(pool->threads_[i], NULL); } queue_clear(&pool->queue_); // 清空任务队列}void thread_pool_destroy(pool_t *pool){ // 销毁线程池 thread_pool_stop(pool); pthread_mutex_destroy(&pool->mutex_); // 销毁互斥锁和条件变量 pthread_cond_destroy(&pool->cond_); free(pool->threads_); //释放动态分配的内存 线程数组和任务队列 queue_destroy(&pool->queue_);}int thread_pool_is_started(pool_t *pool){ return pool->is_started_ == TRUE;}size_t thread_pool_get_size_of_queue(pool_t *pool){ return pool->queue_.size_;}
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。