首页 > 代码库 > Boost.Asio c++ 网络编程翻译(24)

Boost.Asio c++ 网络编程翻译(24)

异步服务端中的多线程
我在第4章 客户端和服务端展示的异步服务端是单线程的,所有的事情都发生在main()中:
int main() {
       talk_to_client::ptr client = talk_to_client::new_();
       acc.async_accept(client->sock(), boost::bind(handle_
accept,client,_1));
    service.run();

异步的美妙之处就在于把单线程变为多线程的简单。你可以一直保持多线程知道你的并发客户端超过200。然后,你可以使用如下的代码片段把单线程变成100个线程:
boost::thread_group threads;
   void listen_thread() {
       service.run();
   }
   void start_listen(int thread_count) {
       for ( int i = 0; i < thread_count; ++i)
           threads.create_thread( listen_thread);
   }
   int main(int argc, char* argv[]) {
       talk_to_client::ptr client = talk_to_client::new_();
       acc.async_accept(client->sock(), boost::bind(handle_
   accept,client,_1));
       start_listen(100);
       threads.join_all();
   }
当然,一旦你选择了多线程,你需要考虑线程安全。尽管你在线程A中调用了async_*,它的完成流程可以在线程B中被调用(因为线程B也调用了service.run())。对于它本身而言这不是问题。只要你遵循逻辑流程,也就是从async_read()到on_read(),从on_read()到process_request,从process_request到async_write(),从async_write()到on_write(),从on_write()到async_read(),然后在你的talk_to_client类中没有被调用的公有方法,尽管不同的方法可以在不同的线程中被调用,他们还是会被有序地调用。从而不需要互斥量。
这也意味着对于一个客户端,只会有一个异步操作在等待。假如在有的情况,一个客户端我们有两个异步方法在等待,你就需要互斥量了。这是因为两个等待的操作可能正好在同一个时间完成,然后我们就会在两个不同的线程中间同时调用他们的完成处理函数。所以,这里需要线程安全,也就是需要使用互斥量。
在我们的异步服务端中,我们确实同时有两个等待的操作:
void do_read() {
       async_read(sock_, buffer(read_buffer_),MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2));
       post_check_ping();
   }
   void post_check_ping() {
       timer_.expires_from_now(boost::posix_time::millisec(5000));
       timer_.async_wait( MEM_FN(on_check_ping));
   }
