首页 > 代码库 > 多个生产者——多个消费者模型(互斥量条件变量实现)
多个生产者——多个消费者模型(互斥量条件变量实现)
1. 介绍
生产者消费者问题属于有界缓冲区问题。我们现在讲述多个生产者向一个缓冲区中存入数据,多个生产者从缓冲区中取数据。
共享缓冲区作为一个环绕缓冲区,存数据到头时再从头开始。
2. 实现
我们使用一个互斥量保护生产者向缓冲区中存入数据。
由于有多个生产者,因此需要记住现在向缓冲区中存入的位置。
使用一个互斥量保护缓冲区中消息的数目,这个生产的数据数目作为生产者和消费者沟通的桥梁。
使用一个条件变量用于唤醒消费者。由于有多个消费者,同样消费者也需要记住每次取的位置。
4.代码
在选项中选择生产条目的数目,生产者的线程数目,消费者的线程数目。
生产者将条目数目循环放入缓冲区中,消费者从缓冲区中循环取出并在屏幕上打印出来。
#include "unp.h"/* * 多个生产者——多个消费者 * 使用条件变量和互斥锁的演示 */static const int NBUFF = 10000;static const int MAXNTHREADS = 100;//总共生产的条目数static int nitems;//生产者向其中放数据,消费者从中取数据static int buff[NBUFF]; //生产者使用的结构//向其中互斥的放数据static struct put { pthread_mutex_t mutex; int nput; //net position to put int nval; //next value to store} put = { PTHREAD_MUTEX_INITIALIZER};//记录缓冲区的状态//准备好的数目//消费者唯一关注的结构//当然生产者也会使用static struct nready { pthread_mutex_t mutex; pthread_cond_t cond; int nget; int nready; //number ready for consumer} nready = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER};void *produce(void*);void *consume(void*);int main(int argc, char **argv){ if (argc != 4) { err_quit("Usage: a.out <#items> <#produce_nthreads> <#consume_nthreads>"); } nitems = atoi(argv[1]); int produce_nthreads = min(atoi(argv[2]), MAXNTHREADS); int consume_nthreads = min(atoi(argv[3]), MAXNTHREADS); //Solaris 2.6需要设置线程并发数// Set_concurrency(nthreads + 1); pthread_t tid_produce[MAXNTHREADS]; for (int i = 0; i < produce_nthreads; ++i) { Pthread_create(&tid_produce[i], NULL, produce, NULL); } pthread_t tid_consume[MAXNTHREADS]; for (int i = 0; i < consume_nthreads; ++i) { Pthread_create(&tid_consume[i], NULL, consume, NULL); } //等待线程终止 for (int i = 0; i < produce_nthreads; ++i) { Pthread_join(tid_produce[i], NULL); } for (int i = 0; i < consume_nthreads; ++i) { Pthread_join(tid_consume[i], NULL); } exit(0);}void *produce(void *arg){ printf("producd\n"); //多个生产者 for ( ; ; ) { Pthread_mutex_lock(&put.mutex); //已存了需要多的数 if (put.nval >= nitems) { Pthread_mutex_unlock(&put.mutex); return NULL; } buff[put.nput] = put.nval; if (++put.nput >= NBUFF) { put.nput = 0; } ++put.nval; Pthread_mutex_unlock(&put.mutex); //当生产了数据后通知条件变量 //应该使临界区尽量短,宁愿使用多个互斥量 Pthread_mutex_lock(&nready.mutex); if (nready.nready == 0) { Pthread_cond_signal(&nready.cond); } ++nready.nready; Pthread_mutex_unlock(&nready.mutex); } //end for(;;) return NULL;}void *consume(void *argv){ printf("consume\n"); //多个消费者 //只生产nitems个选项 for ( ; ; ) { Pthread_mutex_lock(&nready.mutex); //while避免虚假唤醒 while (nready.nready == 0) { Pthread_cond_wait(&nready.cond, &nready.mutex); } //int ival = buff[nready.nget]; //if (++nready.nget == NBUFF) { // nready.nget = 0; //} if (++nready.nget >= nitems) { //nget比较的取值为1..nitems //当为nitems时少操作了一次, //总共操作nitems次 if (nready.nget == nitems) { printf("buff[%d] = %d\n", nready.nget - 1, buff[(nready.nget - 1) % NBUFF]); } Pthread_cond_signal(&nready.cond); Pthread_mutex_unlock(&nready.mutex); return NULL; } --nready.nready; Pthread_mutex_unlock(&nready.mutex); //仅仅读数据不许要互斥// if (buff[nready.nget - 1] != nready.nget - 1) { printf("buff[%d] = %d\n", nready.nget - 1, buff[(nready.nget - 1) % NBUFF]); //printf("buff[%d] = %d\n", nready.nget, buff[nready.nget]);// } } //end for(i:0..nitems) return NULL;}
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。