首页 > 代码库 > Linux高性能服务器编程——多线程编程(下)

Linux高性能服务器编程——多线程编程(下)

多线程编程


条件变量


如果说互斥锁是用于同步线程对共享数据的访问的话,那么条件变量则是用于线程之间同步共享数据的值。条件变量提供了一种线程间的通信机制:当某个共享数据达到某个值得时候,唤醒等待这个共享数据的线程。


条件本身是由互斥量保护的。线程在改变条件状态前必须首先锁住互斥量,其他现成在获得互斥量之前不会察觉到这种变化,因为必须锁住互斥量以后才能计算条件。


条件变量的相关函数主要有如下5个:


#include <pthread.h>
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);

int pthread_cond_wait(pthread_cond_t *restrict cond, phread_mutex_t *restrict mutex)


这些函数的第一个参数cond指向要操作的目标条件变量,条件变量的类型是pthread_cond_t结构体。


pthread_cond_init函数用于初始化条件变量。cond_attr参数指定条件变量的属性。如果将它设置为NULL,则表示使用默认属性。条件变量的属性不多,而且和互斥锁的属性类型相似,所以我们不在累赘。除了pthread_cond_init函数外,我们还可以使用如下方式初始化一个条件变量:


pthread_cond_t cond = PTHREAD_COND_INITIALIZER;


宏PTHREAD_COND_INITIALIZER实际上只是把条件变量的 各个字段都初始化为0。


pthread_cond_destroy函数用于销毁条件变量,以释放其占用的内核资源。销毁一个正在等待的条件变量将失败并返回EBUSY。


pthread_cond_broadcast函数以广播的方式唤醒所有等待目标条件变量的线程。pthread_cond_signal函数用于唤醒一个等待目标条件变量的线程。至于哪个线程将被唤醒,则取决于线程的优先级和调度策略。有时候我们可能想唤醒一个指定的线程,但pthread没有对该需求提供解决方法。不过我们可以间接地实现该需求:定义一个能够唯一表示目标现成的全局变量,在唤醒等待条件变量的线程前先设置该全局变量为目标线程,然后采用广播方式唤醒所有等待条件变量的线程,这些线程被唤醒后都检查该变量以判断被唤醒的是否是自己,如果是就开始执行后续代码,如果不是则返回继续等待。


pthread_cond_wait函数用于等待目标条件变量。mutex参数是用于保护条件变量的互斥锁,以确pthread_cond_wait操作的原子性。在调用pthread_cond_wait前,必须确保互斥锁mutex已经加锁,否则将导致不可预期的结果。

pthread_cond_wait函数执行时,首先把调用线程放入条件变量的等待队列中,然后将互斥锁mutex解锁。可见,从pthread_cond_wait开始执行到其调用线程被放入条件变量的等待队列之间的这段时间内,pthread_cond_signal和pthread_cond_broadcast等函数不会修改条件变量。换言之,pthread_cond_wait函数不会错过目标条件变量的任何变化。当pthread_cond_wait函数成功返回时,互斥锁mutex将再次被锁上。


上面这些函数成功时返回0,失败时返回错误码。


如下程序展示了如何使用条件变量:

实例代码

/******************************************************************************
 * 描述:
 *     应用Pthreads条件变量的实例代码,主线程创建三个线程,其中两个为“count”变量做
 * 加法运算,第三个线程监视“count”的值。当“count”达到一个限定值,等待线程准备接收来
 * 自于两个加法线程中一个的信号,等待 线程唤醒后更改“count”的值。程序继续运行直到加法
 * 线程达到TCOUNT的值。最后,主程序打印出count的值。
 ******************************************************************************/
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#define NUM_THREADS  3
#define TCOUNT 5       //单线程轮询次数
#define COUNT_LIMIT 7  //发送信号的次数
int count = 0; //全局的累加量

pthread_mutex_t count_mutex;
pthread_cond_t count_threshold_cv;

