首页 > 代码库 > boost.asio源码剖析(三) ---- 流程分析

boost.asio源码剖析(三) ---- 流程分析

* 常见流程分析之一(Tcp异步连接)

 

我们用一个简单的demo分析Tcp异步连接的流程:

 1 #include <iostream> 2 #include <boost/asio.hpp> 3  4 // 异步连接回调函数 5 void on_connect(boost::system::error_code ec) 6 { 7     if (ec)  // 连接失败, 输出错误码 8         std::cout << "async connect error:" << ec.message() << std::endl; 9     else  // 连接成功10         std::cout << "async connect ok!" << std::endl;11 }12 13 int main()14 {15     boost::asio::io_service ios;  // 创建io_service对象16     boost::asio::ip::tcp::endpoint addr(17         boost::asio::ip::address::from_string("127.0.0.1"), 12345);  // server端地址18     boost::asio::ip::tcp::socket conn_socket(ios);  // 创建tcp协议的socket对象19     conn_socket.async_connect(addr, &on_connect);  // 发起异步连接请求20     ios.run();  // 调用io_service::run, 等待异步操作结果21 22     std::cin.get();23     return 0;24 }

 


这段代码中的异步连接请求在asio源码中的序列图如下:

 

      其中,basic_socket是个模板类,tcp协议中的socket的定义如下:
            typedef basic_socket<tcp> socket;


      reactor的定义如下:
      #if defined(BOOST_ASIO_WINDOWS_RUNTIME)
            typedef class null_reactor reactor;
      #elif defined(BOOST_ASIO_HAS_IOCP)
            typedef class select_reactor reactor;
      #elif defined(BOOST_ASIO_HAS_EPOLL)
            typedef class epoll_reactor reactor;
      #elif defined(BOOST_ASIO_HAS_KQUEUE)
            typedef class kqueue_reactor reactor;
      #elif defined(BOOST_ASIO_HAS_DEV_POLL)
            typedef class dev_poll_reactor reactor;
      #else
            typedef class select_reactor reactor;
      #endif

      在这个序列图中最值得注意的一点是:在windows平台下,异步连接请求不是由Iocp处理的,而是由select模型处理的,这是与异步读写数据最大的不同之处。

 


* 常见流程分析之二(Tcp异步接受连接)

 

我们用一个简单的demo分析Tcp异步连接的流程:

 1 #include <iostream> 2 #include <boost/asio.hpp> 3 #include <boost/bind.hpp> 4  5 // 异步连接回调函数 6 void on_accept(boost::system::error_code ec, boost::asio::ip::tcp::socket * socket_ptr) 7 { 8     if (ec)  // 连接失败, 输出错误码 9         std::cout << "async accept error:" << ec.message() << std::endl;10     else  // 连接成功11         std::cout << "async accept from (" << socket_ptr->remote_endpoint() << ")" << std::endl;12 13     // 断开连接, 释放资源.14     socket_ptr->close(), delete socket_ptr;15 }16 17 int main()18 {19     boost::asio::io_service ios;  // 创建io_service对象20     boost::asio::ip::tcp::endpoint addr(21         boost::asio::ip::address::from_string("0.0.0.0"), 12345);  // server端地址22     boost::asio::ip::tcp::acceptor acceptor(ios, addr, false);  // 创建acceptor对象23     boost::asio::ip::tcp::socket * socket_ptr = new boost::asio::ip::tcp::socket(ios);24     acceptor.async_accept(*socket_ptr25         , boost::bind(&on_accept, boost::asio::placeholders::error, socket_ptr));  // 调用异步accept请求26     ios.run();  // 调用io_service::run, 等待异步操作结果27 28     std::cin.get();29     return 0;30 }

 

这段代码中的异步连接请求在asio源码中的序列图如下:

 


* 常见流程分析之三(Tcp异步读写数据)

 

