首页 > 代码库 > Linux程序设计学习笔记----多线程编程线程同步机制之互斥量(锁)与读写锁

Linux程序设计学习笔记----多线程编程线程同步机制之互斥量(锁)与读写锁

互斥锁通信机制

基本原理

互斥锁以排他方式防止共享数据被并发访问,互斥锁是一个二元变量,状态为开(0)和关(1),将某个共享资源与某个互斥锁逻辑上绑定之后,对该资源的访问操作如下:

(1)在访问该资源之前需要首先申请互斥锁,如果锁处于开状态,则申请得到锁并立即上锁(关),防止其他进程访问资源,如果锁处于关,则默认阻塞等待.

(2)只有锁定该互斥锁的进程才能释放该互斥锁.

互斥量类型声明为pthread_mutex_t数据类型,在<bits/pthreadtypes.h>中有具体的定义。

互斥量初始化和销毁

在使用互斥锁之前需要定义互斥锁(全局变量),定义代码:

pthread_mutex lock;

/* Initialize a mutex.  */
int pthread_mutex_init (pthread_mutex_t *__mutex,\      // 指向要初始化的互斥锁的指针
                        __const pthread_mutexattr_t *__mutexattr); // 指向互斥锁属性对象的指针,设null为默认属性

/* Destroy a mutex.  */
int pthread_mutex_destroy (pthread_mutex_t *__mutex);

上面两个函数分别由于互斥量的初始化和销毁。

如果互斥量是静态分配的,可以通过常量进行初始化,如下:

#define PTHREAD_MUTEX_INITIALIZER {{0, }}  // 系统定义的,无需声明
pthread_mutex_t mlock = PTHREAD_MUTEX_INITIALIZER;

当然也可以通过pthread_mutex_init()进行初始化。对于动态分配的互斥量由于不能直接赋值进行初始化就只能采用这种方式进行初始化,pthread_mutex_init()的第二个参数是互斥量的属性,如果采用默认的属性设置,可以传入NULL。

当不在需要使用互斥量时,需要调用pthread_mutex_destroy()销毁互斥量所占用的资源。

互斥量的属性设置

/* 初始化互斥量属性对象 */
int pthread_mutexattr_init (pthread_mutexattr_t *__attr);

/* 销毁互斥量属性对象  */
int pthread_mutexattr_destroy (pthread_mutexattr_t *__attr);

/* 获取互斥量属性对象在进程间共享与否的标志 */
int pthread_mutexattr_getpshared (__const pthread_mutexattr_t *__restrict __attr,                                    int *__restrict __pshared);

/* 设置互斥量属性对象,标识在进程间共享与否 */
int pthread_mutexattr_setpshared (pthread_mutexattr_t *__attr, int __pshared);

互斥量在初始化的时候pthread_mutex_init的第二个参数是互斥量的属性,如果为NULL空指针,那么就使用默认属性。

互斥量属性的数据类型为pthread_mutexattr_t,它的初始化和销毁和互斥量类似。一旦互斥量属性对象被初始化后,就可以通过调用不同的函数启用或禁止特定的属性。这里列出了一个设置特定属性的函数:pthread_mutexattr_setpshared,可以用于指定互斥量在不同进程间共享,这样可以通过互斥量来同步不同的进程,当然前提是该互斥量位于进程间的共享内存区。

pthread_mutexattr_setpshared()函数的第二个参数__pshared用于设定是否进程间共享,其值可以是PTHREAD_PROCESS_PRIVATEPTHREAD_PROCESS_SHARED,后者是设置进程间共享。

下面是使互斥量可以在进程间共享的大致过程:

pthread_mutex_t *pSharedMutex;  //指向共享内存区的互斥量
pthread_mutexattr_t mutexAttr;  //互斥量属性

pSharedMutex = /*一个指向共享内存区的指针*/;

pthread_mutexattr_init(&mutexAttr);
pthread_mutexattr_setpshared(&mutexAttr, PTHREAD_PROCESS_SHARED);
pthread_mutex_init(pSharedMutex, &mutexAttr);

互斥量的申请使用

/* Try locking a mutex.  */
int pthread_mutex_trylock (pthread_mutex_t *__mutex);

/* Lock a mutex.  */
int pthread_mutex_lock (pthread_mutex_t *__mutex);

