首页 > 代码库 > 使用同步socket创建非阻塞socket server
使用同步socket创建非阻塞socket server
这个socket server可以:
- 非阻塞的处理多个socket连接。
- 可以接收来自客户端的ping消息,并把5秒内无活动的客户端移除。
- 可以接收客户端的login请求,使用者可以按自己需求加入认证逻辑。
/*‘‘‘Non-Blocking socket server using blocking APICreated on Dec 25, 2014 (merry christmas)@author: ScottGu<150316990@qq.com, gu.kai.66@gmail.com>performance tested:environment: 64bit win7, i7-4800MQ, 8GB‘‘‘*/#pragma onceboost::recursive_mutex cs; // thread-safe access to clients arraystruct talk_to_client : boost::enable_shared_from_this<talk_to_client>{ talk_to_client(boost::asio::io_service& io_service) :sock_(io_service) { } std::string username() const { return username_; } void answer_to_client() { try { read_request(); process_request(); } catch (boost::system::system_error&) { stop(); } if (timed_out()) stop(); } void read_request() { if (sock_.available()) { already_read_ += sock_.read_some( boost::asio::buffer(buff_ + already_read_, max_msg - already_read_)); } } void process_request() { bool found_enter = std::find(buff_, buff_ + already_read_, ‘\n‘)< buff_ + already_read_; if (!found_enter) return; // message is not full // process the msg last_ping_ = boost::posix_time::microsec_clock::local_time(); size_t pos = std::find(buff_, buff_ + already_read_, ‘\n‘) - buff_; std::string msg(buff_, pos); std::copy(buff_ + pos, buff_ + already_read_, buff_); //std::copy(buff_ + already_read_, buff_ + max_msg, buff_); already_read_ -= pos + 1; if (msg.find("login ") == 0) on_login(msg); else if (msg.find("ping") == 0) on_ping(); else if (msg.find("ask_clients") == 0) on_clients(); else std::cerr << "invalid msg " << msg << std::endl; } void set_clients_changed() { clients_changed_ = true; } boost::asio::ip::tcp::socket & sock() { return sock_; } bool timed_out() const { boost::posix_time::ptime now = boost::posix_time::microsec_clock::local_time(); long long ms = (now - last_ping_).total_milliseconds(); return ms > 5000; } void stop() { boost::system::error_code err; sock_.close(err); } void on_login(const std::string & msg) { std::istringstream in(msg); in >> username_ >> username_; write("login ok\n"); //update_clients_changed(); } void on_ping() { write(clients_changed_ ? "ping client_list_changed\n" : "ping ok\n"); clients_changed_ = false; } void on_clients(); void write(const std::string & msg) { sock_.write_some(boost::asio::buffer(msg)); }private: // in Synchronous Client field are same bool clients_changed_; boost::posix_time::ptime last_ping_; boost::asio::ip::tcp::socket sock_; enum { max_msg = 6*1024 }; int already_read_; char buff_[max_msg]; bool started_; std::string username_;};typedef boost::shared_ptr<talk_to_client> client_ptr;typedef std::vector<client_ptr> array;array clients;void talk_to_client::on_clients() { std::string msg; boost::recursive_mutex::scoped_lock lk(cs); for (auto b = clients.begin(), e = clients.end(); b != e; ++b){ msg += (*b)->username() + " "; } write("clients " + msg + "\n");}void accept_thread() { boost::asio::io_service io_service; boost::asio::ip::tcp::acceptor acceptr(io_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 8001)); while (true) { client_ptr new_(new talk_to_client(io_service)); acceptr.accept(new_->sock()); boost::recursive_mutex::scoped_lock lk(cs); clients.push_back(new_); }}void handle_clients_thread() { while (true) { boost::this_thread::sleep(boost::posix_time::millisec(1)); boost::recursive_mutex::scoped_lock lk(cs); for (array::iterator b = clients.begin(), e = clients.end(); b != e; ++b){ (*b)->answer_to_client(); } // erase clients that timed out clients.erase(std::remove_if(clients.begin(), clients.end(), boost::bind(&talk_to_client::timed_out, _1)), clients.end()); }}int run_sync_talk_server() { boost::thread_group threads; threads.create_thread(boost::bind(accept_thread)); threads.create_thread(handle_clients_thread); threads.join_all();}
使用同步socket创建非阻塞socket server
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。