我们依然以上一节的例子为基础,扩展一个简单的demo分析Tcp异步读写数据的流程:

 1 #include <iostream> 2 #include <boost/asio.hpp> 3 #include <boost/bind.hpp> 4 #include <boost/shared_ptr.hpp> 5 #include <boost/array.hpp> 6  7 typedef boost::shared_ptr<boost::asio::ip::tcp::socket> socket_ptr_t; 8 typedef boost::array<char, 128> buffer_t; 9 typedef boost::shared_ptr<buffer_t> buffer_ptr_t;10 11 // 异步读数据回调函数12 void on_read(boost::system::error_code ec13     , std::size_t len, socket_ptr_t socket_ptr, buffer_ptr_t buffer_ptr)14 {15     if (ec)16         std::cout << "async write error:" << ec.message() << std::endl;17     else18     {19         std::cout << "async read size:" << len;20         std::cout << " info:" << std::string((char*)buffer_ptr->begin(), len) << std::endl;21 22         // auto release socket and buffer.23     }24 }25 26 // 异步写数据回调函数27 void on_write(boost::system::error_code ec28     , std::size_t len, socket_ptr_t socket_ptr, buffer_ptr_t buffer_ptr)29 {30     if (ec)31         std::cout << "async write error:" << ec.message() << std::endl;32     else33     {34         std::cout << "async write size:" << len << std::endl;35         socket_ptr->async_read_some(boost::asio::buffer(buffer_ptr.get(), buffer_t::size())36             , boost::bind(&on_read, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred37                 , socket_ptr, buffer_ptr));38     }39 }40 41 // 异步连接回调函数42 void on_accept(boost::system::error_code ec, socket_ptr_t socket_ptr)43 {44     if (ec)  // 连接失败, 输出错误码45     {46         std::cout << "async accept error:" << ec.message() << std::endl;47     }48     else  // 连接成功49     {50         std::cout << "async accept from (" << socket_ptr->remote_endpoint() << ")" << std::endl;51         buffer_ptr_t buffer_ptr(new buffer_t);52         strcpy_s((char*)buffer_ptr->begin(), buffer_t::size(), "abcdefg");53         socket_ptr->async_write_some(boost::asio::buffer(buffer_ptr.get(), strlen((char*)buffer_ptr->begin()))54             , boost::bind(&on_write, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred55                 , socket_ptr, buffer_ptr));56     }57 }58 59 int main()60 {61     boost::asio::io_service ios;  // 创建io_service对象62     boost::asio::ip::tcp::endpoint addr(63         boost::asio::ip::address::from_string("0.0.0.0"), 12345);  // server端地址64     boost::asio::ip::tcp::acceptor acceptor(ios, addr, false);  // 创建acceptor对象65     socket_ptr_t socket_ptr(new boost::asio::ip::tcp::socket(ios));66     acceptor.async_accept(*socket_ptr67         , boost::bind(&on_accept, boost::asio::placeholders::error, socket_ptr));  // 调用异步accept请求68     ios.run();  // 调用io_service::run, 等待异步操作结果69 70     std::cout << "press enter key...";71     std::cin.get();72     return 0;73 }    

 

这段代码中的异步连接请求在asio源码中的序列图如下:

 


* 常见流程分析之四(Tcp强制关闭连接)

