首页 > 代码库 > Linux多线程模拟生产者/消费者问题

Linux多线程模拟生产者/消费者问题

描述:

        生产者-消费者问题是一个经典的进程同步问题,该问题最早由Dijkstra提出,用以演示他提出的信号量机制。在同一个进程地址空间内执行的N个线程生产者线程生产物品,然后将物品放置在一个空缓冲区中供N个消费者线程消费。消费者线程从缓冲区中获得物品,然后释放缓冲区。当生产者线程生产物品时,如果没有空缓冲区可用,那么生产者线程必须等待消费者线程释放出一个空缓冲区。当消费者线程消费物品时,如果没有满的缓冲区,那么消费者线程将被阻塞,直到新的物品被生产出来。

 

输入:

        生产者个数、消费者个数、还是缓冲区大小、每个生产者生产产品的个数等。


输出:

       生产者-消费者并发执行的过程。消费者消费完所有的产品结束。

 

代码实现:

//main.cpp
#include "Storage.h"	//代码如下

#include <signal.h>
#include <pthread.h>
#include <unistd.h>

#include <string.h>
#include <stdio.h>
#include <stdio.h>

#include <iostream>
using namespace std;

const int sleepTime = 1;

struct passStruct
{
    pthread_mutex_t *m_mutex;   //所有线程所共享的互斥量
    int *m_products;            //生产者一次生产的产品数
    Storage *m_storage;         //共享的存储区
    int *m_nThreadNum;          //线程标号
};

pthread_cond_t notempty;
pthread_cond_t notfull;

void *producer(void *arg)
{
    //获取从控制线程中传来的数值
    passStruct tmp = *static_cast<passStruct *>(arg);

    //生产者每次生产的产品数
    int *products = tmp.m_products;
    //控制线程中已经初始化了的互斥量
    pthread_mutex_t *mutex = tmp.m_mutex;
    //获取仓库
    Storage *storage = tmp.m_storage;

    char strToStore[128],strCurrentStored[128];
    sprintf(strToStore,"\tAdd %d product to storage...\n",*products);

    while (true)
    {
        //获取互斥量
        pthread_mutex_lock(mutex);
        //如果当前仓库已满(已经没有空间可以填充),则打印阻塞消息
        if (!storage -> isHaveSpace())
        {
            cout << "++ producer " << *(tmp.m_nThreadNum) << " Block\n" << endl;
            //等待条件变量,并且释放互斥量
            pthread_cond_wait(?full,mutex);
        }

        //仓库中已经有空间了,先睡一会再进行生产
        sleep(sleepTime);

        //打印激活消息
        cout << "++ producer " << *(tmp.m_nThreadNum) << " Activate" << endl;
        //开始生产产品
        write(STDOUT_FILENO,strToStore,strlen(strToStore));
        storage -> addToStorage(*products);

        //打印当前仓库总的产品数
        sprintf(strCurrentStored,"\t\tcurrent storage %d products\n\n",
                storage -> currentCount());
        write(STDOUT_FILENO,strCurrentStored,strlen(strCurrentStored));

        //通知等待信号量notempty的第一个消费者线程,当前仓库非空
        pthread_cond_signal(?empty);
        pthread_mutex_unlock(mutex);

        //睡一会儿参与下一轮竞争
        sleep(sleepTime);
    }

    pthread_exit(NULL);
}

void *consumer(void *arg)
{
    passStruct tmp = *static_cast<passStruct *>(arg);
    pthread_mutex_t *mutex = tmp.m_mutex;
    Storage *storage = tmp.m_storage;

    string strToDisplay("\tGet a Product from storage...\n");
    char strCurrentStored[128];

    while (true)
    {
        //获取互斥量
        pthread_mutex_lock(mutex);
        //当前仓库中已空(仓库中没有产品),则打印阻塞消息,并等待条件变量的到来
        if (!storage -> isHaveProduct())
        {
            cout << "-- consumer " << *(tmp.m_nThreadNum) << " Block\n" << endl;
            pthread_cond_wait(?empty,mutex);
        }

        //当前仓库中已经有产品了^^,先睡一会在进行消费
        sleep(sleepTime);

        //打印激活消息
        cout << "-- consumer " << *(tmp.m_nThreadNum) << " Activate" << endl;
        write(STDOUT_FILENO,strToDisplay.c_str(),strToDisplay.size());
        storage -> getFromStorage();

        //打印当前仓库中的产品数
        sprintf(strCurrentStored,"\t\tcurrent storage %d products\n\n",
                storage -> currentCount());
        write(STDOUT_FILENO,strCurrentStored,strlen(strCurrentStored));

        //告知第一个阻塞在notfull条件变量的生产者线程,当前仓库已经有空间了
        pthread_cond_signal(?full);
        pthread_mutex_unlock(mutex);
        //先睡一会再参与竞争
        sleep(sleepTime);
    }

    pthread_exit(NULL);
}

