首页 > 代码库 > 使用同步socket创建非阻塞socket server

使用同步socket创建非阻塞socket server

 

 

这个socket server可以:

  1. 非阻塞的处理多个socket连接。
  2. 可以接收来自客户端的ping消息,并把5秒内无活动的客户端移除。
  3. 可以接收客户端的login请求,使用者可以按自己需求加入认证逻辑。

 

/*‘‘‘Non-Blocking socket server using blocking APICreated on Dec 25, 2014 (merry christmas)@author: ScottGu<150316990@qq.com, gu.kai.66@gmail.com>performance tested:environment: 64bit win7, i7-4800MQ, 8GB‘‘‘*/#pragma onceboost::recursive_mutex cs; // thread-safe access to clients arraystruct talk_to_client    : boost::enable_shared_from_this<talk_to_client>{    talk_to_client(boost::asio::io_service& io_service)         :sock_(io_service)    {    }    std::string username() const    {        return username_;    }    void answer_to_client() {        try {            read_request();            process_request();        }        catch (boost::system::system_error&) {            stop();        }        if (timed_out())            stop();    }    void read_request() {        if (sock_.available())        {            already_read_ += sock_.read_some(                boost::asio::buffer(buff_ + already_read_, max_msg - already_read_));        }    }    void process_request() {        bool found_enter = std::find(buff_, buff_ + already_read_, \n)< buff_ + already_read_;        if (!found_enter)            return; // message is not full        // process the msg        last_ping_ = boost::posix_time::microsec_clock::local_time();        size_t pos = std::find(buff_, buff_ + already_read_, \n) -            buff_;        std::string msg(buff_, pos);                std::copy(buff_ + pos, buff_ + already_read_, buff_);        //std::copy(buff_ + already_read_, buff_ + max_msg, buff_);        already_read_ -= pos + 1;        if (msg.find("login ") == 0) on_login(msg);        else if (msg.find("ping") == 0) on_ping();        else if (msg.find("ask_clients") == 0) on_clients();        else std::cerr << "invalid msg " << msg << std::endl;    }    void set_clients_changed() { clients_changed_ = true; }    boost::asio::ip::tcp::socket & sock() { return sock_; }    bool timed_out() const {        boost::posix_time::ptime now = boost::posix_time::microsec_clock::local_time();        long long ms = (now - last_ping_).total_milliseconds();        return ms > 5000;    }    void stop() {        boost::system::error_code err; sock_.close(err);    }    void on_login(const std::string & msg) {        std::istringstream in(msg);        in >> username_ >> username_;        write("login ok\n");        //update_clients_changed();    }    void on_ping() {        write(clients_changed_ ? "ping client_list_changed\n" : "ping ok\n");        clients_changed_ = false;    }    void on_clients();    void write(const std::string & msg) { sock_.write_some(boost::asio::buffer(msg)); }private:    //   in Synchronous Client field are same    bool clients_changed_;    boost::posix_time::ptime last_ping_;    boost::asio::ip::tcp::socket sock_;    enum { max_msg = 6*1024 };    int already_read_;    char buff_[max_msg];    bool started_;    std::string username_;};typedef boost::shared_ptr<talk_to_client> client_ptr;typedef std::vector<client_ptr> array;array clients;void talk_to_client::on_clients() {    std::string msg;    boost::recursive_mutex::scoped_lock lk(cs);    for (auto b = clients.begin(), e = clients.end(); b != e; ++b){        msg += (*b)->username() + " ";    }    write("clients " + msg + "\n");}void accept_thread() {    boost::asio::io_service io_service;    boost::asio::ip::tcp::acceptor acceptr(io_service,        boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 8001));    while (true) {        client_ptr new_(new talk_to_client(io_service));        acceptr.accept(new_->sock());                boost::recursive_mutex::scoped_lock lk(cs);        clients.push_back(new_);    }}void handle_clients_thread() {    while (true) {        boost::this_thread::sleep(boost::posix_time::millisec(1));        boost::recursive_mutex::scoped_lock lk(cs);        for (array::iterator b = clients.begin(), e = clients.end(); b != e; ++b){            (*b)->answer_to_client();        }                // erase clients that timed out        clients.erase(std::remove_if(clients.begin(), clients.end(),            boost::bind(&talk_to_client::timed_out, _1)), clients.end());    }}int run_sync_talk_server() {    boost::thread_group threads;    threads.create_thread(boost::bind(accept_thread));    threads.create_thread(handle_clients_thread);    threads.join_all();}

 

使用同步socket创建非阻塞socket server