我们依然以上一节的例子为基础,扩展一个简单的demo分析Tcp强制关闭连接的流程:

 1 #include <iostream> 2 #include <boost/asio.hpp> 3 #include <boost/bind.hpp> 4 #include <boost/shared_ptr.hpp> 5 #include <boost/array.hpp> 6  7 typedef boost::shared_ptr<boost::asio::ip::tcp::socket> socket_ptr_t; 8 typedef boost::array<char, 128> buffer_t; 9 typedef boost::shared_ptr<buffer_t> buffer_ptr_t;10 11 // 异步读数据回调函数12 void on_read(boost::system::error_code ec13     , std::size_t len, socket_ptr_t socket_ptr, buffer_ptr_t buffer_ptr)14 {15     if (ec)  // 连接失败, 输出错误码16     {17         std::cout << "async read error:" << ec.message() << std::endl;18     }19 }20 21 // 异步写数据回调函数22 void on_write(boost::system::error_code ec23     , std::size_t len, socket_ptr_t socket_ptr, buffer_ptr_t buffer_ptr)24 {25     if (ec)  // 连接失败, 输出错误码26     {27         std::cout << "async write error:" << ec.message() << std::endl;28     }29 }30 31 // 异步连接回调函数32 void on_accept(boost::system::error_code ec, socket_ptr_t socket_ptr)33 {34     if (ec)  // 连接失败, 输出错误码35     {36         std::cout << "async accept error:" << ec.message() << std::endl;37     }38     else  // 连接成功39     {40         std::cout << "async accept from (" << socket_ptr->remote_endpoint() << ")" << std::endl;41 42         {43             buffer_ptr_t buffer_ptr(new buffer_t);44             strcpy_s((char*)buffer_ptr->begin(), buffer_t::size(), "abcdefg");45             socket_ptr->async_write_some(boost::asio::buffer(buffer_ptr.get(), strlen((char*)buffer_ptr->begin()))46                 , boost::bind(&on_write, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred47                 , socket_ptr, buffer_ptr));48         }49 50         {51             buffer_ptr_t buffer_ptr(new buffer_t);52             socket_ptr->async_read_some(boost::asio::buffer(buffer_ptr.get(), buffer_t::size())53                 , boost::bind(&on_read, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred54                 , socket_ptr, buffer_ptr));55         }56 57         /// 强制关闭连接58         socket_ptr->close(ec);59         if (ec)60             std::cout << "close error:" << ec.message() << std::endl;61     }62 }63 64 int main()65 {66     boost::asio::io_service ios;  // 创建io_service对象67     boost::asio::ip::tcp::endpoint addr(68         boost::asio::ip::address::from_string("0.0.0.0"), 12345);  // server端地址69     boost::asio::ip::tcp::acceptor acceptor(ios, addr, false);  // 创建acceptor对象70     socket_ptr_t socket_ptr(new boost::asio::ip::tcp::socket(ios));71     acceptor.async_accept(*socket_ptr72         , boost::bind(&on_accept, boost::asio::placeholders::error, socket_ptr));  // 调用异步accept请求73     socket_ptr.reset();74     ios.run();  // 调用io_service::run, 等待异步操作结果75 76     std::cout << "press enter key...";77     std::cin.get();78     return 0;79 } 

这个例子中,接受到客户端的连接后,立即发起异步读请求和异步写请求,然后立即强制关闭socket。

其中,强制关闭socket的请求在asio源码中的序列图如下:

 

* 常见流程分析之五(Tcp优雅地关闭连接)

