首页 > 代码库 > C++11中多线程库

C++11中多线程库

一、linux 线程同步

     线程是在操作系统层面支持的,所以多线程的学习建议还是先找一本linux系统编程类的书,了解linux提供多线程的API。完全完全使用系统调用编写多线程程序是痛苦,现在也有很多封装好的多线程库,但是了解多线程系统对学习编写多线程程序非常有好处。总的来说linux提供了四类系统用于多程序程序,分别线程的创建、销毁(thread),用于线程同步的(互斥量(mutex)、条件量(cond),信号量(sem))。

  • 互斥量通过锁的机制实现线程间的同步。互斥量是一种特殊的变量,可以对它进行加锁、解锁操作。通过互斥量可以保证同一时刻只有一个线程访问线程之间共享的资源。(互斥量对应的操作的加锁与解锁)
  • 条件变量的使用需要结合互斥量、条件变量、条件。线程查看条件时需要用互斥量加锁,当条件满足线程执行某种操作,条件不满足时,条件变量(wait)操作自动阻塞该线程。当有另外的线程修改了条件时,会激活阻塞的线程,阻塞线程重新评价条件。条件的检测必须在互斥所的保护下进行。条件变量对应的操作是wait,try_wait。
  • 互斥量只有锁和解锁两种状态,信号量可以理解为有多个状态的特殊的变量,有等待信号量(wait)和释放(release)两种操作,分别对应信号量减1和加1。

  参考:Linux 线程同步的三种方法

二、c++11线程同步

  c++11从语言层面支持多线程操作,当然本质上是对系统调用的封装,但是极大的方便了开发人员。

1、<thread>

     线程类thread,使用RAII 风格管理线程的创建和销毁。创建线程时传入线程要执行的代码段(函数、lamda表达式)和参数,thread析构函数会自动销毁线程。

2、<mutex>

a.操作系统提供mutex可以设置属性,c++11根据mutext的属性提供四种的互斥量,分别是

  • std::mutex,最常用,普遍的互斥量(默认属性), 
  • std::recursive_mutex ,允许同一线程使用recursive_mutext多次加锁,然后使用相同次数的解锁操作解锁。mutex多次加锁会造成死锁
  • std::timed_mutex,在mutex上增加了时间的属性。增加了两个成员函数try_lock_for(),try_lock_until(),分别接收一个时间范围,再给定的时间内如果互斥量被锁主了,线程阻塞,超过时间,返回false。
  • std::recursive_timed_mutex,增加递归和时间属性

b. mutex成员函数加锁解锁

  • lock(),互斥量加锁,如果互斥量已被加锁,线程阻塞
  • bool try_lock(),尝试加锁,如果互斥量未被加锁,则执行加锁操作,返回true;如果互斥量已被加锁,返回false,线程不阻塞。
  • void unlock(),解锁互斥量

c. mutex RAII式的加锁解锁

  • std::lock_guard,管理mutex的类。对象构建时传入mutex,会自动对mutex加入,直到离开类的作用域,析构时完成解锁。RAII式的栈对象能保证在异常情形下mutex可以在lock_guard对象析构被解锁。
  • std::unique_lock 与 lock_guard功能类似,但是比lock_guard的功能更强大。比如std::unique_lock维护了互斥量的状态,可通过bool owns_lock()访问,当locked时返回true,否则返回false

3、condition_variable

条件变量的使用要结合条件、互斥量、条件变量三者一起使用。线程在检测条件之前使用mutex加锁,满足某种条件时线程使用条件变量的wait操作进入阻塞状态。当其它的线程修改条件,激活该条件变量阻塞的线程,阻塞的线程的重新加锁检测条件。条件变量提供wait和notify两种操作。

 1 // condition_variable example
 2 #include <iostream>           // std::cout
 3 #include <thread>             // std::thread
 4 #include <mutex>              // std::mutex, std::unique_lock
 5 #include <condition_variable> // std::condition_variable
 6 
 7 std::mutex mtx;
 8 std::condition_variable cv;
 9 bool ready = false;
