首页 > 代码库 > 使用线程间通信之条件变量

使用线程间通信之条件变量

最近用C++写安卓下的一个通讯程序,作为jni库给java调用,采用多线程轮询遇到一个问题描述如下:
A线程收到数据,放入队列,是生产者。
B、C、D若干个线轮询训消息队列,如果队列有数据就取出进行处理,没数据就Sleep(T)休息,问题是这个T值取多大合适?取大了消息处理不及时,取小了手机cpu上升电池很快耗光。

这个问题最佳解法是采用条件变量,可以比较完美解决问题,以下代码使用C++封装,用win32 SDK的条件变量举例,Linux下有完全等价的概念:

// 线程消息通知
class ThreadMsgNotify
{
   // 条件变量和临界变量
   CONDITION_VARIABLE cv_;
   CRITICAL_SECTION cs_;
public:
   ThreadMsgNotify();
   ~ThreadMsgNotify();

   int Wait(DWORD ms);  // 消费者调用此函数,堵塞等待毫秒数
   void Notify();  //  生产者调用此函数: 发出通知
};

各成员方法实现如下:

// ---------------------------------------
ThreadMsgNotify::ThreadMsgNotify()
{
   InitializeConditionVariable(&cv_);
   InitializeCriticalSection(&cs_);
}

ThreadMsgNotify::~ThreadMsgNotify()
{
   WakeAllConditionVariable(&cv_);  // 唤醒全部线程
   Sleep(50);

   DeleteCriticalSection(&cs_);
   DeleteConditionVariable(&cv_);
}

int ThreadMsgNotify::Wait(DWORD ms)  // 消费者,堵塞等待毫秒数
{
   EnterCriticalSection(&cs_);
   int ret = SleepConditionVariableCS(&cv_, &cs_, ms);  // 等待
   LeaveCriticalSection(&cs_);
   return(ret);
}

void ThreadMsgNotify::Notify()  // 生产者: 发出通知
{
   EnterCriticalSection(&cs_);
   WakeConditionVariable(&cv_);  // 唤醒一个等待线程(如果有的话)
   LeaveCriticalSection(&cs_);
}
// --------------

上面的代码很简单,几乎不用解释。

下面再给出测试代码:

class TagThreadNotifyTest
{
  private:
     list<string> msgList;
     bool isEnd = false;
     CRITICAL_SECTION cs_;
  public:
     int no;
     ThreadMsgNotify threadNotify;  // 线程通知,接收队列收到消息时通过该对象唤醒处理线程

     TagThreadNotifyTest();
     ~TagThreadNotifyTest();

     void New(const char* info);  // 生产者增加一条消息
     int Recv(string& msg);  // 消费者读取一条消息,返回读取前队列中的消息数

     // 消费者线程函数
     static LRESULT WINAPI ProcThread(void* lParam);
};

// 辅助函数,获取当前时间戳
void CurTime(char* timeStr)
{
   SYSTEMTIME st;
   GetLocalTime(&st);
   sprintf(timeStr, "%02d:%02d:%02d.%03d", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds);
}

TagThreadNotifyTest::TagThreadNotifyTest()
{
  InitializeCriticalSection(&cs_);

  // 创建3个消费者线程
  for(int i=0; i<3; i++){
     no = i;
     DWORD dwThreadid;
     HANDLE thd = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ProcThread,
                (void*)this, 0, &dwThreadid);
     CloseHandle(thd);
     Sleep(30);
  }
}

TagThreadNotifyTest::~TagThreadNotifyTest()
{
  isEnd = true;
  Sleep(100);
  DeleteCriticalSection(&cs_);
}

// 生产者将新消息放入队列
void TagThreadNotifyTest::New(const char* info)
{
   // 加锁后插入队列
   EnterCriticalSection(&cs_);
      msgList.push_back(info);
   LeaveCriticalSection(&cs_);

   threadNotify.Notify();  // 通知其他线程,去处理数据

   printf("notify...\n");
}

// 消费者读取消息,如果没有将返回0
int TagThreadNotifyTest::Recv(string& msg)
{
   EnterCriticalSection(&cs_);
      int n = msgList.size();
      if( n>0 ){
          msg = msgList.front();
          msgList.pop_front();
      }
   LeaveCriticalSection(&cs_);
   return(n);
}

// 消费者线程
LRESULT WINAPI TagThreadNotifyTest::ProcThread(void* lParam)
{
  TagThreadNotifyTest * test = (TagThreadNotifyTest *)lParam;
  int no = test->no;
  printf("Thread start,  no=%d...\n", no);
  char timeStr[80];
  while( !test->isEnd ){
     string msg;
     int ret = test->Recv(msg);  // 读取一条消息
     CurTime(timeStr);
     if( ret ){  // 如果有就打印出来
        printf(" [%d %s]Recv: %s\n", no, timeStr, msg.c_str());
        Sleep(1000);  // 延时1秒模拟处理较慢的情况
        continue;
     }
     else{  // 没有收到
        printf(" [%d %s]...\n", no, timeStr);
     }
 
     // 休息15秒,如果有通知则会随时结束休息
     test->threadNotify.Wait(15000);
  }

  printf("Thread End : no=%d.\n", no);
  return(1);
}

int main()
{
   // 控制台测试程序

   // new一个测试对象,此对象会创建3个消费者线程
   TagThreadNotifyTest* test = new TagThreadNotifyTest();

   // 作为生产者线程,就是接收你的按键,回车后产生一条消息
   while(true){
       char s[500];
       memset(s, 0, 500);
       gets(s);
       if( strcmp(s, "exit")==0 ){
           break;
       }
       if( s[0]=='\0' )
           continue;

       // 提交消息
       test->New(s);
   }

   delete test;
   return(0);
}

在windows和Linux下(替换成对应函数)均测试通过。

输入一个回车后,消费者线程将立即取到消息并打印出来。如果没有消息,则消费者线程等待15秒,CPU很轻松。


使用线程间通信之条件变量