首页 > 代码库 > 5 ACE acceptor connector Proactor异步框架

5 ACE acceptor connector Proactor异步框架

技术分享

技术分享

技术分享

技术分享




ACE_Acceptor_Connector框架 完成accpet操作


chunli@Linux:~/ace/AceAcceptorConnector$ cat echo_server.cpp 
#include <ace/Svc_Handler.h>
#include <ace/SOCK_Stream.h>
#include <ace/SOCK_Acceptor.h>
#include <ace/Acceptor.h>

class AcceptorHandler: public ACE_Svc_Handler<ACE_SOCK_Stream, ACE_NULL_SYNCH> {
public:
	typedef ACE_Svc_Handler<ACE_SOCK_Stream, ACE_NULL_SYNCH> Parent;
	enum {
		BUF_SIME = 512
	};

	virtual int handle_input(ACE_HANDLE h) 
	{
		//ACE_OS::sleep(5);
		ssize_t n = peer().recv(buf, BUF_SIME);
		if (n <= 0)
			ACE_ERROR_RETURN((LM_ERROR, "%p\n", "peer().recv()"), -1);
		if (peer().send(buf, n) == -1)	
			ACE_ERROR_RETURN((LM_ERROR, "%p\n", "peer().send()"), -1);
		return 0;
	}
private:
		char buf[BUF_SIME];
};

typedef ACE_Acceptor<AcceptorHandler, ACE_SOCK_Acceptor> MyAcceptor;

int main() {
	ACE_INET_Addr addr(8868);
	MyAcceptor acceptor(addr, ACE_Reactor::instance());
	ACE_Reactor::instance()->run_reactor_event_loop();
}
chunli@Linux:~/ace/AceAcceptorConnector$ 

编译运行:
chunli@Linux:~/ace/AceAcceptorConnector$ g++ echo_server.cpp -lACE -lpthread && ./a.out 


chunli@Linux:~/ace/AceTask$ nc localhost 8868
121
121

chunli@Linux:~$  nc localhost 8868
3213
3213
32131
32131




ACE_Acceptor_Connector框架 完成connector操作

chunli@Linux:~/ace/AceAcceptorConnector$ cat echo_client.cpp 
#include <iostream>
#include <ace/Svc_Handler.h>
#include <ace/SOCK_Stream.h>
#include <ace/SOCK_Connector.h>
#include <ace/Connector.h>

class InputHandler: public ACE_Svc_Handler<ACE_SOCK_Stream, ACE_NULL_SYNCH> 
{
public:
	typedef ACE_Svc_Handler<ACE_SOCK_Stream, ACE_NULL_SYNCH> Parent;
	enum {
		BUF_SIME = 512
	};
	virtual int open(void* a) 
	{
		if (Parent::open(a) == -1)
			return -1;
		return this->activate(THR_NEW_LWP | THR_DETACHED);
	}
	virtual int handle_input(ACE_HANDLE) //输出收到的数据
	{
		ssize_t n = peer().recv(buf, BUF_SIME);
		if (n <= 0)
			ACE_ERROR_RETURN((LM_ERROR, "%p\n","peer().recv()"), -1);
		buf[n] = 0;
		ACE_DEBUG((LM_DEBUG, "%s\n", buf));
		return 0;
	}
	virtual int svc() //从键盘读取数据
	{
		char inBuf[BUF_SIME] = "";
		while (std::cin.getline(inBuf, BUF_SIME)) 
		{
			if (peer().send(inBuf, strlen(inBuf)) == -1) 
			{
				ACE_ERROR((LM_ERROR, "%p\n", "peer().send()"));
				break;
			}
		}
		return 0;
	}

private:
		char buf[BUF_SIME];
};

typedef ACE_Connector<InputHandler, ACE_SOCK_Connector> MyConnector;