当在做一个read操作时,我们会异步等待read操作完成和超时。所以,这里需要线程安全。
我的建议是,如果你准备使用多线程,从开始就保证你的类是线程安全的。通常这不会影响它的性能(当然你也可以在配置中设置开关)。同时,如果你准备使用多线程,从一个开始就使用。这样的话你能尽早地发现可能存在的问题。一旦你发现一个问题,你首先需要检查的事情就是:单线程运行的时候是否会发生?如果是,它很简单;只要调试它就可以了。否则,你可能忘了对一些方法加锁(互斥量)。
因为我们的例子需要是线程安全的,我已经把talk_to_client修改成使用互斥量的了。同时,我们也有一个客户端连接的列表,它也需要自己的互斥量因为我们有时需要访问它。
避免死锁和内存冲突不是很简单。下面是我需要对update_client_changed()方法进行修改的地方:
void update_clients_changed() {
       array copy;
       { boost::recursive_mutex::scoped_lock lk(clients_cs);
         copy = clients; }
       for( array::iterator b = copy.begin(), e = copy.end(); b != e;
   ++b)
           (*b)->set_clients_changed();

你需要避免的是同时有两个互斥量被锁定(这会导致死锁)。在我们的例子中,我们不想clients_cs和一个客户端的cs_互斥量同时被锁住

异步操作
Boost.Asio同样允许你异步地运行你任何一个方法。仅仅需要使用下面的代码片段:
void my_func() {
      ...
   }
   service.post(my_func);
这样就可以保证my_func在调用了service.run()的一个线程中间被调用。你同样可以异步地调用一个有完成处理handler的方法,方法的handler会在方法结束的时候通知你。伪代码如下:
void on_complete() {
      ...
   }
   void my_func() {

...

       service.post(on_complete);
   }

async_call(my_func);

这里没有async_call方法,因此,你需要自己创建。幸运的是,它不是很复杂,参考下面的代码片段:
struct async_op : boost::enable_shared_from_this<async_op>, ... {
       typedef boost::function<void(boost::system::error_code)>
   completion_func;
       typedef boost::function<boost::system::error_code ()> op_func;
       struct operation { ... };
       void start() {
           { boost::recursive_mutex::scoped_lock lk(cs_);
             if ( started_) return; started_ = true; }
           boost::thread t( boost::bind(&async_op::run,this));
       }
       void add(op_func op, completion_func completion, io_service
   &service) {
           self_ = shared_from_this();
           boost::recursive_mutex::scoped_lock lk(cs_);
           ops_.push_back( operation(service, op, completion));
           if ( !started_) start();

void stop() {

        boost::recursive_mutex::scoped_lock lk(cs_);
        started_ = false; ops_.clear();

} private:

    boost::recursive_mutex cs_;
    std::vector<operation> ops_; bool started_; ptr self_;
};
async_op方法创建了一个后台线程,这个线程会运行(run())你添加(add())到它里面的所有的异步操作。为了让事情简单一些,每个操作都包含下面的内容:
  • 一个异步调用的方法
  • 当第一个方法结束时被调用的一个完成处理handler
  • 会运行完成处理handler的io_service实例。这也是完成时通知你的地方。参考下面的代码:
  1. struct async_op : boost::enable_shared_from_this<async_op>
                           , private boost::noncopyable {
    
               struct operation {
                   operation(io_service & service, op_func op, completion_
    
           func completion)
                       : service(&service), op(op), completion(completion)
                       , work(new io_service::work(service))
    
                   {}
                   operation() : service(0) {}
                   io_service * service;
                   op_func op;
                   completion_func completion;
                   typedef boost::shared_ptr<io_service::work> work_ptr;
                   work_ptr work;
    

};

... }; 

它们被operation结构体包含在内部。注意当有一个操作在等待时,我们在操作的构造方法中构造一个io_service::work实例,从而保证直到我们完成我们的异步调用之前service.run()都不会结束(当io_service::work实例保持活动时,service.run()就会认为它有工作需要做)。参考下面的代码片段:
struct async_op : ... {
       typedef boost::shared_ptr<async_op> ptr;
       static ptr new_() { return ptr(new async_op); }
       ...
       void run() {
           while ( true) {
               { boost::recursive_mutex::scoped_lock lk(cs_);
                 if ( !started_) break; }
               boost::this_thread::sleep( boost::posix_
   time::millisec(10));
               operation cur;

));
}

{ boost::recursive_mutex::scoped_lock lk(cs_);
  if ( !ops_.empty()) {
      cur = ops_[0]; ops_.erase( ops_.begin());
  }}
if ( cur.service)
    cur.service->post(boost::bind(cur.completion, cur.op()
           self_.reset();
       }

}; 

run()方法就是后台线程;它仅仅观察是否有工作需要做,如果有,就一个一个地运行这些异步方法。在每个调用结束的时候,它会调用相关的完成处理方法。
为了测试,我们创建一个会被异步执行的compute_file-checksum方法
size_t checksum = 0;
   boost::system::error_code compute_file_checksum(std::string file_name)
   {
HANDLE file = ::CreateFile(file_name.c_str(), GENERIC_READ, 0, 0,
           OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, 0);
       windows::random_access_handle h(service, file);
       long buff[1024];
       checksum = 0;
       size_t bytes = 0, at = 0;
       boost::system::error_code ec;
       while ( (bytes = read_at(h, at, buffer(buff), ec)) > 0) {
           at += bytes; bytes /= sizeof(long);
           for ( size_t i = 0; i < bytes; ++i)
               checksum += buff[i];

}

       return boost::system::error_code(0, boost::system::generic_
   category());
   }
   void on_checksum(std::string file_name, boost::system::error_code) {
       std::cout << "checksum for " << file_name << "=" << checksum <<
   std::endl;
   }
   int main(int argc, char* argv[]) {
       std::string fn = "readme.txt";
       async_op::new_()->add( service, boost::bind(compute_file_
   checksum,fn),
                                       boost::bind(on_checksum,fn,_1));
       service.run();
   }
注意我展示给你的只是实现异步调用一个方法的一种可能。除了像我这样实现一个后台线程,你可以可是使用一个内部io_service实例,然后推送异步方法给这个实例调用。这个作为一个练习留个读者。
你也可以扩展这个类让其可以展示一个异步操作的进度(比如,使用百分比)。在这种情况下,你可以在主线程通过一个进度条来显示进度。





Boost.Asio c++ 网络编程翻译(24)