首页 > 代码库 > 使用线程间通信之条件变量

使用线程间通信之条件变量

近期用C++写安卓下的一个通讯程序。作为jni库给java调用,採用多线程轮询遇到一个问题描写叙述例如以下:
A线程收到数据,放入队列,是生产者。
B、C、D若干个线轮询训消息队列,假设队列有数据就取出进行处理,没数据就Sleep(T)歇息,问题是这个T值取多大合适?取大了消息处理不及时。取小了手机cpu上升电池非常快耗光。



这个问题最佳解法是採用条件变量,能够比較完美解决这个问题。下面代码使用C++封装,用win32 SDK的条件变量举例,Linux下有全然等价的概念:

// 线程消息通知
class ThreadMsgNotify
{
   // 条件变量和临界变量
   CONDITION_VARIABLE cv_;
   CRITICAL_SECTION cs_;
public:
   ThreadMsgNotify();
   ~ThreadMsgNotify();

   int Wait(DWORD ms);  // 消费者调用此函数,阻塞等待毫秒数
   void Notify();  //  生产者调用此函数: 发出通知
};

各成员方法实现例如以下:

// ---------------------------------------
ThreadMsgNotify::ThreadMsgNotify()
{
   InitializeConditionVariable(&cv_);
   InitializeCriticalSection(&cs_);
}

ThreadMsgNotify::~ThreadMsgNotify()
{
   WakeAllConditionVariable(&cv_);  // 唤醒所有线程
   Sleep(50);

   DeleteCriticalSection(&cs_);
   DeleteConditionVariable(&cv_);
}

int ThreadMsgNotify::Wait(DWORD ms)  // 消费者,阻塞等待毫秒数
{
   EnterCriticalSection(&cs_);
   int ret = SleepConditionVariableCS(&cv_, &cs_, ms);  // 等待
   LeaveCriticalSection(&cs_);
   return(ret);
}

void ThreadMsgNotify::Notify()  // 生产者: 发出通知
{
   EnterCriticalSection(&cs_);
   WakeConditionVariable(&cv_);  // 唤醒一个等待线程(假设有的话)
   LeaveCriticalSection(&cs_);
}
// --------------

上面的代码非常easy。差点儿不用解释。

以下再给出測试代码:

class TagThreadNotifyTest
{
  private:
     list<string> msgList;
     bool isEnd = false;
     CRITICAL_SECTION cs_;
  public:
     int no;
     ThreadMsgNotify threadNotify;  // 线程通知,接收队列收到消息时通过该对象唤醒处理线程

     TagThreadNotifyTest();
     ~TagThreadNotifyTest();

     void New(const char* info);  // 生产者添加一条消息
     int Recv(string& msg);  // 消费者读取一条消息,返回读取前队列中的消息数

     // 消费者线程函数
     static LRESULT WINAPI ProcThread(void* lParam);
};

// 辅助函数。获取当前时间戳
void CurTime(char* timeStr)
{
   SYSTEMTIME st;
   GetLocalTime(&st);
   sprintf(timeStr, "%02d:%02d:%02d.%03d", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds);
}

TagThreadNotifyTest::TagThreadNotifyTest()
{
  InitializeCriticalSection(&cs_);

  // 创建3个消费者线程
  for(int i=0; i<3; i++){
     no = i;
     DWORD dwThreadid;
     HANDLE thd = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ProcThread,
                (void*)this, 0, &dwThreadid);
     CloseHandle(thd);
     Sleep(30);
  }
}

TagThreadNotifyTest::~TagThreadNotifyTest()
{
  isEnd = true;
  Sleep(100);
  DeleteCriticalSection(&cs_);
}

// 生产者将新消息放入队列
void TagThreadNotifyTest::New(const char* info)
{
   // 加锁后插入队列
   EnterCriticalSection(&cs_);
      msgList.push_back(info);
   LeaveCriticalSection(&cs_);

   threadNotify.Notify();  // 通知其它线程,去处理数据

   printf("notify...\n");
}

// 消费者读取消息。假设没有将返回0
int TagThreadNotifyTest::Recv(string& msg)
{
   EnterCriticalSection(&cs_);
      int n = msgList.size();
      if( n>0 ){
          msg = msgList.front();
          msgList.pop_front();
      }
   LeaveCriticalSection(&cs_);
   return(n);
}

// 消费者线程
LRESULT WINAPI TagThreadNotifyTest::ProcThread(void* lParam)
{
  TagThreadNotifyTest * test = (TagThreadNotifyTest *)lParam;
  int no = test->no;
  printf("Thread start,  no=%d...\n", no);
  char timeStr[80];
  while( !test->isEnd ){
     string msg;
     int ret = test->Recv(msg);  // 读取一条消息
     CurTime(timeStr);
     if( ret ){  // 假设有就打印出来
        printf(" [%d %s]Recv: %s\n", no, timeStr, msg.c_str());
        Sleep(1000);  // 延时1秒模拟处理较慢的情况
        continue;
     }
     else{  // 没有收到
        printf(" [%d %s]...\n", no, timeStr);
     }
 
     // 歇息15秒,假设有通知则会随时结束歇息
     test->threadNotify.Wait(15000);
  }

  printf("Thread End : no=%d.\n", no);
  return(1);
}

int main()
{
   // 控制台測试程序

   // new一个測试对象,此对象会创建3个消费者线程
   TagThreadNotifyTest* test = new TagThreadNotifyTest();

   // 作为生产者线程。就是接收你的按键,回车后产生一条消息
   while(true){
       char s[500];
       memset(s, 0, 500);
       gets(s);
       if( strcmp(s, "exit")==0 ){
           break;
       }
       if( s[0]=='\0' )
           continue;

       // 提交消息
       test->New(s);
   }

   delete test;
   return(0);
}

在windows和Linux下(替换成相应函数)均測试通过。

输入一个回车后,消费者线程将马上取到消息并打印出来。假设没有消息。则消费者线程等待15秒。CPU非常轻松。


使用线程间通信之条件变量