int main() 
{
	ACE_INET_Addr addr(8868, "127.0.0.1");
	MyConnector connector;

	InputHandler* p = 0;//由connector 创建 InputHandler
	if (connector.connect(p, addr) == -1)
		ACE_ERROR_RETURN((LM_ERROR, "%p\n", "connect()"), -1);

	ACE_Reactor::instance()->run_reactor_event_loop();
}
chunli@Linux:~/ace/AceAcceptorConnector$ 


编译运行:
chunli@Linux:~/ace/AceAcceptorConnector$ g++ echo_client.cpp -lACE -lpthread && ./a.out 
1234567890
qazwsx

中国上海!


Hello ACE 

^C
chunli@Linux:~/ace/AceAcceptorConnector$ 


模拟一个服务器:
chunli@Linux:~/ace$ nc -l localhost 8868
1234567890qazwsx
中国上海!
Hello ACE 
chunli@Linux:~/ace$



proactor 异步echo server

chunli@Linux:~/ace/AceProactor$ cat proactor_echo_server.cpp 
#include "ace/Asynch_IO.h"
#include "ace/Asynch_Acceptor.h"
#include "ace/INET_Addr.h"
#include "ace/Proactor.h"

class EchoService: public ACE_Service_Handler 
{
public:
	~EchoService() 
	{
		if (this->handle() != ACE_INVALID_HANDLE)
			ACE_OS::closesocket(this->handle());
	}
	virtual void open(ACE_HANDLE h, ACE_Message_Block&) 
	{
		handle(h);
		if (this->reader_.open(*this) != 0 || this->writer_.open(*this) != 0) 
		{
			ACE_ERROR((LM_ERROR, "%p\n", "open()"));
			delete this;
			return;
		}

		ACE_Message_Block* mb;
		ACE_NEW_NORETURN(mb, ACE_Message_Block(512));
		if (this->reader_.read(*mb, mb->space()) != 0) 
		{
			ACE_ERROR((LM_ERROR, "%p\n", "read()"));
			mb->release();
			delete this;
			return;
		}
	}
	virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result& result) 
	{
		ACE_Message_Block &mb = result.message_block();
		if (!result.success() || result.bytes_transferred() == 0) 
		{
			mb.release();
			delete this;
		} 
		else 
		{
			if (this->writer_.write(mb, mb.length()) != 0) 
			{
				ACE_ERROR((LM_ERROR, "%p\n", "write()"));
				mb.release();
			} 
			else 
			{
				ACE_Message_Block* mblk;
				ACE_NEW_NORETURN(mblk, ACE_Message_Block(512));
				this->reader_.read(*mblk, mblk->space());
			}
		}
	}
	virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result& result) 
	{
		result.message_block().release();
	}

private:
	ACE_Asynch_Read_Stream reader_;
	ACE_Asynch_Write_Stream writer_;
};

int main() 
{
	ACE_INET_Addr listen_addr(8868);
	ACE_Asynch_Acceptor<EchoService> aio_acceptor;
	if (0 != aio_acceptor.open(listen_addr, 
				0, // bytes_to_read
				0, // pass_addresses
				ACE_DEFAULT_BACKLOG, 1, // reuse_addr
				0, // proactor
				0)) // validate_new_connection
		ACE_ERROR_RETURN((LM_ERROR, "%p\n", "write()"), 1);

	ACE_Proactor::instance()->proactor_run_event_loop();
}
chunli@Linux:~/ace/AceProactor$ 


编译运行:
chunli@Linux:~/ace/AceProactor$ g++ proactor_echo_server.cpp -lACE && ./a.out 
(2710 | 140613802981248) ACE_POSIX_AIOCB_Proactor::Max Number of AIOs=1024
^C
chunli@Linux:~/ace/AceProactor$ 



客户端测试:
chunli@Linux:~/ace$ nc  localhost 8868
123
123
34567
34567
^C
chunli@Linux:~/ace$






本文出自 “魂斗罗” 博客,请务必保留此出处http://990487026.blog.51cto.com/10133282/1889998

5 ACE acceptor connector Proactor异步框架