void *inc_count(void *t) {
    int i;
    long my_id = (long) t;

    for (i = 0; i < TCOUNT; i++) {
        pthread_mutex_lock(&count_mutex);
        count++;
        /*
         * 检查count的值,如果条件满足就发信号给等待线程
         * 注意,此处是用信号量锁定的。
         * */
        if (count < COUNT_LIMIT) {
            printf("inc_count(): thread %ld, count = %d  Threshold reached. ",
                    my_id, count);
            pthread_cond_signal(&count_threshold_cv);
            printf("Just sent signal.\n");
        }
        printf("inc_count(): thread %ld, count = %d, unlocking mutex\n", my_id,
                count);
        pthread_mutex_unlock(&count_mutex);

        /*为线程轮询互斥锁增加延时*/
        sleep(1);
    }
    pthread_exit(NULL);
}

void *watch_count(void *t) {
    long my_id = (long) t;
    printf("Starting watch_count(): thread %ld\n", my_id);

    /*锁定互斥量并等待信号,注意,pthread_cond_wait函数在等待时将自动以自动原子方式
     * 解锁互斥量。还有,请注意,如果等待线程运行到等待函数之前已经满足COUNT_LIMIT的
     * 条件判断,轮询会忽略掉等待函数,
     * */
    while (count < COUNT_LIMIT) {
        pthread_mutex_lock(&count_mutex);
        printf("watch_count(): thread %ld going into wait...\n", my_id);
        pthread_cond_wait(&count_threshold_cv, &count_mutex);
        printf("watch_count(): thread %ld Condition signal received.\n", my_id);

        printf("watch_count(): thread %ld count now = %d.\n", my_id, count);
        pthread_mutex_unlock(&count_mutex);
    }
    pthread_exit(NULL);
}

int main(int argc, char *argv[]) {
    int i;
    long t1 = 1, t2 = 2, t3 = 3;
    pthread_t threads[3];
    pthread_attr_t attr;

    /*初始化互斥量和条件变量对象*/
    pthread_mutex_init(&count_mutex, NULL);
    pthread_cond_init(&count_threshold_cv, NULL);

    /*创建线程时设为可连接状态,便于移植*/
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
    pthread_create(&threads[0], &attr, watch_count, (void *) t1);
	sleep(1);
    pthread_create(&threads[1], &attr, inc_count, (void *) t2);
    pthread_create(&threads[2], &attr, inc_count, (void *) t3);

    /* 等待所有线程完成*/
    for (i = 1; i < NUM_THREADS; i++) {
        pthread_join(threads[i], NULL);
    }
   /*发送信号给监听线程*/
    pthread_cond_signal(&count_threshold_cv); 
    pthread_join(threads[0],NULL);
    printf("Main(): Waited on %d threads. Final value of count = %d. Done.\n",
            NUM_THREADS, count);

    /*清除并退出 */
    pthread_attr_destroy(&attr);
    pthread_mutex_destroy(&count_mutex);
    pthread_cond_destroy(&count_threshold_cv);
    pthread_exit(NULL);
}



执行结果如下:


Starting watch_count(): thread 1
watch_count(): thread 1 going into wait...
inc_count(): thread 3, count = 1 Threshold reached. Just sent signal.
inc_count(): thread 3, count = 1, unlocking mutex
watch_count(): thread 1 Condition signal received.
watch_count(): thread 1 count now = 1.
watch_count(): thread 1 going into wait...
inc_count(): thread 2, count = 2 Threshold reached. Just sent signal.
inc_count(): thread 2, count = 2, unlocking mutex
watch_count(): thread 1 Condition signal received.
watch_count(): thread 1 count now = 2.
watch_count(): thread 1 going into wait...
inc_count(): thread 3, count = 3 Threshold reached. Just sent signal.
inc_count(): thread 3, count = 3, unlocking mutex
watch_count(): thread 1 Condition signal received.
watch_count(): thread 1 count now = 3.
watch_count(): thread 1 going into wait...
inc_count(): thread 2, count = 4 Threshold reached. Just sent signal.
inc_count(): thread 2, count = 4, unlocking mutex
watch_count(): thread 1 Condition signal received.
watch_count(): thread 1 count now = 4.
watch_count(): thread 1 going into wait...
inc_count(): thread 3, count = 5 Threshold reached. Just sent signal.
inc_count(): thread 3, count = 5, unlocking mutex
watch_count(): thread 1 Condition signal received.
watch_count(): thread 1 count now = 5.
watch_count(): thread 1 going into wait...
inc_count(): thread 2, count = 6 Threshold reached. Just sent signal.
inc_count(): thread 2, count = 6, unlocking mutex
watch_count(): thread 1 Condition signal received.
watch_count(): thread 1 count now = 6.
watch_count(): thread 1 going into wait...
inc_count(): thread 3, count = 7, unlocking mutex
inc_count(): thread 2, count = 8, unlocking mutex
inc_count(): thread 3, count = 9, unlocking mutex
inc_count(): thread 2, count = 10, unlocking mutex
watch_count(): thread 1 Condition signal received.
watch_count(): thread 1 count now = 10.

