首页 > 代码库 > 使用无锁队列(环形缓冲区)注意事项
使用无锁队列(环形缓冲区)注意事项
环形缓冲区是生产者和消费者模型中常用的数据结构。生产者将数据放入数组的尾端,而消费者从数组的另一端移走数据,当达到数组的尾部时,生产者绕回到数组的头部。如果只有一个生产者和一个消费者,那么就可以做到免锁访问环形缓冲区(Ring Buffer)。写入索引只允许生产者访问并修改,只要写入者在更新索引之前将新的值保存到缓冲区中,则读者将始终看到一致的数据结构。同理,读取索引也只允许消费者访问并修改。
环形缓冲区实现原理图
如图所示,当读者和写者指针相等时,表明缓冲区是空的,而只要写入指针在读取指针后面时,表明缓冲区已满。
清单 9. 2.6.10 环形缓冲区实现代码
/*
* __kfifo_put - puts some data into the FIFO, no locking version
* Note that with only one concurrent reader and one concurrent
* writer, you don‘t need extra locking to use these functions.
*/
unsigned int __kfifo_put(struct kfifo *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned int l;
len = min(len, fifo->size - fifo->in + fifo->out);
/* first put the data starting from fifo->in to buffer end */
l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
/* then put the rest (if any) at the beginning of the buffer */
memcpy(fifo->buffer, buffer + l, len - l);
fifo->in += len;
return len;
}
/*
* __kfifo_get - gets some data from the FIFO, no locking version
* Note that with only one concurrent reader and one concurrent
* writer, you don‘t need extra locking to use these functions.
*/
unsigned int __kfifo_get(struct kfifo *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned int l;
len = min(len, fifo->in - fifo->out);
/* first get the data from fifo->out until the end of the buffer */
l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
/* then get the rest (if any) from the beginning of the buffer */
memcpy(buffer + l, fifo->buffer, len - l);
fifo->out += len;
return len;
}
需要注意的是
使用ring_buffer_get(kfifo_get)或者ring_buffer_put(kfifo_put)时,如果返回参数与传入参数len不相等时,则操作失败
我们定义一个
//注意student_info 共17字节 按照内存排列占24字节
typedef struct student_info
{
uint64_t stu_id; //8个字节
uint32_t age; //4字节
uint32_t score;//4字节
char sex;//1字节
}student_info;
我们建立一个环形缓冲区,里面只有64字节大小(虽然我们实际使用时大小远大于此),向里面多次存入24字节student_info,看有什么反应
//打印学生信息
void print_student_info(const student_info *stu_info)
{
assert(stu_info);
printf("id:%lu\t",stu_info->stu_id);
printf("age:%u\t",stu_info->age);
printf("sex:%d\t",stu_info->sex);
printf("score:%u\n",stu_info->score);
}
student_info * get_student_info(time_t timer)
{
student_info *stu_info = (student_info *)malloc(sizeof(student_info));
srand(timer);
stu_info->stu_id = 10000 + rand() % 9999;
stu_info->age = rand() % 30;
stu_info->score = rand() % 101;
stu_info->sex=rand() % 2;
print_student_info(stu_info);
return stu_info;
}
void print_ring_buffer_len(struct ring_buffer *ring_buf)
{
//用于打印缓冲区长度
uint32_t ring_buf_len = 0;
//取得已经使用缓冲区长度 size-ring_buf_len为未使用缓冲区的长度
ring_buf_len=ring_buffer_len(ring_buf);
printf("no use ring_buf_len:%d\n",(ring_buf->size-ring_buf_len));
}
int main(int argc, char *argv[])
{
uint32_t size = 0;
//用于判断存储或者取得数据的字节数
uint32_t oklen = 0;
struct ring_buffer *ring_buf = NULL;
//64字节
size=BUFFER_SIZE;
ring_buf = ring_buffer_alloc(size);
printf("input student\n");
{
student_info *stu_info;
student_info stu_temp;
uint32_t student_len=sizeof(student_info);
printf("ring_buf_len:%d\n",ring_buf->size);
printf("student_len:%d\n",student_len);
//此时环形缓冲区没有数据我们去取数据当然为空
memset(&stu_temp,0,student_len);
oklen=ring_buffer_get(ring_buf, (void *)(&stu_temp), student_len);
if(oklen==student_len)
{
printf("get student data\n");
}
else
{
printf("no student data\n");
}
printf("\n");
//第一次调用时用字节结束后还有64-24 =40字节
stu_info = get_student_info(976686458);
oklen = ring_buffer_put(ring_buf, (void *)stu_info, student_len);
if(oklen==student_len)
{
printf("1 put student data success\n");
}
else
{
printf("1 put student data failure\n");
}
print_ring_buffer_len(ring_buf);
printf("\n");
//第二次调用时用字节结束后还有64-48 =16字节
stu_info = get_student_info(976686464);
oklen= ring_buffer_put(ring_buf, (void *)stu_info, student_len);
if(oklen==student_len)
{
printf("2 put student data success\n");
}
else
{
printf("2 put student data failure\n");
}
print_ring_buffer_len(ring_buf);
printf("\n");
//第三次调用时需要用字节但只有字节失败
//把字节都写满了
//验证了在调用__kfifo_put函数或者__kfifo_get函数时,如果返回参数与传入参数len不相等时,则操作失败
stu_info = get_student_info(976686445);
oklen= ring_buffer_put(ring_buf, (void *)stu_info, student_len);
if(oklen==student_len)
{
printf("3 put student data success\n");
}
else
{
printf("3 put student data failure\n");
}
print_ring_buffer_len(ring_buf);
printf("\n");
//第四次调用时需要用字节但无字节
////验证了在调用__kfifo_put函数或者__kfifo_get函数时,如果返回参数与传入参数len不相等时,则操作失败
stu_info = get_student_info(976686421);
oklen= ring_buffer_put(ring_buf, (void *)stu_info, student_len);
if(oklen==student_len)
{
printf("4 put student data success\n");
}
else
{
printf("4 put student data failure\n");
}
print_ring_buffer_len(ring_buf);
printf("\n");
//现在开始取学生数据里面保存了个学生数据我们取三次看效果
printf("output student\n");
printf("\n");
//第一次取得数据并打印
memset(stu_info,0,student_len);
oklen=ring_buffer_get(ring_buf, (void *)stu_info, student_len);
if(oklen==student_len)
{
print_student_info(stu_info);
printf("1 get student data success\n");
}
else
{
printf("1 get student data failure\n");
}
print_ring_buffer_len(ring_buf);
printf("\n");
////第二次取得数据并打印
memset(stu_info,0,student_len);
oklen=ring_buffer_get(ring_buf, (void *)stu_info, student_len);
if(oklen==student_len)
{
print_student_info(stu_info);
printf("2 get student data success\n");
}
else
{
printf("2 get student data failure\n");
}
print_ring_buffer_len(ring_buf);
printf("\n");
//第三次取得数据失败
memset(stu_info,0,student_len);
oklen=ring_buffer_get(ring_buf, (void *)stu_info, student_len);
if(oklen==student_len)
{
print_student_info(stu_info);
printf("3 get student data success\n");
}
else
{
printf("3 get student data failure\n");
}
print_ring_buffer_len(ring_buf);
}
return 1;
}
结论:在使用ring_buffer_get(kfifo_get)或者ring_buffer_put(kfifo_put)时,如果返回参数与传入参数len不相等时,则操作失败。代码下载:tessc.rar(http://files.cnblogs.com/dragonsuc/TDDOWNLOAD.zip)
需要注意的地方:
1.只有一个线程负责读,另一个线程负责写的时候,数据是线程安全的。上面的实现是基于这个原理实现的,当有多个线程读或者多个线程写的时候,不保证数据的正确性。
所以使用的时候,一个线程写,一个线程读。网络应用中比较常用,就是开一个线程接口数据,然后把数据写入队列。然后开一个调度线程读取网络数据,然后分发到处理线程。
2.数据长度默认宏定义了一个长度,超过这个长度的时候,后续的数据会写入失败。
本文参考文章:
http://blog.csdn.net/mergerly/article/details/39009473
http://www.cnblogs.com/Anker/p/3481373.html
使用无锁队列(环形缓冲区)注意事项