首页 > 代码库 > linux 线程的同步 二 (互斥锁和条件变量)

linux 线程的同步 二 (互斥锁和条件变量)

  为了允许在线程或进程之间共享数据,同步时必须的,互斥锁和条件变量是同步的基本组成部分。

1、互斥锁

  互斥锁是用来保护临界区资源,实际上保护的是临界区中被操纵的数据,互斥锁通常用于保护由多个线程或多进程分享的共享数据。一般是一些可供线程间使用的全局变量,来达到线程同步的目的,即保证任何时刻只有一个线程或进程在执行其中的代码。一般加锁的轮廓如下:

pthread_mutex_lock()临界区pthread_mutex_unlock()

互斥锁API

pthread_mutex_lock(pthread_mutex_t *mutex);

 用此函数加锁时,如果mutex已经被锁住,当前尝试加锁的线程就会阻塞,直到互斥锁被其他线程释放。当此函数返回时,说明互斥锁已经被当前线程成功加锁.

pthread_mutex_trylock(pthread_mutex_t *mutex);

 用此函数加锁时,如果mutex已经卑琐主,当前尝试加锁的线程不会阻塞,而是立即返回,返回的错误码为EBUSY,而不是阻塞等待。

pthread_mutex_unlock(pthread_mutex_t *mutex);

注意使用锁之前要记得初始化。互斥锁的初始化有两种初始化方式:

1.对于静态分配的互斥锁一半用宏赋值的方式初始化

eg: static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

2.对于动态分配的互斥锁(如调用malloc)或分配在共享内存中,则必须调用pthread_mutex_init(pthread_mutex *mutex, pthread_mutexattr_t *mutexattr)函数来进行初始化。

例子1:写个程序实现生产者—消费者问题,先只考虑多个生产者线程之间的同步,直到所有的生产者线程都完成工作以后,才启动消费者线程。程序如下:

1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <unistd.h> 4 #include <pthread.h> 5 #include <errno.h> 6  7 #define     MAXNITEMS        1000000 8 #define     MAXNTHREADS     100 9 10 int nitems;11 12 struct13 {14     pthread_mutex_t     mutex;15     int                 buff[MAXNITEMS];16     int                 nput;17     int                 nval;18 } shared = {19     PTHREAD_MUTEX_INITIALIZER20 };21 22 void *produce(void*);23 void *consume(void*);24 25 int main(int argc,char *argv[])26 {27     int     i,nthreads,count[MAXNTHREADS];28     pthread_t tid_produce[MAXNTHREADS],tid_consume;29     if(argc != 3)30     {31         printf("usage: producongs2 <#itmes> <#threads>.\n");32         exit(0);33     }34     nitems = atoi(argv[1]);35     nthreads = atoi(argv[2]);36     pthread_setconcurrency(nthreads);  //设置线程并发级别37     for(i=0;i<nthreads;++i)38     {39         count[i] = 0;40         pthread_create(&tid_produce[i],NULL,produce,&count[i]);41     }42     for(i=0;i<nthreads;i++)43     {44         pthread_join(tid_produce[i],NULL); //等待线程退出45         printf("count[%d] = %d\n",i,count[i]);46     }47     pthread_create(&tid_consume,NULL,consume,NULL);48     pthread_join(tid_consume,NULL);  //等待线程退出49     exit(0);50 }51 52 void *produce(void *arg)53 {54     for(; ;)55     {56         pthread_mutex_lock(&shared.mutex); //加锁57         if(shared.nput >= nitems)58         {59             pthread_mutex_unlock(&shared.mutex); //释放锁60             return ;61         }62         shared.buff[shared.nput] = shared.nval;63         shared.nput++;64         shared.nval++;65         pthread_mutex_unlock(&shared.mutex); //加锁66         *((int*) arg) += 1;67     }68 }69 void *consume(void *arg)70 {71     int     i;72     for(i=0;i<nitems;i++)73     {74         if(shared.buff[i] != i)75             printf("buff[%d] = %d\n",i,shared.buff[i]);76     }77     return;78 }

程序执行结果如下:

技术分享

例子2:改进例子1,所有生产者线程启动后立即启动消费者线程,这样生产者线程产生数据的同时,消费者线程就能出来它,此时必须同步生产者和消费者,程序如下:

 1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <unistd.h> 4 #include <pthread.h> 5 #include <errno.h> 6  7 #define     MAXNITEMS        1000000 8 #define     MAXNTHREADS     100 9 10 int nitems;11 12 struct13 {14     pthread_mutex_t     mutex;15     int                 buff[MAXNITEMS];16     int                 nput;17     int                 nval;18 } shared = {19     PTHREAD_MUTEX_INITIALIZER20 };21 22 void *produce(void*);23 void *consume(void*);24 void consume_wait(int);25 int main(int argc,char *argv[])26 {27     int     i,nthreads,count[MAXNTHREADS];28     pthread_t tid_produce[MAXNTHREADS],tid_consume;29     if(argc != 3)30     {31         printf("usage: producongs2 <#itmes> <#threads>.\n");32         exit(0);33     }34     nitems = atoi(argv[1]);35     nthreads = atoi(argv[2]);36     pthread_setconcurrency(nthreads+1);37     //创建生产者线程38     for(i=0;i<nthreads;++i)39     {40         count[i] = 0;41         pthread_create(&tid_produce[i],NULL,produce,&count[i]);42     }43     //创建消费者线程44     pthread_create(&tid_consume,NULL,consume,NULL);45     for(i=0;i<nthreads;i++)46     {47         pthread_join(tid_produce[i],NULL);48         printf("count[%d] = %d\n",i,count[i]);49     }50     //等待消费者线程退出51     pthread_join(tid_consume,NULL);52     exit(0);53 }54 55 void *produce(void *arg)56 {57     for(; ;)58     {59         pthread_mutex_lock(&shared.mutex);60         if(shared.nput >= nitems)61         {62             pthread_mutex_unlock(&shared.mutex);63             return ;64         }65         shared.buff[shared.nput] = shared.nval;66         shared.nput++;67         shared.nval++;68         pthread_mutex_unlock(&shared.mutex);69         *((int*) arg) += 1;70     }71 }72 void *consume(void *arg)73 {74     int     i;75     for(i=0;i<nitems;i++)76     {77         consume_wait(i);78         if(shared.buff[i] != i)79             printf("buff[%d] = %d\n",i,shared.buff[i]);80     }81     return;82 }83 void consume_wait(int i)84 {85     for(; ;)  //进行轮询,判断i是否已经由生产者生产86     {87         pthread_mutex_lock(&shared.mutex);88         if(i<shared.nput)   //i已经生产89         {90             pthread_mutex_unlock(&shared.mutex);91             return; 92         }93         pthread_mutex_unlock(&shared.mutex);94     }95 }