/* Unlock a mutex.  */
int pthread_mutex_unlock (pthread_mutex_t *__mutex);
这几个函数都很简单,通过pthread_mutex_lock()函数获得访问共享资源的权限,如果已经有其他线程锁住互斥量,那么该函数会是线程阻塞指定该互斥量解锁为止。 pthread_mutex_trylock()是对应的非阻塞函数,如果互斥量已被占用,它会返回一个EBUSY错误。访问完共享资源后,一定要通过pthread_mutex_unlock() 函数,释放占用的互斥量。允许其他线程访问该资源。

这里要强调的是:互斥量是用于上锁的,不能用于等待。

简单说就是,互斥量的使用流程应该是:线程占用互斥量,然后访问共享资源,最后释放互斥量。而不应该是:线程占用互斥量,然后判断资源是否可用,如果不可用,释放互斥量,然后重复上述过程。这种行为称为轮转或轮询,是一种浪费CPU时间的行为。

应用示例

下面的程序使用了两个同进程的线程,一个线程负责从标准设备读入数据存储在共享数据区,另一个负责将读入的数据输出到标准设备.其实是一个生产者消费者的变形.

  1. 处理输入操作的线程在接受用户信息的时候必须互斥使用该资源,不能被打断,阻塞其他线程的进行
  2. 输出线程也不能被打断,需要先锁定互斥锁,操作完成后释放,
  3. 程序结束,销毁互斥锁.

代码如下:

/*************************************************************************
> File Name: pthread_exp.c
> Author:SuooL 
> Mail:1020935219@qq.com || hu1020935219@gmail.com
> Website:http://blog.csdn.net/suool | http://suool.net
> Created Time: 2014年08月15日 星期三 08时42分45秒
> Description: 
************************************************************************/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <string.h>

void *thread_function(void *arg);

pthread_mutex_t work_mutex;     // 定义全局互斥锁对象

#define WORK_SIZE 1024          
char work_area[WORK_SIZE];      // 全局共享数据区
int time_to_exit = 0;           // 标志变量

// main函数
int main(int argc,char *argv[]) 
{
    int res;
    pthread_t a_thread;           // 线程
    void *thread_result;
    res = pthread_mutex_init(&work_mutex, NULL); //init mutex 初始化互斥锁
    if (res != 0) 
    {
        perror("Mutex initialization failed");
        exit(EXIT_FAILURE);
    }
    res = pthread_create(&a_thread, NULL, thread_function, NULL);//create new thread
    if (res != 0) 
    {
        perror("Thread creation failed");
        exit(EXIT_FAILURE);
    }
    pthread_mutex_lock(&work_mutex);			//lock the mutex,before get the input
    printf("Input some text. Enter 'end' to finish\n");
    while(!time_to_exit) 
    {
        fgets(work_area, WORK_SIZE, stdin);		//get a string from stdin
        pthread_mutex_unlock(&work_mutex);		//unlock the mutex
        while(1) 
        {
            pthread_mutex_lock(&work_mutex);	//lock the mutex
            if (work_area[0] != '\0')              // check out if the info input has been output
            {
                pthread_mutex_unlock(&work_mutex);	//unlock the mutex
                sleep(1);
            }
            else                                // if has been output, break and goto next iput
            {
                break;
            }
        }
    }
    pthread_mutex_unlock(&work_mutex);          // unlock the mutex 
    printf("\nWaiting for thread to finish...\n");
    res = pthread_join(a_thread, &thread_result);    // wait for the other pthread end
    if (res != 0) 
    {
        perror("Thread join failed");
        exit(EXIT_FAILURE);
    }
    printf("Thread joined\n");
    pthread_mutex_destroy(&work_mutex);         // destory the mutex
    exit(EXIT_SUCCESS);
}

// the pthread run function
void *thread_function(void *arg) 
{
    sleep(1);
    pthread_mutex_lock(&work_mutex);  // lock the mutex 抢占资源
    while(strncmp("end", work_area, 3) != 0)   // 判断是否为结束信息
    {
        printf("You input %d characters\n", strlen(work_area) -1); // 输出输入信息
        printf("the characters is %s",work_area);
        work_area[0] = '\0';         // 设置数据区,表示输入已被输出
        pthread_mutex_unlock(&work_mutex);   // 解锁
        sleep(1);
        pthread_mutex_lock(&work_mutex);     // 上锁
        while (work_area[0] == '\0' )        // 判断是否被输出
        {                 // 如果是\0则没有输入,等待主进程输入
         pthread_mutex_unlock(&work_mutex); // 解锁
         sleep(1);                            // 等待
         pthread_mutex_lock(&work_mutex);   // 上锁,再次检测
        }
    }
    time_to_exit = 1;             // 设置主线程退出信号
    work_area[0] = '\0';           // 标志输出
    pthread_mutex_unlock(&work_mutex);    // 世界所
    pthread_exit(0);           // 线程退出
}

