首页 > 代码库 > 带阻塞读模式的循环队列实现
带阻塞读模式的循环队列实现
部门准备开发一个项目,这个项目其中一个模块内部实现需要用到队列来实现各小模块内部通讯,
为了实现高效的数据传输模式,决定放弃轮询的方式,改用阻塞方式实现一个循环队列。后来项目
取消了,但该部分代码也写好测试成功了,不想浪费了,就贴出来提供给有需要的人。所有代码和
测试文档我会打包上传到我的个人空间。
函数说明如下:
所有函数成功返回0,失败返回错误码。
该说明文档只是简要说明该函数库,详细情况,请结合代码。
1、该函数库实现了一个先进先出队列的功能,有如下特点
<1>线程安全,该队列出队入队均有锁操作。
<2>可以在C++代码中使用,因为头文件里增加了extern "C"声明。
<3>阻塞等待出队,调用出队函数时,标志参数传入“BLOCK”标志,
该队列实现了条件信号通知机制,用pthread_mutex_t,pthread_cond_t
变量来实现了信号量的PV模式,但我认为应该比信号量机制更有效率。
在出队函数内部使用了pthread_cond_timedwait函数机制,
可以在头文件设置 SEC_TIME,MSEC_TIME 来实现内部等待超时的秒级,微妙级超时等待。
防止如果信号丢失时,等待时间过长。增加可靠性。
但,在外部看是无法感知到,因为内部等待超时时,会自动重新检测条件,若条件不符会继续
等待,
<4>非阻塞出队,调用出队函数时传入标志“NOBLOCK”,但是该机制未经过严格测试,不保证可靠性。
<5>可以在一个进程中定义多个长度不等的队列,各队列互不影响。
2、使用方法,
<1>定义变量,在定义时需指定队列长度。定义时指定长度后,在其后操作该变量值无法再修改。
e.g tm_queue_t my_queue = {.max = 100};//定义一个最大能缓冲100个元素的队列。
struct node my_node;//队列中出队入队的基本单位。用来临时缓存。
<2>初始化队列。
e.g queue_init(&my_queue);
<3>入队
e.g queue_in(&my_queue, &my_node);
<4>出队
e.g queue_out(&my_queue, BLOCK, &my_node);
<5>清空队列
e.g queue_clear(&my_queue);
<6>销毁队列,因为初始化队列时有malloc操作,故当不用队列时需显式销毁该队列
e.g queue_destroy(&my_queue);
<7>该队列基本操作元素为struct node;使用者可以根据自己需要在该结构体内部设置自己的数据结构。
该函数库还提供其他一些函数供调试时使用。
***********************
测试 *
***********************
在test_queue目录下是测试代码及测试脚本
1、该测试代码实现到功能是,当node元素内部为一个三个整数到数组时,元素内部成员为node.buf[0]
node.buf[1],node.buf[2].
先申请两个长度为MAX到数组,int buf_1[MAX];int buf_2[MAX]
我实现两个写线程,均生成MAX个随机数,入队,并同时把入队成功到随机数分别写入buf_1,buf_2,
元素node.buf[0],保存线程编号,node.buf[1],保存该次生成到随机数,node.buf[2]保存该元素
在全局buf中存储的索引号。
2、在主线程中实现读取2*MAX个元素到的队函数,并把读出到元素和全局变量数组buf_1,buf_2中的元素对比,看是否一致,若不一致打印输出。
3、测试脚本test.sh,
该脚本实现每隔一秒执行一次编译得到的程序,并打印程序出错打印到终端。循环执行。
所需头文件:tm_queue.h
<pre name="code" class="cpp">/****************************************************************************** 版权所有 (C), 2014-2017, XXXX科技股份有限公司 ****************************************************************************** 文 件 名 : tm_queue.h 版 本 号 : 初稿 作 者 : liuhuahan 生成日期 : 2014年12月2日 最近修改 : 功能描述 : 出队时可以阻塞等待的队列 函数列表 : 修改历史 : 1.日 期 : 2014年12月2日 作 者 : liuhuahan 修改内容 : 创建文件 ******************************************************************************/ /*----------------------------------------------* * 包含头文件 * *----------------------------------------------*/ /*----------------------------------------------* * 外部变量说明 * *----------------------------------------------*/ /*----------------------------------------------* * 外部函数原型说明 * *----------------------------------------------*/ /*----------------------------------------------* * 内部函数原型说明 * *----------------------------------------------*/ /*----------------------------------------------* * 全局变量 * *----------------------------------------------*/ /*----------------------------------------------* * 模块级变量 * *----------------------------------------------*/ /*----------------------------------------------* * 常量定义 * *----------------------------------------------*/ /*----------------------------------------------* * 宏定义 * *----------------------------------------------*/ #ifndef TM_QUEUE_H #define TM_QUEUE_H #include <pthread.h> #include <sys/time.h> #include <stdbool.h> #include <stdlib.h> #include <stdio.h> #ifdef __cplusplus #if __cplusplus extern "C"{ #endif #endif /* __cplusplus */ #define TIMEOUT 110 /* pthread_cond_timedwait函数等待超时的返回值 */ /* 调试开关 */ //#define BZQ_DEBUG #ifdef BZQ_DEBUG #define DBG_PRINT(fmt, args...) do{ printf("%s %s %d ",__FILE__, __FUNCTION__, __LINE__); printf(fmt, ##args); }while(0); #else #define DBG_PRINT(fmt, args...) #endif #define SEC_TIME 1 /* 等待超时时间,单位s */ #define MSEC_TIME 000 /* 等待超时时间,单位ms */ #define BLOCK 1 /* 阻塞出队标志 */ #define NOBLOCK 0 /* 非阻塞出队 */ struct node{ int buf[3]; }; typedef struct tm_queue{ pthread_cond_t msg_sem;/* 消息条件信号*/ pthread_mutex_t lock;/* 互斥锁 */ const int max;/*该队列缓冲区最大值;*/ int head;/* 队列头索引号 */ int tail;/* 队列尾索引号 */ struct node *nodes;/* 该队列缓冲区指针 */ } tm_queue_t; int queue_init(tm_queue_t *tm_que_p);/* 初始化函数 */ int queue_destroy(tm_queue_t *tm_que_p);/* 销毁队列 */ int queue_in(tm_queue_t *tm_que_p, struct node * node_p);/* 入队 */ int queue_out(tm_queue_t *tm_que_p, int flag, struct node * node_p);/* 出队 */ int get_queue_length( tm_queue_t *tm_que_p); //返回队列的长度 void queue_clear(tm_queue_t *tm_que_p); //清空队列 void queue_print(tm_queue_t *tm_que_p); //打印队列 #ifdef __cplusplus #if __cplusplus } #endif #endif /* __cplusplus */ #endif /* TM_QUEUE_H */
<pre name="code" class="cpp">tm_queue.c
/****************************************************************************** 版权所有 (C), 2014-2017, XXXX科技股份有限公司 ****************************************************************************** 文 件 名 : tm_queue.c 版 本 号 : 初稿 作 者 : liuhuahan 生成日期 : 2014年12月2日 最近修改 : 功能描述 : 出队时可以阻塞等待的队列 函数列表 : get_queue_length is_empty node_print queue_destroy queue_in queue_init queue_out queue_print set_timeout 修改历史 : 1.日 期 : 2014年12月2日 作 者 : liuhuahan 修改内容 : 创建文件 ******************************************************************************/ /*----------------------------------------------* * 包含头文件 * *----------------------------------------------*/ /*----------------------------------------------* * 外部变量说明 * *----------------------------------------------*/ /*----------------------------------------------* * 外部函数原型说明 * *----------------------------------------------*/ /*----------------------------------------------* * 内部函数原型说明 * *----------------------------------------------*/ /*----------------------------------------------* * 全局变量 * *----------------------------------------------*/ /*----------------------------------------------* * 模块级变量 * *----------------------------------------------*/ /*----------------------------------------------* * 常量定义 * *----------------------------------------------*/ /*----------------------------------------------* * 宏定义 * *----------------------------------------------*/ #include"tm_queue.h" static bool is_empty(tm_queue_t *tm_que_p); //判断队列是否为空 static int set_timeout(struct timespec * outtime, int sec, int msec); static void node_print(int num, struct node * message);/* 单个元素打印 */ /***************************************************************************** 函 数 名 : tm_queue_init 功能描述 : 队列初始化 输入参数 : tm_queue_t * tm_que_p int num 输出参数 : 无 返 回 值 : 0成功,-1 paramenter err,-2 malloc err,-3 mutex err,-4 cond err 调用函数 : 被调函数 : 修改历史 : 1.日 期 : 2014年11月27日 作 者 : liu 修改内容 : 新生成函数 *****************************************************************************/ int queue_init(tm_queue_t *tm_que_p) { int ret = 0; if((!tm_que_p)||(tm_que_p->max <= 0)){ return -1; } tm_que_p->head = 0; tm_que_p->tail = 0; tm_que_p->nodes = (struct node *)malloc(tm_que_p->max * sizeof(struct node)); if(tm_que_p->nodes == 0){ return -2; } ret = pthread_mutex_init(&(tm_que_p->lock),NULL); if(ret){ free(tm_que_p->nodes); return -3; } ret = pthread_cond_init(&(tm_que_p->msg_sem),NULL); if(ret){ free(tm_que_p->nodes); pthread_mutex_destroy(&tm_que_p->lock); return -4; } return ret; } /***************************************************************************** 函 数 名 : queue_destroy 功能描述 : 销毁队列 输入参数 : tm_queue_t * tm_que_p 输出参数 : 无 返 回 值 :0:成功,-1:参数错误,-2:mutex销毁失败,-3:cond销毁失败,-4:两个销毁均失败 调用函数 : 被调函数 : 修改历史 : 1.日 期 : 2014年11月27日 作 者 : liu 修改内容 : 新生成函数 *****************************************************************************/ int queue_destroy(tm_queue_t * tm_que_p) { int ret = 0, tmp = 0; if(!tm_que_p){ return -1; } tm_que_p->tail = 0; tm_que_p->head = 0; free(tm_que_p->nodes); tmp = pthread_mutex_destroy(&tm_que_p->lock); if(tmp){ DBG_PRINT("pthread_mutex_destroy failed\n") ret = -2; } tmp = pthread_cond_destroy(&tm_que_p->msg_sem); if(tmp){ DBG_PRINT("pthread_cond_destroy failed\n") if(ret){ return -4; }else{ return -3; } } return ret; } /***************************************************************************** 函 数 名 : queue_in 功能描述 : 入队函数 输入参数 : tm_queue_t * tm_que_p struct node * node_p 输出参数 : 无 返 回 值 : 0 success, -1 failed 调用函数 : 被调函数 : 修改历史 : 1.日 期 : 2014年11月27日 作 者 : liu 修改内容 : 新生成函数 *****************************************************************************/ int queue_in(tm_queue_t * tm_que_p,struct node * node_p) { int ret; ret = pthread_mutex_lock(&(tm_que_p->lock)); if(ret){ DBG_PRINT("pthread_mutex_unlock failed,err = %d\n",ret); return -1; } if((tm_que_p->tail+ 1) % tm_que_p->max == tm_que_p->head){ //队列已满 ret = pthread_mutex_unlock(&(tm_que_p->lock)); #ifdef BZQ_DEBUG DBG_PRINT("queue have full,queue_in failed\n"); if(ret){ DBG_PRINT("pthread_mutex_unlock failed,err = %d\n",ret); } #endif /* BZQ_DEBUG */ return -1; } tm_que_p->nodes[tm_que_p->tail] = *node_p; //元素进队 tm_que_p->tail = (tm_que_p->tail + 1) % (tm_que_p->max); //游标tail下移一位,如果已达最后,就移到前面 printf("id = %d, number = %d, tail = %d\n",node_p->buf[0],node_p->buf[2],tm_que_p->tail); ret = pthread_mutex_unlock(&(tm_que_p->lock)); #ifdef BZQ_DEBUG if(ret){ DBG_PRINT("pthread_mutex_unlock failed,err = %d\n",ret); } #endif /* BZQ_DEBUG */ pthread_cond_signal(&(tm_que_p->msg_sem)); return 0; } /***************************************************************************** 函 数 名 : queue_out 功能描述 : 队头元素出队 输入参数 : tm_queue_t * tm_que_p int flag 输出参数 : struct node * node_p 返 回 值 : 0成功,-1失败 调用函数 : 被调函数 : 修改历史 : 1.日 期 : 2014年11月29日 作 者 : liuhuahan 修改内容 : 新生成函数 *****************************************************************************/ int queue_out(tm_queue_t * tm_que_p, int flag, struct node * node_p) { int ret, tmp; struct timespec outtime; ret = pthread_mutex_lock(&(tm_que_p->lock)); #ifdef BZQ_DEBUG if(ret){ DBG_PRINT("pthread_mutex_lock failed,err = %d\n",ret); return -1; } #endif /* BZQ_DEBUG */ while(1){ //如果队列为空解锁等待信号 if(is_empty(tm_que_p)){ /* 若为NOBLOCK模式,直接返回-1 */ if(flag == NOBLOCK){ return -1; } set_timeout(&outtime,SEC_TIME,MSEC_TIME); ret = pthread_cond_timedwait(&(tm_que_p->msg_sem), &(tm_que_p->lock), &outtime); if(!ret){ /* 严格讲该处应该再次判断一次,防止多个读线程可能引起的冲突, * 但是因为使用队列时只有一个读线程,故该处不判断也可以. */ if(is_empty(tm_que_p)){ DBG_PRINT("UNEXPECTED IS EMPTY\n") continue; } *node_p = tm_que_p->nodes[tm_que_p->head];//返回队头的元素 tm_que_p->head = (tm_que_p->head + 1) % (tm_que_p->max); //游标header向下移一位,如果是队列的末尾移动到最前面 tmp = pthread_mutex_unlock(&(tm_que_p->lock)); #ifdef BZQ_DEBUG // printf("out ID = %d, number = %d, head = %d\n",node_p->buf[0],node_p->buf[2],tm_que_p->head); if(tmp){ DBG_PRINT("pthread_mutex_unlock failed,err = %d\n",tmp); } #endif /* BZQ_DEBUG */ return 0; }else if(ret == TIMEOUT){ continue; }else{ DBG_PRINT("pthread_cond_timedwait failed,err = %d\n",ret); tmp = pthread_mutex_unlock(&(tm_que_p->lock)); #ifdef BZQ_DEBUG if(tmp){ DBG_PRINT("pthread_mutex_unlock failed,err = %d\n",tmp); } #endif /* BZQ_DEBUG */ return -1; } }else{ *node_p = tm_que_p->nodes[tm_que_p->head];//返回队头的元素 tm_que_p->head = (tm_que_p->head + 1) % tm_que_p->max; //游标header向下移一位,如果是队列的末尾移动到最前面 tmp = pthread_mutex_unlock(&(tm_que_p->lock)); #ifdef BZQ_DEBUG // printf("out ID = %d, number = %d, head = %d\n",node_p->buf[0],node_p->buf[2],tm_que_p->head); if(tmp){ DBG_PRINT("pthread_mutex_unlock failed,err = %d\n",tmp); } #endif /* BZQ_DEBUG */ return 0; } } } /***************************************************************************** 函 数 名 : set_timeout 功能描述 : 设置超时时间 输入参数 : int sec:超时时间,单位s; int msec :超时时间,单位ms 输出参数 : struct timespec * outtime 返 回 值 : 0:success,-1:paramter invalid,-2 gettimeofday failed 调用函数 : 被调函数 : 修改历史 : 1.日 期 : 2014年11月28日 作 者 : liu 修改内容 : 新生成函数 *****************************************************************************/ static int set_timeout(struct timespec * outtime, int sec, int msec) { int ret; struct timeval now; if(outtime == NULL){ DBG_PRINT("paramer invalid\n"); return -1; } ret = gettimeofday(&now, NULL); if(ret){ DBG_PRINT("gettimeofday failed,err = %d\n",ret); return -2; } outtime->tv_sec = now.tv_sec + sec; outtime->tv_nsec = (now.tv_usec + msec*1000 )* 1000; return 0; } /***************************************************************************** 函 数 名 : is_empty 功能描述 : 检测队列是否为空,因该函数内部未加锁,故只供内部调用 输入参数 : tm_queue_t * tm_que_p 输出参数 : 无 返 回 值 : 0:非空,1:空 调用函数 : 被调函数 : 修改历史 : 1.日 期 : 2014年11月28日 作 者 : liu 修改内容 : 新生成函数 *****************************************************************************/ static bool is_empty(tm_queue_t * tm_que_p) { return (tm_que_p->head == tm_que_p->tail)? true : false; } /***************************************************************************** 函 数 名 : get_queue_length 功能描述 : 获得队列中存储元素个数 输入参数 : tm_queue_t * tm_que_p 输出参数 : 无 返 回 值 : -1失败,大于等于0成功,值为队列中的元素个数 调用函数 : 被调函数 : 修改历史 : 1.日 期 : 2014年11月29日 作 者 : liuhuahan 修改内容 : 新生成函数 *****************************************************************************/ int get_queue_length(tm_queue_t * tm_que_p) { int len, ret; ret = pthread_mutex_lock(&(tm_que_p->lock)); #ifdef BZQ_DEBUG if(ret){ DBG_PRINT("pthread_mutex_lock failed err = %d\n",ret); return -1; } #endif /* BZQ_DEBUG */ len = (tm_que_p->tail - tm_que_p->head + tm_que_p->max) % tm_que_p->max; ret = pthread_mutex_unlock(&(tm_que_p->lock)); #ifdef BZQ_DEBUG if(ret){ DBG_PRINT("pthread_mutex_unlock failed err = %d\n",ret); return -1; } #endif /* BZQ_DEBUG */ return len; } /***************************************************************************** 函 数 名 : queue_print 功能描述 : 打印队列中的所有元素 输入参数 : tm_queue_t * tm_que_p 输出参数 : 无 返 回 值 : 调用函数 : 被调函数 : 修改历史 : 1.日 期 : 2014年11月29日 作 者 : liuhuahan 修改内容 : 新生成函数 *****************************************************************************/ void queue_print(tm_queue_t * tm_que_p) { int i,ret; ret = pthread_mutex_lock(&(tm_que_p->lock)); #ifdef BZQ_DEBUG if(ret){ DBG_PRINT("pthread_mutex_lock failed,err = %d\n",ret); return; } DBG_PRINT("head = %d, tail = %d\n",tm_que_p->head,tm_que_p->tail); #endif /* BZQ_DEBUG */ if(tm_que_p->head == tm_que_p->tail) return; else if(tm_que_p->tail < tm_que_p->head) { for(i = tm_que_p->head; i < tm_que_p->max; ++i) node_print(i, &tm_que_p->nodes[i]); for(i = 0; i < tm_que_p->tail; ++i) node_print(i, &tm_que_p->nodes[i]); } else{ for(i = tm_que_p->head ;i < tm_que_p->tail;++i) node_print(i, &tm_que_p->nodes[i]); } ret = pthread_mutex_unlock(&(tm_que_p->lock)); #ifdef BZQ_DEBUG if(ret){ DBG_PRINT("pthread_mutex_unlock failed err = %d\n",ret); return; } #endif /* BZQ_DEBUG */ } /***************************************************************************** 函 数 名 : node_print 功能描述 : 打印单个元素 输入参数 : int num 元素ID struct node * message 元素指针 输出参数 : 无 返 回 值 : 无 调用函数 : 被调函数 : 修改历史 : 1.日 期 : 2014年11月29日 作 者 : liuhuahan 修改内容 : 新生成函数 *****************************************************************************/ static void node_print(int num,struct node * message) { printf("node[%d] = [%d][%d][%d]\n",num,message->buf[0],message->buf[1],message->buf[2]); }
#include "tm_queue.h" #include <stdlib.h> #include <unistd.h> #define MAXLEN 500 int buf1[MAXLEN]; int buf2[MAXLEN]; int _buf1[MAXLEN]; int _buf2[MAXLEN]; int err1 = 0; int err2 = 0; pthread_mutex_t buf1_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t buf2_mutex = PTHREAD_MUTEX_INITIALIZER; tm_queue_t que_test = {.max = 500,}; void * test_queue_1(void *p) { int i, tmp, id; struct node tmp_node; id = (int)p; tmp_node.buf[0] = id; for(i = 0; i < MAXLEN; i++) { // printf("test_queue_1 id = %d willing be writed\n",i); tmp = random(); tmp_node.buf[1] = tmp; tmp_node.buf[2] = i; pthread_mutex_lock(&buf1_mutex); if(queue_in(&que_test,&tmp_node)){ // printf("queue_in failed, i = %d\n",i); i--; err1++; pthread_mutex_unlock(&buf1_mutex); usleep(1); continue; }else{ buf1[i] = tmp; pthread_mutex_unlock(&buf1_mutex); //if(tmp == 0){ // printf("UNEXPECTED buf1[%d] = 0\n", i); //} //printf("WRITE BUF1[%d] = %d, tmp = %d\n",i, buf1[i], tmp); // printf("queue write success, i = %d rand = %d\n",i,tmp); } //sleep(1); } // printf("the test_queue_1 have quited, i = %d\n", i); return NULL; } void * test_queue_2(void *p) { int i, tmp, id; struct node tmp_node; id = (int)p; tmp_node.buf[0] = id; for(i = 0; i < MAXLEN; i++) { tmp = random(); tmp_node.buf[1] = tmp; tmp_node.buf[2] = i; pthread_mutex_lock(&buf2_mutex); if(queue_in(&que_test,&tmp_node)){ i--; err2++; pthread_mutex_unlock(&buf2_mutex); usleep(1); continue; }else{ buf2[i] = tmp; pthread_mutex_unlock(&buf2_mutex); //if(tmp == 0){ // printf("UNEXPECTED buf2[%d] = 0\n", i); //} //printf("WRITE BUF2[%d] = %d, tmp = %d\n",i, buf2[i], tmp); } //sleep(1); } return NULL; } int main(int argc, char *argv[]) { pthread_t id_1, id_2; int i = 0; //int m = 0; //int n = 0; int ret; struct node tmp_node; if(queue_init(&que_test)){ printf("queue_init failed \n"); return -1; } printf("queue-max = %d\n", que_test.max); pthread_create(&id_1,NULL,test_queue_1,(void *)1); pthread_create(&id_2,NULL,test_queue_2,(void *)2); // sleep(10); #if 1 for(i = 0; i < 2*MAXLEN; i++) { if(queue_out(&que_test, BLOCK, &tmp_node)){ printf("queue_out failed\n"); }else{ if(((volatile int )tmp_node.buf[0]) == 1){ pthread_mutex_lock(&buf1_mutex); if(buf1[tmp_node.buf[2]] != ((volatile int)tmp_node.buf[1])) printf("buf1[%d] = %d,but read is %d\n",tmp_node.buf[2], buf1[tmp_node.buf[2]], tmp_node.buf[1]); pthread_mutex_unlock(&buf1_mutex); //m++; } else if(tmp_node.buf[0] == 2){ pthread_mutex_lock(&buf2_mutex); //printf("node id = %d\n",tmp_node.buf[0]); if(buf2[tmp_node.buf[2]] != tmp_node.buf[1]) printf("buf2[%d] = %d,but read is %d\n",tmp_node.buf[2], buf2[tmp_node.buf[2]], tmp_node.buf[1]); pthread_mutex_unlock(&buf2_mutex); //n++; }else{ printf("there is err ID = %d\n",tmp_node.buf[0]); } //printf("out ID = %d, number = %d\n",tmp_node.buf[0],tmp_node.buf[2]); } } printf("err1 = %d, err2 = %d, read number = %d\n",err1,err2,i); #endif ret = get_queue_length(&que_test); if(ret){ printf("queue length is %d\n", ret); } queue_print(&que_test); pthread_join(id_1,NULL); pthread_join(id_2,NULL); return 0; }
带阻塞读模式的循环队列实现