10 
11 void print_id (int id) {
12   std::unique_lock<std::mutex> lck(mtx);
13   while (!ready) cv.wait(lck);
14   // ...
15   std::cout << "thread " << id << \n;
16 }
17 
18 void go() {
19   std::unique_lock<std::mutex> lck(mtx);
20   ready = true;
21   cv.notify_all();
22 }
23 
24 int main ()
25 {
26   std::thread threads[10];
27   // spawn 10 threads:
28   for (int i=0; i<10; ++i)
29     threads[i] = std::thread(print_id,i);
30 
31   std::cout << "10 threads ready to race...\n";
32   go();                       // go!
33 
34   for (auto& th : threads) th.join();
35 
36   return 0;
37 }

4、信号量(CSemaphore)

C++11多线程库没有提供信号量的类,但是很容易通过条件变量、互斥量自己实现。

//信号量类
class CSemaphore {
private:
    std::condition_variable cv;
    std::mutex mutex;
    int value;
public:
    CSemaphore(int init) :
            value(init) {
    }

    void wait() {
        std::unique_lock<std::mutex> lock(mutex);
        while (value < 1) {
            cv.wait(lock);
        }
        value--;
    }

    bool try_wait() {
        std::unique_lock<std::mutex> lock(mutex);
        if (value < 1)
            return false;
        value--;
        return true;
    }

    void post() {
        {
            std::unique_lock<std::mutex> lock(mutex);
            value++;
        }
        cv.notify_one();
    }
};

5、原子操作<atomic>

针对多线程的共享数据的存储读写,多线程指令交叉可能造成未知的错误(undefine行为),需要限制并发程序以某种特定的顺序执行,除了前面介绍的互斥量加锁的操纵,还可以使用C++11中提供的原则操作(atomic)。原子操作使得某个线程对共享数据的操作要不一步完成,要不不做。

a. std::atomic_flag是一个bool原子类型有两个状态:set(flag=true) 和 clear(flag=false),必须被ATOMIC_FLAG_INIT初始化此时flag为clear状态,相当于静态初始化。一旦atomic_flag初始化后只有三个操作:test_and_set,clear,析构,均是原子化操作。atomic_flag::test_and_set检查flag是否被设置,若被设置直接返回true,若没有设置则设置flag为true后再返回false。atomic_clear()清楚flag标志即flag=false。不支持拷贝、赋值等操作,这和所有atomic类型一样,因为两个原子类型之间操作不能保证原子化。atomic_flag的可操作性不强导致其应用局限性,还不如atomic<bool>。

b.atomic<T>模板类。T必须满足trivially copy type。定义了拷贝/移动/赋值函数;没有虚成员;基类或其它任何非static成员都是trivally copyable。典型的内置类型bool、int等属于trivally copyable type。注意某些原子操作可能会失败,比如atomic<float>、atomic<double>,,没有原子算术操作针对浮点数。

atomic<T>特别针对整数和指针做了特化。整数包括har, signed char, unsigned char, short, unsigned short, int, unsigned int, long, unsigned long, long long, unsigned long long, char16_t, char32_t, wchar_t。由于在实际中,用得比较多的原子类型是整数,下面以整数原子类型介绍原子类型的操作函数

  • 构造函数。构造函数传入一个T类型的整数,初始化一个std::atomic对象,拷贝构造函数禁用。std::atomic <int> foo = 0;
  • std::atomic::operator=(T val),赋值操作函数。一个类型为T的变量可以赋值给相应的原子类型变量,相当于隐式转换,并且该操作是原子的。
  • std::atomic::is_lock_free,判断std:atomic对象是否具备lock-free特性,在多个线程范文该对象时不会导致线程阻塞。(可能使用某种事务内存transactional memory 方法实现 lock-free 的特性)。
  • store  修改被封装的值,sync指定内存序,默认为顺序一致性
  • load 和store相对应,读取被封装的值
  • exchange。读取并修改被封装的值,exchange 会将 val 指定的值替换掉之前该原子对象封装的值,并返回之前该原子对象封装的值,整个过程是原子的(因此exchange 操作也称为 read-modify-write 操作)。sync参数指定内存序(Memory Order)

