首页 > 代码库 > 4.锁--并行编程之条件变量(posix condition variables)

4.锁--并行编程之条件变量(posix condition variables)

在整理Java LockSupport.park()的东东。看到了个"Spurious wakeup"。又一次梳理下。

首先来个《UNIX环境高级编程》里的样例:

[cpp] view plaincopy技术分享技术分享
  1. #include <pthread.h>  
  2. struct msg {  
  3.     struct msg *m_next;  
  4.     /* ... more stuff here ... */  
  5. };  
  6. struct msg *workq;  
  7. pthread_cond_t qready = PTHREAD_COND_INITIALIZER;  
  8. pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;  
  9. void process_msg(void) {  
  10.     struct msg *mp;  
  11.     for (;;) {  
  12.         pthread_mutex_lock(&qlock);  
  13.         while (workq == NULL)  
  14.             pthread_cond_wait(&qready, &qlock);  
  15.         mp = workq;  
  16.         workq = mp->m_next;  
  17.         pthread_mutex_unlock(&qlock);  
  18.         /* now process the message mp */  
  19.     }  
  20. }  
  21. void enqueue_msg(struct msg *mp) {  
  22.     pthread_mutex_lock(&qlock);  
  23.     mp->m_next = workq;  
  24.     workq = mp;  
  25.     pthread_mutex_unlock(&qlock);  
  26.     pthread_cond_signal(&qready);  
  27. }  

一个简单的消息生产者和消费者的代码。它们之间用condition同步。

这个代码最easy让人搞混的是process_msg函数里的pthread_mutex_lock 和 pthread_mutex_unlock 是一对函数调用。前面加锁。后面解锁。的确,是加锁解锁。可是它们两不是一对的。它们的还有一半在pthread_cond_wait函数里。

pthread_cond_wait函数能够觉得它做了三件事:

  • 把自身线程放到condition的等待队列里,把mutex解锁;
  • 等待被唤醒(当其他线程调用pthread_cond_signal或者pthread_cond_broadcast时)。
  • 被唤醒之后。对metex加锁。再返回。

mutex和condition实际上是绑定在一起的,一个condition仅仅能相应一个mutex。在Java的代码里,Condition对象仅仅能通过lock.newCondition()的函数来获取。

Spurious wakeup

所谓的spurious wakeup。指的是一个线程调用pthread_cond_signal()。却有可能不止一个线程被唤醒。

为什么会出现这样的情况?wiki和其他的一些文档都仅仅是说在多核的情况下。简化实现同意出现这样的spurious wakeup。

在man文档里给出了一个可能的实现。然后解析为什么会出现。

假定有三个线程,线程A正在运行pthread_cond_wait,线程B正在运行pthread_cond_signal,线程C正准备运行pthread_cond_wait函数。

[cpp] view plaincopy技术分享技术分享
  1. pthread_cond_wait(mutex, cond):  
  2.     value = cond->value; /* 1 */  
  3.     pthread_mutex_unlock(mutex); /* 2 */  
  4.     pthread_mutex_lock(cond->mutex); /* 10 */  
  5.     if (value == cond->value) { /* 11 */  
  6.         me->next_cond = cond->waiter;  
  7.         cond->waiter = me;  
  8.         pthread_mutex_unlock(cond->mutex);  
  9.         unable_to_run(me);  
  10.     } else  
  11.         pthread_mutex_unlock(cond->mutex); /* 12 */  
  12.     pthread_mutex_lock(mutex); /* 13 */  
  13.   
  14.   
  15. pthread_cond_signal(cond):  
  16.     pthread_mutex_lock(cond->mutex); /* 3 */  
  17.     cond->value++; /* 4 */  
  18.     if (cond->waiter) { /* 5 */  
  19.         sleeper = cond->waiter; /* 6 */  
  20.         cond->waiter = sleeper->next_cond; /* 7 */  
  21.         able_to_run(sleeper); /* 8 */  
  22.     }  
  23.     pthread_mutex_unlock(cond->mutex); /* 9 */  

线程A运行了第1,2步,这时它释放了mutex。然后线程B拿到了这个mutext,而且pthread_cond_signal函数时运行并返回了。于是线程B就是一个所谓的“spurious wakeup”。

为什么pthread_cond_wait函数里一进入。就释放了mutex?没有找到什么解析。。

查看了glibc的源码,大概能够看出上面的一些影子,可是太复杂了,也没有搞明确为什么。。

/build/buildd/eglibc-2.19/nptl/pthread_cond_wait.c

/build/buildd/eglibc-2.19/nptl/pthread_cond_signal.c

只是从上面的解析,能够发现《UNIX高级编程》里的说明是错误的(可能是由于太久了)。


    The  caller passes it locked to the function, which then atomically places the calling thread on the list of threads waiting for the condition and unlocks the mutex. 


上面的伪代码,一进入pthread_cond_wait函数就释放了mutex,明显和书里的不一样。

wait morphing优化

在《UNIX环境高级编程》的演示样例代码里,是先调用pthread_mutex_unlock,再调用pthread_cond_signal。

[cpp] view plaincopy技术分享技术分享
  1. void enqueue_msg(struct msg *mp) {  
  2.     pthread_mutex_lock(&qlock);  
  3.     mp->m_next = workq;  
  4.     workq = mp;  
  5. <strong>  pthread_mutex_unlock(&qlock);  
  6.     pthread_cond_signal(&qready);</strong>  
  7. }  
有的地方给出的是先调用pthread_cond_signal。再调用pthread_mutex_unlock:

[cpp] view plaincopy技术分享技术分享
  1. void enqueue_msg(struct msg *mp) {  
  2.     pthread_mutex_lock(&qlock);  
  3.     mp->m_next = workq;  
  4.     workq = mp;  
  5.     pthread_cond_signal(&qready);  
  6.     pthread_mutex_unlock(&qlock);  
  7. }  
先unlock再signal,这有个优点。就是调用enqueue_msg的线程能够再次參与mutex的竞争中,这样意味着能够连续放入多个消息,这个可能会提高效率。

类似Java里ReentrantLock的非公平模式。

网上有些文章说。先singal再unlock,有可能会出现一种情况是被singal唤醒的线程会由于不能立即拿到mutex(还没被释放)。从而会再次休眠,这样影响了效率。从而会有一个叫“wait morphing”优化,就是假设线程被唤醒可是不能获取到mutex,则线程被转移(morphing)到mutex的等待队列里。

可是我查看了下glibc的源码,貌似没有发现有这样的“wait morphing”优化。

man文档里提到:

       The pthread_cond_broadcast() or pthread_cond_signal() functions may be called by a thread whether or not it currently owns the mutex that  threads  calling  pthread_cond_wait()  or pthread_cond_timedwait() have associated with the condition variable during their waits; however, if predictable scheduling behavior is required, then that mutex shall be locked by the thread calling pthread_cond_broadcast() or pthread_cond_signal().

可见在调用singal之前。能够不持有mutex。除非是“predictable scheduling”,可预測的调度行为。这样的可能是实时系统才有这样的严格的要求。

为什么要用while循环来推断条件是否成立?

[cpp] view plaincopy技术分享技术分享
  1. while (workq == NULL)  
  2.     pthread_cond_wait(&qready, &qlock);  

而不用if来推断?

[cpp] view plaincopy技术分享技术分享
  1. if (workq == NULL)  
  2.     pthread_cond_wait(&qready, &qlock);  

一个原因是spurious wakeup,但即使没有spurious wakeup。也是要用While来推断的。

比方线程A,线程B在pthread_cond_wait函数中等待,然后线程C把消息放到队列里,再调用pthread_cond_broadcast。然后线程A先获取到mutex。处理完消息完后,这时workq就变成NULL了。这时线程B才获取到mutex,那么这时实际上是没有资源供线程B使用的。所以从pthread_cond_wait函数返回之后,还是要推断条件是否成功。假设成立,再进行处理。

pthread_cond_signal和pthread_cond_broadcast

在这篇文章里,http://www.cppblog.com/Solstice/archive/2013/09/09/203094.html

给出的演示样例代码7里,觉得调用pthread_cond_broadcast来唤醒全部的线程是比較好的写法。可是我觉得pthread_cond_signal和pthread_cond_broadcast是两个不同东东,不能简单合并在同一个函数调用。

仅仅唤醒一个效率和唤醒全部等待线程的效率显然不能等同。典型的condition是用CLH或者MCS来实现的,要通知全部的线程,则要历遍链表,显然效率减少。另外。C++11里的condition_variable也提供了notify_one函数。

http://en.cppreference.com/w/cpp/thread/condition_variable/notify_one

mutex,condition是不是公平(fair)的?

这个在參考文档里没有说明。在网上找了些资料,也没有什么明白的答案。

我写了个代码測试。发现mutex是公平的。condition的測试结果也是差点儿相同。

[cpp] view plaincopy技术分享技术分享
  1. #include <stdio.h>  
  2. #include <stdlib.h>  
  3. #include <pthread.h>  
  4. #include <unistd.h>  
  5. pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;  
  6.   
  7. volatile int mutexCount = 0;  
  8. void mutexFairTest(){  
  9.     int localCount = 0;  
  10.     while(1){  
  11.         pthread_mutex_lock(&lock);  
  12.         __sync_fetch_and_add(&mutexCount, 1);  
  13.         localCount += 1;  
  14.         if(mutexCount > 100000000){  
  15.             break;  
  16.         }  
  17.         pthread_mutex_unlock(&lock);  
  18.     }  
  19.     pthread_mutex_unlock(&lock);  
  20.     printf("localCount:%d\n", localCount);  
  21. }  
  22.   
  23. int main() {  
  24.     pthread_mutex_lock(&lock);  
  25.     pthread_create(new pthread_t, NULL, (void * (*)(void *))&mutexFairTest, NULL);  
  26.     pthread_create(new pthread_t, NULL, (void * (*)(void *))&mutexFairTest, NULL);  
  27.     pthread_create(new pthread_t, NULL, (void * (*)(void *))&mutexFairTest, NULL);  
  28.     pthread_create(new pthread_t, NULL, (void * (*)(void *))&mutexFairTest, NULL);  
  29.     pthread_create(new pthread_t, NULL, (void * (*)(void *))&mutexFairTest, NULL);  
  30.     pthread_create(new pthread_t, NULL, (void * (*)(void *))&mutexFairTest, NULL);  
  31.     pthread_mutex_unlock(&lock);  
  32.   
  33.     sleep(100);  
  34. }  
输出结果是:

[plain] view plaincopy技术分享技术分享
  1. localCount:16930422  
  2. localCount:16525616  
  3. localCount:16850294  
  4. localCount:16129844  
  5. localCount:17329693  
  6. localCount:16234137  
还特意在一个单CPU的虚拟机上測试了下。输出的结果差点儿相同。操作系统是ububtu14.04。

參考:

http://en.wikipedia.org/wiki/Spurious_wakeup

http://siwind.iteye.com/blog/1469216

http://www.cppblog.com/Solstice/archive/2013/09/09/203094.html

http://www.cs.cmu.edu/afs/cs/academic/class/15492-f07/www/pthreads.html

4.锁--并行编程之条件变量(posix condition variables)