//信号捕捉函数
void onSignal(int signalNumber)
{
    switch (signalNumber)
    {
        //如果捕捉到SIGUSR1,则整个程序退出,SIGUSR1由stop.sh程序产生
    case SIGUSR1:
        cout << "Main Program Ending..." << endl;
        _exit(0);
        break;
    case SIGINT:
        cout << "Can‘t Kill The Program with Ctrl+C, Please Use the Shell Script stop.sh!" << endl;
        sleep(5);
        break;
    default:
        break;
    }
}

int main()
{
    //注册信号
    signal(SIGUSR1,onSignal);
    signal(SIGINT,onSignal);

    //freopen("back.txt","w",stdout);

    //初始化互斥量以及条件变量
    pthread_mutex_t *mutex = new pthread_mutex_t;
    pthread_mutex_init(mutex,NULL);

    pthread_cond_init(?empty,NULL);
    pthread_cond_init(?full,NULL);

    int numberOfProducer;
    cout << "Please input the number of Producer: ";
    cin >> numberOfProducer;

    int numberOfConsumer;
    cout << "Please input the number of Consumer: ";
    cin >> numberOfConsumer;

    int numberOfProducts;
    cout << "Please input the number of Products for ONE PRODUCER: ";
    cin >> numberOfProducts;

    int sizeOfStorage;
    cout << "Please input the size of the Storage: ";
    cin >> sizeOfStorage;
    Storage storage(sizeOfStorage);

    //初始化所传递的值
    passStruct passValue;
    passValue.m_mutex = mutex;
    passValue.m_products = &numberOfProducts;
    passValue.m_storage = &storage;

    pthread_t pthreadProducer,pthreadConsumer;
    for (int i = 0; i != numberOfConsumer; ++i)
    {
        passValue.m_nThreadNum = new int(i+1);
        pthread_create(&pthreadConsumer,NULL,consumer,
                       static_cast<void *>(&passValue));
    }

    for (int i = 0; i != numberOfProducer; ++i)
    {
        passValue.m_nThreadNum = new int(i+1);
        pthread_create(&pthreadProducer,NULL,producer,
                       static_cast<void *>(&passValue));
    }

    //等待线程结束
    pthread_join(pthreadProducer,NULL);
    pthread_join(pthreadConsumer,NULL);

    pthread_mutex_destroy(mutex);
    delete mutex;
    pthread_cond_destroy(?empty);
    pthread_cond_destroy(?full);

    return 0;
}

//Storage.h
#ifndef STORAGE_H_INCLUDED
#define STORAGE_H_INCLUDED

class Storage
{
public:
    Storage(int);
    ~Storage();

    int currentCount()
    {
        return hasBeenStored;
    }

    bool isHaveSpace()
    {
        if (hasBeenStored < bufferSize)
        {
            return true;
        }
        return false;
    }
    bool isHaveProduct()
    {
        if (hasBeenStored != 0)
        {
            return true;
        }
        return false;
    }
    bool isEmpty()
    {
        return hasBeenStored == 0;
    }

    void addToStorage(int n);
    void getFromStorage();


private:
    int bufferSize;
    int hasBeenStored;
};

#endif // STORAGE_H_INCLUDED

//Storage.cpp
#include "Storage.h"

Storage::Storage(int n = 0):bufferSize(n),hasBeenStored(0)
{

}

Storage::~Storage()
{

}

void Storage::addToStorage(int n)
{
    if (hasBeenStored + n > bufferSize)
        hasBeenStored = bufferSize;
    else
        hasBeenStored += n;
}

void Storage::getFromStorage()
{
    if (!isEmpty())
        -- hasBeenStored;
    else
        hasBeenStored = 0;
}

附(Makefile,启动脚本等):

//Makefile
CC = g++
CPPFLAGS = -Wall -g -pthread

SOURCES = $(wildcard *.cpp)
OBJECTS = $(SOURCES:.cpp=.o)
BIN = main 

.PHONY: all clean

all: $(BIN)
$(BIN): $(OBJECTS)
	$(CC) $(CPPFLAGS) -o $@ $^
	@echo "# # # # # # OK! # # # # # "

%.o: %.cpp
	$(CC) $(CPPFLAGS) -c $^ -o $@

clean:
	-rm -rf $(BIN) $(OBJECTS) *.cbp *.layout

main程序的启动脚本

#!/bin/bash
# A Shell Script for Start the main program

BIN=main
ISEXIST=$(/bin/ls | /bin/grep main$)

#if this program not exits, make it.
if [ "$ISEXIST" = "" ] ; then
	/usr/bin/make
fi

#if this program not running, start it.
PID=$(/usr/bin/pgrep $BIN)

if [ "$PID" = "" ] ; then
	./main
fi

main程序的终止脚本

#!/bin/bash
# A Shell Script for Stop the main Program

PID=$(/usr/bin/pgrep main)
if [ "$PID" != "" ] ; then
	/bin/kill -USR1 $PID	#给main程序发送SIGUSR1信号
fi




Linux多线程模拟生产者/消费者问题