运行结果:

读写锁通信机制

读写锁和互斥量(互斥锁)很类似,是另一种线程同步机制,但不属于POSIX标准,可以用来同步同一进程中的各个线程。当然如果一个读写锁存放在多个进程共享的某个内存区中,那么还可以用来进行进程间的同步.

因为对数据的读写应用中,很多情况都是大量的读操作,而较少有写操作,例如对数据库的访问等,显然这样使用互斥锁会很影响效率,为了满足这样的需求,POSIX线提供了读写锁机制.模式如下:

  1. 如果当前进程读数据,其他的进程也可以进行读操作,但不能写操作
  2. 如果当前进程写数据,则所有其它进程阻塞等待

和互斥量不同的是:互斥量会把试图进入已保护的临界区的线程都阻塞;然而读写锁会视当前进入临界区的线程和请求进入临界区的线程的属性来判断是否允许线程进入。

相对互斥量只有加锁和不加锁两种状态,读写锁有三种状态:读模式下的加锁,写模式下的加锁,不加锁

读写锁的使用规则:

  • 只要没有写模式下的加锁,任意线程都可以进行读模式下的加锁;
  • 只有读写锁处于不加锁状态时,才能进行写模式下的加锁;

读写锁也称为共享-独占(shared-exclusive)锁,当读写锁以读模式加锁时,它是以共享模式锁住,当以写模式加锁时,它是以独占模式锁住。读写锁非常适合读数据的频率远大于写数据的频率从的应用中。这样可以在任何时刻运行多个读线程并发的执行,给程序带来了更高的并发度。

定义代码如下:

pthread_rwlock_t ralock;

初始化和销毁读写锁

/* Initialize read-write lock  */
 int pthread_rwlock_init (pthread_rwlock_t *__restrict __rwlock,  // 指向要初始化的读写锁指针
                                __const pthread_rwlockattr_t *__restrict __attr); // 指向属性对象的指针

/* Destroy read-write lock */
extern int pthread_rwlock_destroy (pthread_rwlock_t *__rwlock); //返回值:成功返回0,否则返回错误代码

上面两个函数分别由于读写锁的初始化和销毁。和互斥量,条件变量一样,如果读写锁是静态分配的,可以通过常量进行初始化,如下:

<span style="font-size:12px;">pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;</span>

也可以通过pthread_rwlock_init()进行初始化。对于动态分配的读写锁由于不能直接赋值进行初始化,只能通过这种方式进行初始化。pthread_rwlock_init()第二个参数是读写锁的属性,如果采用默认属性,可以传入空指针NULL

那么当不在需要使用时及释放(自动或者手动)读写锁占用的内存之前,需要调用pthread_rwlock_destroy()进行销毁读写锁占用的资源。

读写锁的属性设置

/* 初始化读写锁属性对象 */
int pthread_rwlockattr_init (pthread_rwlockattr_t *__attr);

/* 销毁读写锁属性对象 */
int pthread_rwlockattr_destroy (pthread_rwlockattr_t *__attr);

/* 获取读写锁属性对象在进程间共享与否的标识*/
int pthread_rwlockattr_getpshared (__const pthread_rwlockattr_t * __restrict __attr,
                                          int *__restrict __pshared);

/* 设置读写锁属性对象,标识在进程间共享与否  */
int pthread_rwlockattr_setpshared (pthread_rwlockattr_t *__attr, int __pshared);

                                                    返回值:成功返回0,否则返回错误代码
这个属性设置和互斥量的基本一样,具体可以参考上面的互斥量的设置互斥量的属性设置

读写锁的申请使用

/* 读模式下加锁  */
int pthread_rwlock_rdlock (pthread_rwlock_t *__rwlock);

