首页 > 代码库 > libjingle的线程机制

libjingle的线程机制

     调试发现Win32SocketServer::Pump被执行了多次,但是message_queue_->Dispatch(&msg);函数则被执行了一次,即响应VideoCapturer::OnMessage函数中响应MSG_STATE_CHANGE消息。VideoCapturer类的构造函数总共被执行了两次,一次代表的是本地端,在构造函数中将类成员变量talk_base::Thread* thread_;赋值为主线程;另一次代表的是远端,成员变量thread_;被赋值为PeerConnectionFactory类中创建的signaling_thread_,这是因为是在该线程的消息处理中完成的。具体就是conductor.cc文件的Conductor::OnMessageFromPeer函数(主线程运行)中,调用peer_connection_->SetRemoteDescription,而peer_connection_则是Conductor::InitializePeerConnection函数中调用PeerConnectionFactory::CreatePeerConnection函数创建的PeerConnectionProxy对象,该对象封装的线程就是PeerConnectionFactory::CreatePeerConnection_s函数中设定的signaling_thread。所以在主线程调用peer_connection_->SetRemoteDescription,由于代理的作用(具体看proxy.h文件),调用这个函数就是用signaling_thread发送了一个消息,并对该消息绑定了其实现,即代理类所关联的PeerConnection类的SetRemoteDescription方法,这样线程的消息响应最终会调用PeerConnection::SetRemoteDescription。

这个函数最终会第二次调用VideoCapturer的构造函数,所以第二次赋值给VideoCapturer的成员变量thread_就是PeerConnectionFactory类中的signaling_thread。

     类PhysicalSocketServer和类Win32SocketServer都继承自SocketServer类,在该类中定义了Wait和WakeUp方法,一个是用来等待消息的,另一个则是唤醒等待对消息进行响应。以Win32SocketServer为例,在函数Win32SocketServer::Wait中调用GetMessage等待消息,由于"GetMessage不接收属于其他线程或应用程序的消息。获取消息成功后,线程将从消息队列中删除该消息。函数会一直等待直到有消息到来才有返回值。",所以会一直等待直到有消息时该函数返回,外层函数Wait返回。主线程的PeerConnectionFactory::Initialize函数中会调用signaling_thread_->Send发送MSG_INIT_FACTORY消息,

libjingle中的talk_base::Thread类,运行peerconnection,会调用六次构造函数Thread::Thread,其中的两次为解析dns地址,一个是解析localhost,另一个是解析stun的域名。另外四个则是main函数中的继承自Thread类的talk_base::Win32Thread对象,以及获取ThreadManager的单例对象时调用其构造函数ThreadManager::ThreadManager,然后在该构造函数中创建了一个Thread对象。另外两个则是PeerConnectionFactory类中定义的两个变量:talk_base::Thread* signaling_thread_;和talk_base::Thread* worker_thread_;。

其中Win32Thread对象在main函数中通过调用ThreadManager::SetCurrentThread被设置成主线程。注意ThreadManager类中的TlsGetValue和TlsSetValue这一组函数用到了线程的局部存储。

事实上PeerConnectionFactory类中的两个线程变量会传递到底层的某些类中,如果这些类需要发送消息,直接调用Send或者Post即可。以PeerConnectionFactory::Initialize函数中的signaling_thread_->Send(this, MSG_INIT_FACTORY, &result);为例子,当主线程执行了上面这个语句后,主线程会一直阻塞知道MSG_INIT_FACTORY消息的响应,即在signaling_thread_线程的消息循环中响应MSG_INIT_FACTORY。在这个消息的响应中,又会调用worker_thread_->Send,但是是通过talk_base::Thread::Invoke来调用的。

以Thread::Send和Win32SocketServer::WakeUp两个函数为入口点来理解talk_base::Thread的工作机制。先看Thread::Send函数,以signaling_thread_->Send(this, MSG_INIT_FACTORY, &result);发送消息为例,通过IsCurrent来判断如果是当前这个线程则直接处理,比如在signaling_thread_线程的消息响应函数中又调用signaling_thread_->Send,则会直接就进行消息处理的。否则在发送MSG_INIT_FACTORY消息时,会调用Thread::Current()获取到主线程赋值给current_thread,可以把current_thread理解成父线程。然后将MSG_INIT_FACTORY消息放到signaling_thread_的成员变量std::list<_SendMessage> sendlist_中,由于在PeerConnectionFactory类的构造函数中在初始化其成员变量signaling_thread_和worker_thread_就马上调用其Start方法,这个方法最终调用Thread::ProcessMessages函数,在这个函数的while循环中进行线程的消息响应。

接下来以主线程的消息循环为例,由于ignaling_thread_和worker_thread_两个线程的父类MessageQueue的构造函数中给其变量SocketServer* ss_;赋值为PhysicalSocketServer类型,而主线程Win32Thread则在其构造函数中通过调用MessageQueue::set_socketserver函数将祖先类MessageQueue的变量SocketServer* ss_;赋值为Win32SocketServer类型。这样在talk_base::Thread的消息循环中,即Thread::ProcessMessages的while循环中,调用MessageQueue::Get来获取消息时,若获取到消息则退出,若获取不到消息则调用SocketServer::Wait函数等待,基于以上分析,所以主函数中就是执行Win32SocketServer::Wait函数。

以上关于调用Win32SocketServer::Wait的分析有点问题。事实上,由于Win32Thread继承自talk_base::Thread类,但是并没有像signaling_thread_和worker_thread_两个线程调用了Start方法,即并没有启动消息循环。Win32SocketServer::Wait被调用是在signaling_thread_->Send时,即在主线程调用这个函数发送消息时,由于主线程作为signaling_thread_的父线程,在Thread::Send函数中被调用,而且传递的Wait的第二个参数process_io为false,而signaling_thread_和worker_thread_两个线程的消息循环中,调用MessageQueue::Get时候第三个参数为默认参数,所以调用PhysicalSocketServer::Wait时,第二个参数process_io为true。

再回到Thread::Send函数中,还以signaling_thread_->Send(this, MSG_INIT_FACTORY, &result);为例子,将MSG_INIT_FACTORY消息放到signaling_thread_的成员变量std::list<_SendMessage> sendlist_中后,由于前面分析,signaling_thread_可能因为在等待消息的到来而阻塞,所以需要调用ss_->WakeUp();在这里就是PhysicalSocketServer::WakeUp()函数。接下来就是用一个bool变量bool waited = false;来标记当前的消息是否已经被处理,如果没有处理完毕,则当前线程(即发送信息的线程的父线程)会一直阻塞在while循环,在signaling_thread_->Send(this, MSG_INIT_FACTORY, &result);这个例子中,就是主线程通过调用Win32SocketServer::Wait来一直阻塞自己,直到消息响应函数执行完毕返回才跳出while循环,然后主线程才可以继续执行完该函数,并继续执行signaling_thread_->Send后面的语句,所以这个Send类似于windows函数SendMessage函数。都是等待当前消息响应完毕才返回。

在Thread::ReceiveSends函数中,通过遍历Thread类成员变量std::list<_SendMessage> sendlist_;,来对该线程的所发送的消息进行处理。而每次Send所发送的消息都是及时响应的,所以sendlist_中最多有一个代表已发送消息的元素。

libjingle的线程机制