Main(): Waited on 3 threads. Final value of count = 10. Done.


两个地方需要注意一下:


1.)pthread_cond_wait()有解锁和锁定互斥量的操作,它所进行的操作大体有三步:解锁—阻塞监听—锁定,所以在监听线程的循环体里面有两次“锁定-解锁”的操作;

2.)主函数main最后的pthread_cond_signal()这句必不可少,因为监听线程运转没有延时,在count的值达到COUNT_LIMIT-1时,已经处于waiting状态。


多线程环境


可重入函数


如果一个函数能被多个线程同时调用且不发生竞态条件,则我们称它是线程安全的,或者说它是可重入的。Linux库函数只有一小部分是不可重入的,这些函数之所以不可重入,主要因为内部使用了静态变量。不过Linux对很多不可重入的库函数提供了对应的可重入版本,这些可重入版本的函数名是在原函数名尾加上_r。在多线程程序中调用库函数,一定要使用其可重入版本,否则可能导致预想不不到的结果。


线程和进程


如果一个线程程序的某个线程调用了fork函数,那么新创建的子进程是否创建和父进程相同数量的线程呢?答案是否,子进程只拥有一个执行线程,它是调用fork的那个线程的完整复制。并且子进程将自动继承父进程中互斥锁(条件变量与之类似)的状态。也就是说,父进程中已经被加锁的互斥锁在子进程中也是被锁住的。这就引起了一个问题:子进程可能不清楚从父进程继承而来的互斥锁的具体状态。这个互斥锁可能被加锁了,但并不是由调用fork函数的那个线程锁住的,而是由其他现成锁住的。如果是这种情况,则子进程若再次对该互斥锁执行加锁操作就会导致死锁。如下代码清单展示在多线程中调用fork函数引起了死锁的例子。

#include <pthread.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <wait.h>

pthread_mutex_t mutex;

void* another( void* arg )
{
    printf( "in child thread, lock the mutex\n" );
    pthread_mutex_lock( &mutex );
    sleep( 5 );
    pthread_mutex_unlock( &mutex );
}

void prepare()
{
    pthread_mutex_lock( &mutex );
}

void infork()
{
    pthread_mutex_unlock( &mutex );
}

int main()
{
    pthread_mutex_init( &mutex, NULL );
    pthread_t id;
    pthread_create( &id, NULL, another, NULL );
    //pthread_atfork( prepare, infork, infork );
    sleep( 1 );
    int pid = fork();
    if( pid < 0 )
    {
        pthread_join( id, NULL );
        pthread_mutex_destroy( &mutex );
        return 1;
    }
    else if( pid == 0 )
    {
        printf( "I anm in the child, want to get the lock\n" );
        pthread_mutex_lock( &mutex );
        printf( "I can not run to here, oop...\n" );
        pthread_mutex_unlock( &mutex );
        exit( 0 );
    }
    else
    {
        pthread_mutex_unlock( &mutex );
        wait( NULL );
    }
    pthread_join( id, NULL );
    pthread_mutex_destroy( &mutex );
    return 0;
}



不过,pthread提供了一个专门的函数pthread_atfork,以确保fork调用父进程和子进程都拥有一个清楚的锁状态。该函数的定义如下:


#include <pthread.h>

int pthread_atfork(void (*prepare)(void), void (*parent)(void), void (*child)(void));


该函数将建立3个fork句柄来帮组我们清理互斥锁的状态。prepare句柄将在fork调用创建子进程之前被执行。它可以用来锁住所有父进程中的互斥锁。parent句柄则是fork调用创建出子进程之后,而fork返回之前,在父进程中被执行。它的作用是释放所有在prepare句柄中被锁住的互斥锁。child句柄是fork返回之前,在子进程之中被执行。和parent句柄一样,child句柄也是用于释放所有在prepare句柄中被锁住的互斥锁。该函数成功时返回0,失败时返回错误码。