/* 非阻塞的读模式下加锁  */
int pthread_rwlock_tryrdlock (pthread_rwlock_t *__rwlock);

# ifdef __USE_XOPEN2K
/*  限时等待的读模式加锁 */
int pthread_rwlock_timedrdlock (pthread_rwlock_t *__restrict __rwlock,
                                       __const struct timespec *__restrict __abstime);
# endif

/* 写模式下加锁  */
int pthread_rwlock_wrlock (pthread_rwlock_t *__rwlock);

/* 非阻塞的写模式下加锁 */
int pthread_rwlock_trywrlock (pthread_rwlock_t *__rwlock);

# ifdef __USE_XOPEN2K
/* 限时等待的写模式加锁 */
int pthread_rwlock_timedwrlock (pthread_rwlock_t *__restrict __rwlock,
                                       __const struct timespec *__restrict __abstime);
# endif

/* 解锁 */
int pthread_rwlock_unlock (pthread_rwlock_t *__rwlock);

                                                   返回值:成功返回0,否则返回错误代码
(1)pthread_rwlock_rdlock()系列函数

pthread_rwlock_rdlock()用于以读模式即共享模式获取读写锁,如果读写锁已经被某个线程以写模式占用,那么调用线程就被阻塞。在实现读写锁的时候可以对共享模式下锁的数量进行限制(目前不知如何限制)。

pthread_rwlock_tryrdlock()和pthread_rwlock_rdlock()的唯一区别就是,在无法获取读写锁的时候,调用线程不会阻塞,会立即返回,并返回错误代码EBUSY。

pthread_rwlock_timedrdlock()是限时等待读模式加锁,时间参数struct timespec * __restrict __abstime也是绝对时间,和条件变量的pthread_cond_timedwait()使用基本一致,具体可以参考pthread_cond_timedwait()

(2)pthread_rwlock_wrlock()系列函数

pthread_rwlock_wrlock()用于写模式即独占模式获取读写锁,如果读写锁已经被其他线程占用,不论是以共享模式还是独占模式占用,调用线程都会进入阻塞状态。

pthread_rwlock_trywrlock()在无法获取读写锁的时候,调用线程不会进入睡眠,会立即返回,并返回错误代码EBUSY。

pthread_rwlock_timedwrlock()是限时等待写模式加锁,也和条件变量的pthread_cond_timedwait()使用基本一致,具体可以参考pthread_cond_timedwait()

(3)pthread_rwlock_unlock()

无论以共享模式还是独占模式获得的读写锁,都可以通过调用pthread_rwlock_unlock()函数进行释放该读写锁。

读写锁应用

下面的程序使用读写锁实现4个线程读写一段数据.

其中两个线程读数据,两个线程写数据.

从结果看出,任意进程写数据的时候,将阻塞所有其他进程的操作,但在某一进程读数据的时候,其他进程仍然可以读得数据.

编译运行结果如下:


代码如下:

/*************************************************************************
> File Name: pthread_rwlock_exp.c
> Author:SuooL 
> Mail:1020935219@qq.com || hu1020935219@gmail.com
> Website:http://blog.csdn.net/suool | http://suool.net
> Created Time: 2014年08月15日 星期三 09时20分45秒
> Description: 
************************************************************************/
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <bits/pthreadtypes.h>

static pthread_rwlock_t rwlock;     // 读写锁对象

#define WORK_SIZE 1024
char work_area[WORK_SIZE];          // 全局共享数据区
int time_to_exit;                   // 标志变量

// 子线程执行的函数声明
void *thread_function_read_o(void *arg);
void *thread_function_read_t(void *arg);
void *thread_function_write_o(void *arg);
void *thread_function_write_t(void *arg);

