首页 > 代码库 > 探究一种定长队列操作(C语言版本)

探究一种定长队列操作(C语言版本)

一 问题来源:南京烽火通信面试

二 问题:

现有一种定长队列,长度为2的n次方,此队列可被多线程访问,但必须确保线程级安全,即在任意时刻,队列的长度保持不变。

三 笔者分析

1. 队列的存储结构--链式和顺序均可。

2. 长度为2的n次方--便于移动队头和队尾指针,假设队列长度为size,那么rear = (rear + 1) & (size - 1),&运算比算术求模运算效率高。为此,笔者使用了顺序队列。

3. 怎么确保线程级安全

笔者认为此系统一定同时存在多个读者和写者,读者是读取队头元素,写者是不断向队尾插入元素,读者与读者之间,读者与写者之间,写者与写者之间都必须互斥访问,因此,必须采用互斥锁,保证任意时刻队列长度不变。为此,笔者定义一个全局变量balance。

读者 get

加锁

检测balance为1吗?如果为1,表明队尾已经有一个元素加入, 那么读取队头,并将balance = 0

解锁

写者 put

加锁

检测balance为0吗?如果为0,表明队列平衡,没有元素插入, 那么在队列插入元素,并将balance = 1

解锁

这样能够确保同一时刻读、写者之间彼此是互斥的。

四 代码组织


五 代码详解

1. queue.h

/*************************************************************************
    > File Name: queue.h
    > Author: wangzhicheng
    > Mail: 2363702560@qq.com 
    > Created Time: Fri 14 Nov 2014 11:23:55 AM WST
 ************************************************************************/
#ifndef _QUEUE_H_
#define _QUEUE_H_
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#define MAX 1024    // the max storage space
typedef int DataType;   // data type
/*
 * sequenece queue
 * */
typedef struct Queue {
	DataType *array;
	int front;       // pointer to the header node of queue
	int rear;        // pointer to the next position of last node in the queue
	int size;        // initial size of the queue size should be 8 bytes
}Queue;
Queue Q;
int balance;   // should be 0, 1 -- a node add in the queue
pthread_mutex_t mutex;  // mutex lock
int InitQueue();
int reInitQueue(uint);
int get(DataType *);
int put(DataType);
#endif

2. queue.c

/*************************************************************************
    > File Name: queue.c
    > Author: wangzhicheng2013
    > Mail: 2363702560@qq.com 
    > Created Time: Fri 14 Nov 2014 03:01:32 PM WST
 ************************************************************************/
#include "queue.h"
/*
 * initialize the queue
 * return 0 if successfully 
 * else return -1
 * */
int InitQueue() {
	int rc = 0;
	int i;
	Q.array = (DataType *)malloc(sizeof(DataType) * MAX);
	if(!Q.array) {
		rc = -1;
		printf("Init Queue failed, Memeory lack...!\n");
	}
	else {  // initialize the queue
		Q.front = 0;
		Q.size = 8;
		Q.rear = Q.size;
		for(i = 0;i < Q.size;i++) Q.array[i] = i;  
		pthread_mutex_init(&mutex, NULL);
	}

	return rc;
}
/*
 * reinitialize the queue
 * return 0 if successfully 
 * else return -1
 * */
int reInitQueue(uint capacity) {
	int rc = 0;
	int i;
	if(capacity >= MAX) rc = -1;
	else {  // initialize the queue
		for(i = 2;i < capacity;i = i << 1) ;
		if(i > capacity) i = i / 2;  // compute the max 2 ^ n less than capacity
		Q.size = i;
		for(i = Q.rear;i < Q.size;i++) Q.array[i] = i;  
		Q.rear = Q.size;
	}

	return rc;
}
/*
 * get a element from the queue
 * return 0 if successfully 
 * else return -1
 * */
int get(DataType *e) {
	int rc = -1;
	pthread_mutex_lock(&mutex);
	if(balance) {   // if put a node in the queue before
		*e = Q.array[Q.front];
		Q.front = (Q.front + 1) & (Q.size - 1);  // circularly move, save much time
		balance = 0; // return orgin
		rc = 0;
	}
	pthread_mutex_unlock(&mutex);

	return rc;
}
/*
 * put a element in the queue
 * return 0 if successfully 
 * else return -1
 * */
int put(DataType e) {
	int rc = -1;
	pthread_mutex_lock(&mutex);
	if(!balance) {   // if the queue is balanced
		Q.array[Q.rear] = e;
		Q.rear = (Q.rear + 1) & (Q.rear - 1);  // circularly move, save much time
		balance = 1;
		rc = 0;
	}
	pthread_mutex_unlock(&mutex);

	return rc;
}

3. main.c

/*************************************************************************
    > File Name: main.c
    > Author: ma6174
    > Mail: ma6174@163.com 
    > Created Time: Fri 14 Nov 2014 03:26:48 PM WST
 ************************************************************************/

#include "queue.h"
#include <time.h>
#define THREAD_NUM 20
void *_read(void *arg) {
	DataType e;
	while(1) {
		if(get(&e)) {
			perror("read failed...!\n");
		}
		else printf("%d has been read...!\n", e);
		usleep(20);
//		sleep(1);
	}
}
void *_write(void *arg) {
	DataType e = rand() % 10;
	while(1) {
		if(put(e)) {
			perror("write failed...!\n");
		}
		else printf("%d has been write...!\n", e);
		usleep(20);
//		sleep(1);
	}
}
int main() {
	if(InitQueue()) {
		exit(EXIT_FAILURE);
	}
	srand((unsigned)time(NULL));
	pthread_t threads[THREAD_NUM];
	int i;
	for(i = 0;i < THREAD_NUM;i++) {
		if(i < THREAD_NUM / 2) {
			if(pthread_create(&threads[i], NULL, _read, NULL)) {
				perror("thread created failed...!\n");
			}
		}
		else {
			if(pthread_create(&threads[i], NULL, _write, NULL)) {
				perror("thread created failed...!\n");
			}
		}
	}
	for(i = 0;i < THREAD_NUM;i++) {
		pthread_join(threads[i], NULL);
	}

	return 0;
}


4. Makefile

CC=gcc
all:
	$(CC) -g -o main src/main.c src/queue.c header/queue.h -I src/ -I header/ -lpthread

六 运行测试



探究一种定长队列操作(C语言版本)