因此,如果要让上面程序清单正常工作,就应该在其中的fork调用前加入如下代码:

void nprepare()
{
	pthread_mutex_lock(&mutex);
}
void infork()
{
	pthread_mutex_unlock(&mutex);
}
pthread_atfork(prepare, infork, infork);


线程与信号


每个线程都可以独立地设置信号掩码,在多线程环境下使用如下函数设置线程信号掩码:


#include <signal.h>

int pthread_sigmask(int how, const sigset_t *set, sigset_t *oldset);


该函数与sigprocmask参数完全相同,不在累赘。pthread_sigmask成功时返回0,失败时返回错误码。


由于进程中的所有线程共享该进程的信号,所以线程库将根据线程掩码决定把信号发送给哪个具体的线程。因此,如果我们在每个子线程中都单独设置信号掩码,就很容易导致逻辑错误。此外,所有线程共享信号处理函数,也就是说当我们在一个线程中设置了某个信号的信号处理函数后,它将覆盖其他线程为同一个信号设置的信号处理函数。这两点都说明,我们应该定义一个专门的线程来处理所有的信号。这可以通过以下两个步骤实现:


1) 在主线程创建出其他子线程之前就调用pthread_sigmask来设置好信号掩码,所有新创建的子线程都将自动继承这个信号掩码。这样做之后,实际上所有线程都不会响应被屏蔽的信号了。

2) 在某个线程中调用如下函数来等待信号并处理之


#include <signal.h>

int sigwait(const sigset_t *set, int *sig);


set参数指定需要等待的信号的集合。我们可以简单地将其指定为第1步中创建的信号掩码,表示在该线程中等待所有被屏蔽的信号。参数sig指向的整数用于存储该函数返回的信号值。sigwait函数会自动取消信号集的阻塞状态,直到有新的信号被递送。在返回前,sigwwait将恢复线程信号的屏蔽字。sigwait成功时返回0,失败则返回错误码。一旦sigwait正确返回,我们就可以对接收到的信号做处理了。很显然,如果我们使用了sigwait,就不应该再为信号设置信号处理函数了。这是因为当程序员接收到信号时,二者中只能有一个起作用。如下代码展示了如何通过上述两个步骤实现在一个线程中同意处理所有信号:

       #include <pthread.h>
       #include <stdio.h>
       #include <stdlib.h>
       #include <unistd.h>
       #include <signal.h>
       #include <errno.h>

       #define handle_error_en(en, msg)                do { errno = en; perror(msg); exit(EXIT_FAILURE); } while (0)

       static void * sig_thread(void *arg)
       {
           int s, sig;
           sigset_t *set = (sigset_t *) arg;

           for (;;) {
//第二个步骤,调用sigwait等待信号
               s = sigwait(set, &sig);
               if (s != 0)
                   handle_error_en(s, "sigwait");
               printf("Signal handling thread got signal %d\n", sig);
           }
       }


       int main(int argc, char *argv[])
       {
           pthread_t thread;
           sigset_t set;
           int s;

           //第一个步骤,在主线程中设置信号掩码
           sigemptyset(&set);
           sigaddset(&set, SIGQUIT);
           sigaddset(&set, SIGUSR1);
	       s = pthread_sigmask(SIG_BLOCK, &set, NULL);
	       if (s != 0)
               	handle_error_en(s, "pthread_sigmask");

            s = pthread_create(&thread, NULL, &sig_thread, (void *) &set);
           if (s != 0)
               handle_error_en(s, "pthread_create");
           pause(); 
       }





最后,pthread还提供了下面的方法,使得我们可以明确地将一个信号发送给指定的线程:


#include <signal.h>

int pthread_kill(pthread_t thread, int sig);


其中,pthread参数指定目标线程,sig参数指定待发送的信号。如果sig为0,则pthread_kill不发送信号,但它仍然会执行错误检查。我们可以利用这种方式检测目标线程是否存在。pthread_kill成功时返回0,失败时返回错误代码。