// main 函数
int main(int argc,char *argv[]) 
{
    int res;
    pthread_t a_thread,b_thread,c_thread,d_thread;  // 线程定义
    void *thread_result;            // 指针定义

    res=pthread_rwlock_init(&rwlock,NULL);         // 读写锁定义
    if (res != 0) 
    {
        perror("rwlock initialization failed");
        exit(EXIT_FAILURE);
    }
    // 创建子线程
    res = pthread_create(&a_thread, NULL, thread_function_read_o, NULL);//create new thread
    if (res != 0) 
    {
        perror("Thread creation failed");
        exit(EXIT_FAILURE);
    }

    res = pthread_create(&b_thread, NULL, thread_function_read_t, NULL);//create new thread
    if (res != 0) 
    {
        perror("Thread creation failed");
        exit(EXIT_FAILURE);
    }
    res = pthread_create(&c_thread, NULL, thread_function_write_o, NULL);//create new thread
    if (res != 0)
    {
        perror("Thread creation failed");
        exit(EXIT_FAILURE);
    }
    res = pthread_create(&d_thread, NULL, thread_function_write_t, NULL);//create new thread
    if (res != 0)
    {
        perror("Thread creation failed");
        exit(EXIT_FAILURE);
    }
    // 线程等待
    res = pthread_join(a_thread, &thread_result);			
    if (res != 0) 
    {
        perror("Thread join failed");
        exit(EXIT_FAILURE);
    }
    res = pthread_join(b_thread, &thread_result);			
    if (res != 0) 
    {
        perror("Thread join failed");
        exit(EXIT_FAILURE);
    }
    res = pthread_join(c_thread, &thread_result);			
if (res != 0) 
{
    perror("Thread join failed");
    exit(EXIT_FAILURE);
}
res = pthread_join(d_thread, &thread_result);			
if (res != 0) 
{
    perror("Thread join failed");
    exit(EXIT_FAILURE);
}
// 读写锁销毁
pthread_rwlock_destroy(&rwlock);				
exit(EXIT_SUCCESS);
}

void *thread_function_read_o(void *arg)            // 读线程one
{
    printf("thread read one try to get lock\n");	

    pthread_rwlock_rdlock(&rwlock);          // 获取读写锁
    while(strncmp("end", work_area, 3) != 0)    // 判断是否为输入结束符
    {
        printf("this is thread read one.\n");   // 输出信息
        printf("the characters is %s",work_area);	
        pthread_rwlock_unlock(&rwlock);		// 解锁	
        sleep(2);
        pthread_rwlock_rdlock(&rwlock);			// 获取所
        while (work_area[0] == '\0' ) 		  // 查看数据区开头是否为\0
        {
            pthread_rwlock_unlock(&rwlock);	 // 解锁
            sleep(2);                        // 等待
            pthread_rwlock_rdlock(&rwlock);  // 获取锁,再次查看
        }
    }	
    pthread_rwlock_unlock(&rwlock);	   // 解锁
    time_to_exit=1;  // 设置退出信号
    pthread_exit(0);  // 线程退出
}

 void *thread_function_read_t(void *arg)              // 读线程two,同one
 {
     printf("thread read one try to get lock\n");
     pthread_rwlock_rdlock(&rwlock);
     while(strncmp("end", work_area, 3) != 0) 
     {
         printf("this is thread read two.\n");
         printf("the characters is %s\n",work_area);	
         pthread_rwlock_unlock(&rwlock);			
         sleep(5);
         pthread_rwlock_rdlock(&rwlock);			
         while (work_area[0] == '\0' ) 		 
         {				
             pthread_rwlock_unlock(&rwlock);	
             sleep(5);
             pthread_rwlock_rdlock(&rwlock);	
         }
     }
     pthread_rwlock_unlock(&rwlock);	
     time_to_exit=1;
     pthread_exit(0);
 }

void *thread_function_write_o(void *arg)        // 写线程one
{
    printf("this is write thread one try to get lock\n"); // 输出提示信息
    while(!time_to_exit)               // 是否退出
    {
        pthread_rwlock_wrlock(&rwlock);       // 获取读写锁
        printf("this is write thread one.\nInput some text. Enter 'end' to finish\n");
        fgets(work_area, WORK_SIZE, stdin); // 获取标准输入写到是数据区
        pthread_rwlock_unlock(&rwlock);    // 解锁
        sleep(15); // 等待
    }
    pthread_rwlock_unlock(&rwlock);       // 解锁
    pthread_exit(0);               // 线程退出
}

void *thread_function_write_t(void *arg)  // 写线程two,同one
{
    sleep(10);
    while(!time_to_exit)
    {
        pthread_rwlock_wrlock(&rwlock);
        printf("this is write thread two.\nInput some text. Enter 'end' to finish\n"); 
        fgets(work_area, WORK_SIZE, stdin);
        pthread_rwlock_unlock(&rwlock);
        sleep(20);
    }
    pthread_rwlock_unlock(&rwlock);
    pthread_exit(0);
}