针对整型特化,增加的一些操作函数:

  • fetch_add,将原子对象封装的值增加某个值,并返回原子对象的旧值
  • fetch_sub,将原子对象封装的值减少某个值,并返回原子对象的旧值
  • fetch_and,将原子对象封装的值与某个值相与,并返回原子对象的旧值
  • fetch_or
  • fetch_xor
  • 支持operator++,原子对象自增
  • 支持operator--,原子对象自减

6、future

参考:C++并发实战13:std::future、std::async、std::promise、std::packaged_task

多线程程序设计时,一方面要注意多线程共享变量的访问的安全性,另一方面有些异步任务之间会有结果的传递。C++11标准提供了几种异步任务处理机制。通常thread不能直接返回执行的结构(可以通过传递应用,指针),而在异步处理当中很多时候一个线程(privider)创建某个线程(executor)处理某个任务,provider在某个时候获取executor执行结果,如果executor没有完成任务,provider线程就会阻塞等待,直到executor线程完成任务,返回结果。

std::future可用于异步任务中获取任务结果,但是它只是获取结果而已,真正的异步调用需要配合std::async,std::packaged_task,std::promise。async是个模板函数,packaged_task和promise是模板类,通常模板实例化参数是任务函数。

a. aysnc函数+future 模式

 std::future<bool> fut = std::async (is_prime,313222313);

这里里async自动创建一个后台线程,执行任务is_prime函数,并将计算结果保存在myFuture中,这里future的模板参数要和任务task返回类型一致为bool.

b.packaged_task+future

std::packaged_task内部包含了两个最基本的元素。一、被包装的任务,任务是一个可调用的对象,函数对象、函数指针。二、共享状态(shared state),用于保存任务的返回值,使用std::future对象异步访问共享状态。

可以通过 std::packged_task::get_future 来获取与共享状态相关联的 std::future 对象。在调用该函数之后,两个对象共享相同的共享状态,具体解释如下:

  • std::packaged_task 对象是异步 Provider,它在某一时刻通过调用被包装的任务来设置共享状态的值。
  • std::future 对象是一个异步返回对象,通过它可以获得共享状态的值,当然在必要的时候需要等待共享状态标志变为 ready.

std::packaged_task 的共享状态的生命周期一直持续到最后一个与之相关联的对象被释放或者销毁为止。

具体实例参考:http://www.cplusplus.com/reference/future/packaged_task/

c.promise + future

aync和packaged_task,是provider线程获取executor线程的结果。promise是provider线程通过future对象项executor线程传递参数。

promise 对象可以保存某一类型 T 的值,该值可被 future 对象读取(可能在另外一个线程中)。在 promise 对象构造时可以和一个共享状态(通常是std::future)相关联,并可以在相关联的共享状态(std::future)上保存一个类型为 T 的值。

可以通过 get_future 来获取与该 promise 对象相关联的 future 对象,调用该函数之后,两个对象共享相同的共享状态(shared state)

  • promise 对象是异步 Provider,它可以在某一时刻设置共享状态的值。
  • future 对象可以异步返回共享状态的值,或者在必要的情况下阻塞调用者并等待共享状态标志变为 ready,然后才能获取共享状态的值。
 1 #include <iostream>       // std::cout
 2 #include <functional>     // std::ref
 3 #include <thread>         // std::thread
 4 #include <future>         // std::promise, std::future
 5 
 6 void print_int(std::future<int>& fut) {
 7     int x = fut.get(); // 获取共享状态的值.
 8     std::cout << "value: " << x << \n; // 打印 value: 10.
 9 }
10 
11 int main ()
12 {
13     std::promise<int> prom; // 生成一个 std::promise<int> 对象.
14     std::future<int> fut = prom.get_future(); // 和 future 关联.
15     std::thread t(print_int, std::ref(fut)); // 将 future 交给另外一个线程t.
16     prom.set_value(10); // 设置共享状态的值, 此处和线程t保持同步.
17     t.join();
18     return 0;
19 }

 

C++11中多线程库