首页 > 代码库 > [APUE] 线程池

[APUE] 线程池

1 线程池技术简介

“池”化技术通常都是为了应对“小”的特点而开发出来的,比如:

内存池是针对小块内存的申请和释放过于频繁导致的效率低下问题。先分配一定量的内存,按照大小分类,当程序需要小块内存(这里的小是相对而言的,看实现方式)时,就从某个大块内存中截取小块内存,用完了之后,就再放入大块内存中。当然,这里说的只是基本的思想,在实现的时候,有针对不同的分配方式的优化方案。
线程池是针对大量短任务申请线程和释放线程过于频繁导致的效率低下问题,如果任务运行时间较短,而申请线程和释放线程的时间较长,那么频繁地进行线程的申请和释放必将降低效率。先分配一定数量的线程,如果有任务到达,就从线程池中取出一个线程处理任务,处理完成后,将线程归还给线程池。于是,就避免了过于频繁线程的申请和释放导致的效率低下问题。而且,还可以根据当前线程的负载进行线程的增加和减少,当现有的线程比较忙碌时,可以增加线程,但是,线程也不能无限增加,因为过多的线程会增大系统开销,当现有的线程多数处于空闲时,可以减少线程。
当然,还有其它的池化技术,不过,原理大体都是类似的。


2 线程池的实现方案

初始化时,线程池创建默认个数的线程,并将线程ID存放到工作者队列中。

在线程池中设置了一个finish的结束变量,当用户调用了threadpool_destroy()时,就设置该变量,并通知所有的工作者,然后等待所有的工作者线程退出。

每个线程循环检测工作队列,当工作队列不为空时,就从该队列中取出一个工作执行,如果工作队列为空并且设置了finish变量,就退出工作者线程,如果工作队列为空,但是没有设置finish变量,则继续等待工作。

本实现没有考虑负载均衡。


3 线程池的源代码及注释

/* threadpool.h */
#ifndef _THREAD_POOL_H
#define _THREAD_POOL_H

#include <pthread.h>

#define	THREADPOOL_MAX_THREADS	8 /* 最大线程数 */
#define	THREADPOOL_MIN_THREADS	1 /* 最小线程数 */
#define	THREADPOOL_DEF_THREADS	4 /* 默认线程数 */

struct work_queue_s {
	void* (*routine)(void *); /* 工作例程 */
	void *arg; /* 工作例程参数 */
	struct work_queue_s *next;
};

typedef struct work_queue_s work_queue_t; /* 工作队列 */

struct worker_queue_s {
	pthread_t id; /* 线程ID */
	struct worker_queue_s *next;
};

typedef struct worker_queue_s worker_queue_t; /* 工作者队列 */

struct threadpool_s {
	int finish; /* 是否结束 */
	int cur_thread_num; /* 当前线程数 */
	worker_queue_t *workers; /* 工作者队列 */
	work_queue_t *first; /* 工作队列首指针 */
	work_queue_t *last; /* 工作队列尾指针 */
	pthread_cond_t queue_nonempty; /* 工作队列是否为空的条件变量 */
	pthread_mutex_t queue_lock; /* 工作队列锁 */
};

typedef struct threadpool_s threadpool_t; /* 线程池结构 */

/* 创建线程池 */
threadpool_t* threadpool_create();

/* 向线程池中添加工作 */
int threadpool_insert_work(threadpool_t*, void* (*)(void *), void*);

/* 创建工作者 */
int threadpool_create_worker(threadpool_t*);

/* 销毁工作者 */
int threadpool_destroy_worker(threadpool_t*);

/* 销毁线程池 */
int threadpool_destroy(threadpool_t*);

#endif

/* threadpool.c */
#include <stdio.h>
#include <stdlib.h>
#include "threadpool.h"

/* 创建线程,分配线程池结构,初始化结构中的各个元素,创建默认个数的线程 */
threadpool_t* threadpool_create()
{
	threadpool_t *tp = (threadpool_t *)malloc(sizeof(threadpool_t));
	if(tp == NULL) {
		fprintf(stderr, "%s: malloc failed\n", __FUNCTION__);
		return tp;
	}

	tp->finish = 0;
	tp->cur_thread_num = 0;
	tp->workers = NULL;
	tp->first = NULL;
	tp->last = NULL;

	if(pthread_cond_init(&tp->queue_nonempty, NULL) != 0) {
		fprintf(stderr, "%s: pthread_cond_init failed\n", __FUNCTION__);
		free(tp);
		return NULL;
	}

	if(pthread_mutex_init(&tp->queue_lock, NULL) != 0) {
		fprintf(stderr, "%s: pthread_mutex_init failed\n", __FUNCTION__);
		free(tp);
		return NULL;
	}

	int cnt = THREADPOOL_DEF_THREADS;
	while(cnt--) {
		threadpool_create_worker(tp);
	}
	printf("create threadpool success\n");
	printf("contain %d threads\n", THREADPOOL_DEF_THREADS);

	return tp;
}

/* 往线程池中添加工作 */
int threadpool_insert_work(threadpool_t *tp, void* (*routine)(void *), void* arg)
{
	work_queue_t *wq = malloc(sizeof(work_queue_t));
	if(wq == NULL) {
		fprintf(stderr, "%s: malloc failed\n", __FUNCTION__);
		return -1;
	}

	wq->routine = routine;
	wq->arg = arg;
	wq->next = NULL;

	pthread_mutex_lock(&tp->queue_lock);
	if(tp->first == NULL && tp->last == NULL) {
		tp->first = wq;
		tp->last = wq;
	}
	else {
		tp->last->next = wq;
		tp->last = wq;
	}
	pthread_mutex_unlock(&tp->queue_lock);
	pthread_cond_signal(&tp->queue_nonempty);

	return 0;
}