我们依然以第三节的例子为基础,扩展一个简单的demo分析Tcp优雅地关闭连接的流程:

 1 #include <iostream> 2 #include <boost/asio.hpp> 3 #include <boost/bind.hpp> 4 #include <boost/shared_ptr.hpp> 5 #include <boost/array.hpp> 6  7 typedef boost::shared_ptr<boost::asio::ip::tcp::socket> socket_ptr_t; 8 typedef boost::array<char, 32> buffer_t; 9 typedef boost::shared_ptr<buffer_t> buffer_ptr_t;10 11 12 // 异步读数据回调函数13 void on_read(boost::system::error_code ec14     , std::size_t len, socket_ptr_t socket_ptr, buffer_ptr_t buffer_ptr)15 {16     static int si = 0;17     if (ec)  // 连接失败, 输出错误码18     {19         std::cout << "async read(" << si++ << ") error:" << ec.message() << std::endl;20         socket_ptr->shutdown(boost::asio::socket_base::shutdown_receive, ec);21         socket_ptr->close(ec);22         if (ec)23             std::cout << "close error:" << ec.message() << std::endl;24     }25     else26     {27         std::cout << "read(" << si++ << ") len:" << len << std::endl;28 29         socket_ptr->async_read_some(boost::asio::buffer(buffer_ptr.get(), buffer_t::size())30             , boost::bind(&on_read, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred31             , socket_ptr, buffer_ptr));32     }33 }34 35 // 异步写数据回调函数36 void on_write(boost::system::error_code ec37     , std::size_t len, socket_ptr_t socket_ptr, buffer_ptr_t buffer_ptr)38 {39     if (ec)  // 连接失败, 输出错误码40     {41         std::cout << "async write error:" << ec.message() << std::endl;42     }43     else44     {45         /// 优雅地关闭连接46         socket_ptr->shutdown(boost::asio::ip::tcp::socket::shutdown_send, ec);47         if (ec)48             std::cout << "shutdown send error:" << ec.message() << std::endl;49     }50 }51 52 // 异步连接回调函数53 void on_accept(boost::system::error_code ec, socket_ptr_t socket_ptr)54 {55     if (ec)  // 连接失败, 输出错误码56     {57         std::cout << "async accept error:" << ec.message() << std::endl;58     }59     else  // 连接成功60     {61         std::cout << "async accept from (" << socket_ptr->remote_endpoint() << ")" << std::endl;62 63         {64             buffer_ptr_t buffer_ptr(new buffer_t);65             socket_ptr->async_read_some(boost::asio::buffer(buffer_ptr.get(), buffer_t::size())66                 , boost::bind(&on_read, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred67                 , socket_ptr, buffer_ptr));68         }69 70         {71             buffer_ptr_t buffer_ptr(new buffer_t);72             strcpy_s((char*)buffer_ptr->begin(), buffer_t::size(), "abcdefg");73             socket_ptr->async_write_some(boost::asio::buffer(buffer_ptr.get(), strlen((char*)buffer_ptr->begin()))74                 , boost::bind(&on_write, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred75                 , socket_ptr, buffer_ptr));76         }77     }78 }79 80 int main()81 { 82     boost::asio::io_service ios;  // 创建io_service对象83     boost::asio::ip::tcp::endpoint addr(84         boost::asio::ip::address::from_string("0.0.0.0"), 12345);  // server端地址85     boost::asio::ip::tcp::acceptor acceptor(ios, addr, false);  // 创建acceptor对象86     socket_ptr_t socket_ptr(new boost::asio::ip::tcp::socket(ios));87     acceptor.async_accept(*socket_ptr88         , boost::bind(&on_accept, boost::asio::placeholders::error, socket_ptr));  // 调用异步accept请求89     socket_ptr.reset();90     ios.run();  // 调用io_service::run, 等待异步操作结果91 92     std::cout << "press enter key...";93     std::cin.get();94     return 0;95 }

 

      在这个例子中,接收到客户端的连接并向客户端发送数据以后,先关闭socket的发送通道,然后等待socket接收缓冲区中的数据全部read出来以后,再关闭socket的接收通道。此时,socket的接收和发送通道均以关闭,任何进程都无法使用此socket收发数据,但其所占用的系统资源并未释放,底层发送缓冲区中的数据也不保证已全部发出,需要在此之后执行close操作以便释放系统资源。
      若在释放系统资源前希望底层发送缓冲区中的数据依然可以发出,则需在socket的linger属性中设置一个等待时间,以便有时间等待发送缓冲区中的数据发送完毕。但linger中的值绝对不是越大越好,这是因为其原理是操作系统帮忙保留socket的资源以等待其发送缓冲区中的数据发送完毕,如果远端socket的一直未能接收数据便会导致本地socket一直等待下去,这对系统资源是极大的浪费。因此,在需要处理大量连接的服务端,linger的值一定不可过大。

 

 

由于本文会实时根据读者反馈的宝贵意见更新,为防其他读者看到过时的文章,因此本系列专题谢绝转载!