首页 > 代码库 > ACE_Message_Queue介绍(生产者/消费者)

ACE_Message_Queue介绍(生产者/消费者)

下面的两个线程共享一个消息队列,一个用来放整数到队列,一个从队列里取消息出来。此程序在控制台不停的输出递增数字,主要是内存不会泄露

用到了多线程、ACE_Message_Queue、ACE_Message_Block、ACE_Thread_Manager::instance()->spawn等

#include <iostream>
using namespace std;
#include "boost/lexical_cast.hpp"
using namespace boost;
#include "ace/Thread_Manager.h" 
#include "ace/Message_Queue.h"

void* create_vairous_record(void* ace_message_queue);

void* get_vairous_record(void* ace_message_queue);

int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) 
{

	ACE_Message_Queue<ACE_MT_SYNCH>* various_record_queue = new ACE_Message_Queue<ACE_MT_SYNCH>;

	ACE_Thread_Manager::instance()->spawn(
		ACE_THR_FUNC(create_vairous_record), 
		various_record_queue, 
		THR_NEW_LWP | THR_DETACHED);

	ACE_Thread_Manager::instance()->spawn(
		ACE_THR_FUNC(get_vairous_record), 
		various_record_queue, 
		THR_NEW_LWP | THR_DETACHED);

	ACE_Thread_Manager::instance()->wait();

	return 0;
}

void* create_vairous_record(void* ace_message_queue)
{

	ACE_Message_Queue<ACE_MT_SYNCH>* p_queue = (ACE_Message_Queue<ACE_MT_SYNCH>*)ace_message_queue;
	int i=0;
	while (i<10000000)
	{
		ACE_Message_Block* mbl = new ACE_Message_Block(10);//在这里创建消息
		string temp = lexical_cast<string>(++i);
		mbl->copy(temp.c_str());
		p_queue->enqueue_tail(mbl);//消息被放到队列中(用指针引用消息实体)
	}
	return nullptr;
}

void* get_vairous_record(void* ace_message_queue)
{

	ACE_Message_Queue<ACE_MT_SYNCH>* p_queue = (ACE_Message_Queue<ACE_MT_SYNCH>*)ace_message_queue;
	while (true)
	{
		ACE_Message_Block* mbl =nullptr;
		p_queue->dequeue_head(mbl);//消息出队,出队的消息应该在用完之后被释放
		if (mbl)
		{
			cout<<mbl->rd_ptr()<<endl;
			mbl->release();//消息已经用完,释放消息
		}
	}
	return nullptr;

}