存在的问题:当消费者获取的条目尚没有准备好时,消费者线程一次次的循环去判断,每次给互斥锁解锁又上锁,这种轮询的办法浪费CPU时间。

2、条件变量

  互斥锁用于上锁,条件变量用于等待,条件变量的使用是与互斥锁共通使用的。

2.1等待与信号发送

  条件变量类型是pthread_cond_t,调用函数如下:

pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *pmutex);

pthread_cond_signal(pthread_cond_t *pcond);

每个条件变量总是有一个互斥锁与之关联。现在采用条件变量实现生产者与消费者问题,程序如下:

 1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <unistd.h> 4 #include <pthread.h> 5 #include <errno.h> 6  7 #define     MAXNITEMS        1000000 8 #define     MAXNTHREADS     100 9 10 int nitems;11 12 struct13 {14     pthread_mutex_t     mutex;15     int                 buff[MAXNITEMS];16     int                 nput;17     int                 nval;18 } shared = {19     PTHREAD_MUTEX_INITIALIZER20 };21 //条件变量22 struct {23     pthread_mutex_t mutex;  24     pthread_cond_t  cond;25     int nready;26 }nready = {27   PTHREAD_MUTEX_INITIALIZER,PTHREAD_COND_INITIALIZER28 };29 30 void *produce(void*);31 void *consume(void*);32 33 int main(int argc,char *argv[])34 {35     int     i,nthreads,count[MAXNTHREADS];36     pthread_t tid_produce[MAXNTHREADS],tid_consume;37     if(argc != 3)38     {39         printf("usage: producongs2 <#itmes> <#threads>.\n");40         exit(0);41     }42     nitems = atoi(argv[1]);43     nthreads = atoi(argv[2]);44     pthread_setconcurrency(nthreads+1);45     for(i=0;i<nthreads;++i)46     {47         count[i] = 0;48         pthread_create(&tid_produce[i],NULL,produce,&count[i]);49     }50     pthread_create(&tid_consume,NULL,consume,NULL);51     for(i=0;i<nthreads;i++)52     {53         pthread_join(tid_produce[i],NULL);54         printf("count[%d] = %d\n",i,count[i]);55     }56     pthread_join(tid_consume,NULL);57     exit(0);58 }59 60 void *produce(void *arg)61 {62     printf("producer begins work\n");63     for(; ;)64     {65         pthread_mutex_lock(&shared.mutex);66         if(shared.nput >= nitems)67         {68             pthread_mutex_unlock(&shared.mutex);69             return ;70         }71         shared.buff[shared.nput] = shared.nval;72         shared.nput++;73         shared.nval++;74         pthread_mutex_unlock(&shared.mutex);75         pthread_mutex_lock(&nready.mutex);76         if(nready.nready == 0)77             pthread_cond_signal(&nready.cond); //通知消费者78         nready.nready++;79         pthread_mutex_unlock(&nready.mutex);80         *((int*) arg) += 1;81     }82 }83 void *consume(void *arg)84 {85     int     i;86     printf("consuemer begins work.\n");87     for(i=0;i<nitems;i++)88     {89         pthread_mutex_lock(&nready.mutex);90         while(nready.nready == 0)91             pthread_cond_wait(&nready.cond,&nready.mutex); //等待生产者92         nready.nready--;93         pthread_mutex_unlock(&nready.mutex);94         if(shared.buff[i] != i)95             printf("buff[%d] = %d\n",i,shared.buff[i]);96     }97     return;98 }

程序执行结果如下:

技术分享

总的来说,给条件变量发送信号的过程代码如下:

struct{    pthread_mutex_t    mutex;    pthread_cond_t       cond;    //维护本条件的各个变量}var = {PTHREAD_MUTEX_INITIALIZER,PTHREAD_COND_INITIALIZER,...}pthread_mutex_lock(&var.mutex);设置条件为真pthread_cond_signal(&var.cond);pthread_mutex_unlock(&var.mutex);

测试条件并进入睡眠以等待条件变为真的代码大体如下:

pthread_mutex_lock(&var.mutex);while(条件为假)   pthread_cond_wait(&var.cond,&var.mutex);修改条件pthread_mutex_unlock(&var.mutex);

 2.2定时等待和广播

  通常pthread_cond_signal只是唤醒等待在相应条件变量上的一个线程,在某些情况下需要唤醒多个线程(例如读写者问题),可以调用pthread_cond_broadcast唤醒阻塞在相应条件变量上的所有线程。pthread_cond_timewait允许线程就阻塞时间设置一个限制值。API如下:

pthread_cond_broadcast(pthread_cond_t *cond);

pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex, const struct timespec *abstime);

linux 线程的同步 二 (互斥锁和条件变量)