/* 工作者线程的执行函数 */
void* thread_routine(void* arg)
{
	work_queue_t *wq = NULL;
	threadpool_t *tp = (threadpool_t *)arg;

	while(1) {
		pthread_mutex_lock(&tp->queue_lock);
		while(tp->finish == 0 && tp->first == NULL) {
			pthread_cond_wait(&tp->queue_nonempty, &tp->queue_lock);
		}
		if(tp->finish && tp->first == NULL) {
			pthread_mutex_unlock(&tp->queue_lock);
			pthread_exit(NULL);
		}
		
		work_queue_t *wq = NULL;
		wq = tp->first;
		tp->first = wq->next;
		if(wq->next == NULL) {
			tp->last = NULL;
		}
		pthread_mutex_unlock(&tp->queue_lock);

		wq->routine(wq->arg);
		printf("current thread: %u\n", (unsigned int)pthread_self());
		free(wq);
	}
}

/* 创建工作者,并将工作者的线程ID记录到工作者队列中 */
int threadpool_create_worker(threadpool_t *tp)
{
	pthread_t tid;
	if(pthread_create(&tid, NULL, thread_routine, tp) != 0) {
		fprintf(stderr, "%s: pthread_create failed\n", __FUNCTION__);
		return -1;
	}

	worker_queue_t *worker = (worker_queue_t *)malloc(sizeof(worker_queue_t));
	if(worker == NULL) {
		fprintf(stderr, "%s: malloc failed\n", __FUNCTION__);
		return -1;
	}
	worker->id = tid;
	worker->next = NULL;

	worker->next = tp->workers;
	tp->workers = worker;
	tp->cur_thread_num++;
	printf("create worker %u\n", (unsigned int)tid);

	return 0;
}

/* 销毁工作者 */
int threadpool_destroy_worker(threadpool_t *tp)
{
	worker_queue_t *worker = NULL;
	if(tp->workers == NULL) {
		return -1;
	}
	worker = tp->workers;
	tp->workers = worker->next;
	tp->cur_thread_num--;

	pthread_t tid = worker->id;
	free(worker);

	if(pthread_join(tid, NULL) != 0) {
		fprintf(stderr, "%s: pthread_join failed\n", __FUNCTION__);
		return -1;
	}
	printf("destroy %u success\n", (unsigned int)tid);

	return 0;
}

/* 销毁线程池,等待所有线程结束 */
int threadpool_destroy(threadpool_t *tp)
{
	pthread_mutex_lock(&tp->queue_lock);
	tp->finish = 1;
	pthread_mutex_unlock(&tp->queue_lock);
	pthread_cond_broadcast(&tp->queue_nonempty);
	int cnt = tp->cur_thread_num;

	printf("ready to destroy %d worker\n", cnt);

	while(cnt--) {
		threadpool_destroy_worker(tp);
	}

	free(tp);

	return 0;
}

/* test_threadpool.c */
#include <stdio.h>
#include "threadpool.h"

void *routine(void *arg)
{
	printf("%d\n", (int)arg);
	return NULL;
}

int main(int argc, char const *argv[])
{
	threadpool_t *tp = threadpool_create();

	int i = 0;
	while(i < 10) {
		threadpool_insert_work(tp, routine, (void*)i);
		++i;
	}

	threadpool_destroy(tp);

	return 0;
}


4 程序的执行结果
上述程序的执行结果是:

create worker 3076107072
create worker 3067714368
create worker 3059321664
create worker 3050928960
create threadpool success
contain 4 threads
0
ready to destroy 4 worker
1
current thread: 3067714368
2
current thread: 3067714368
3
current thread: 3067714368
4
current thread: 3067714368
5
current thread: 3067714368
6
current thread: 3067714368
7
current thread: 3067714368
8
current thread: 3067714368
9
current thread: 3067714368
current thread: 3076107072
destroy 3050928960 success
destroy 3059321664 success
destroy 3067714368 success
destroy 3076107072 success

可以看到,有十个工作,但是只有两个线程执行了,而且一个线程执行了9个工作,另一个线程执行了1个工作。

如果将工作函数改成这样:

void *routine(void *arg)
{
	sleep(1);
	printf("%d\n", (int)arg);
	return NULL;
}

结果就变成:

create worker 3075525440
create worker 3067132736
create worker 3058740032
create worker 3050347328
create threadpool success
contain 4 threads
ready to destroy 4 worker
2
1
current thread: 3067132736
0
current thread: 3075525440
3
current thread: 3050347328
current thread: 3058740032
5
current thread: 3075525440
6
7
current thread: 3058740032
current thread: 3050347328
4
current thread: 3067132736
destroy 3050347328 success
9
current thread: 3058740032
8
current thread: 3075525440
destroy 3058740032 success
destroy 3067132736 success
destroy 3075525440 success

当线程执行工作时,调用了sleep(),线程休眠,此时,可以调度其它的线程执行工作。


5 实现过程中遇到的问题

(1)在编写多线程程序时,通常会用到锁。使用锁的时候,特别要注意的是:锁保护的是哪那个成员,那个成员是否有必要用锁保护。比如,这里的工作队列,由于工作队列可能会被多个线程使用,某些线程想要从中获取工作,某个线程想向其中添加工作,于是,需要用锁来进行保护。而这里的工作者队列呢?由于工作者队列不会被多个线程使用,它只能被主线程使用,因此,不需要用锁进行保护。

(2)关于条件变量。在使用条件变量时,如果有多个条件,在修改任意一个条件时,都要进行通知。