首页 > 代码库 > 使用Condition Variables 实现一个线程安全队列

使用Condition Variables 实现一个线程安全队列

使用Condition Variables 实现一个线程安全队列

多线程代码需要面对的一个问题和是如何把数据从一个县城传到另一个县城。 举个栗子,一个常见的是把串行算法并行化方法是,把他们分成块并且做成一个管道。管道中任意一块都可以单独在一个线程里运行。每个阶段完成后添加数据到输入队列给下个阶段。

Basic Thread Safety with a Mutex 使用mutex实现简单的线程安全

template<typename Data>class concurrent_queue{private:    std::queue<Data> the_queue;    mutable boost::mutex the_mutex;public:    void push(const Data& data)    {        boost::mutex::scoped_lock lock(the_mutex);        the_queue.push(data);    }    bool empty() const    {        boost::mutex::scoped_lock lock(the_mutex);        return the_queue.empty();    }    Data& front()    {        boost::mutex::scoped_lock lock(the_mutex);        return the_queue.front();    }        Data const& front() const    {        boost::mutex::scoped_lock lock(the_mutex);        return the_queue.front();    }    void pop()    {        boost::mutex::scoped_lock lock(the_mutex);        the_queue.pop();    }};


如果一个以上县城从队列中取数据,当前的设计会受制于竞态条件,empty, front 和pop会互相竞争。 但是对于一个消费者的系统就不是问题了。  假如队列是空的话多个线程有可能无事可做进入一个等待循环:
    while(some_queue.empty())    {        boost::this_thread::sleep(boost::posix_time::milliseconds(50));    }

 尽管sleep相较于忙等待避免了大量cpu资源的浪费,这个设计还是有些不足。首先线程必须每隔50ms(或者其他间隔)唤醒一次用来锁定mutex、检查队列、解锁mutex、强制上下文切换。    其次,睡眠的间隔时间相当于强加了一个限制给响应时间:数据被加到队列后到线程响应的响应时间。— 0ms到50ms都有可能,平均是25ms。

使用Condition Variable等待

不停轮询方案的一个替代方案是使用Condition Variable等待。  当数据被加到空队列之后Condition Variable会被通知,  然后等待的线程被唤醒。这需要mutex来保护队列。
我们在 concurrent_queue里实现了个成员方法:
template<typename Data>class concurrent_queue{private:    boost::condition_variable the_condition_variable;public:    void wait_for_data()    {        boost::mutex::scoped_lock lock(the_mutex);        while(the_queue.empty())        {            the_condition_variable.wait(lock);        }    }    void push(Data const& data)    {        boost::mutex::scoped_lock lock(the_mutex);        bool const was_empty=the_queue.empty();        the_queue.push(data);        if(was_empty)        {            the_condition_variable.notify_one();        }    }    // rest as before};
首先,lock变量被当作参数传到了信号灯的wait方法。 这使得信号灯的实现是可以自动的解锁mutex并把消费者线程加到等待队列。当第一个县城等待时另一个线程可以更新被保护的数据。
其次, condition variable 等待在一个循环里面,可能遭遇假冒唤醒。所以在wait返回时检查实际状态非常重要。


template<typename Data>class concurrent_queue{public:    void push(Data const& data)    {        boost::mutex::scoped_lock lock(the_mutex);        bool const was_empty=the_queue.empty();        the_queue.push(data);        lock.unlock(); // unlock the mutex        if(was_empty)        {            the_condition_variable.notify_one();        }    }    // rest as before};


尽管信号灯改善了生产者消费者的性能,但是对于消费者来说执行锁的操作还是过多。wait_for_datafront 以及pop 全都要锁mutex,消费者还是会快速交替调用锁操作。 吧wait和pop整合为一个操作可以减少加锁解锁操作:
template<typename Data>class concurrent_queue{public:    void wait_and_pop(Data& popped_value)    {        boost::mutex::scoped_lock lock(the_mutex);        while(the_queue.empty())        {            the_condition_variable.wait(lock);        }                popped_value=http://www.mamicode.com/the_queue.front();>
使用引用参数而不是函数返回值来传递结果是为了避免抛出异常引发的安全问题,使用返回值的话如果拷贝构造函数有异常抛出,那么数据被移除了但是也丢失了。然而使用这种方式就不会(参见 Herb Sutter‘s Guru Of The Week #8 的讨论)有时需要 boost::optional来避免神马问题,NND没太看懂是啥。This does, of course, require that an instance Data can be created by the calling code in order to receive the result, which is not always the case. In those cases, it might be worth using something like boost::optional to avoid this requirement.

Handling multiple consumers处理多个消费者

wait_and_pop 不仅移掉了锁的间接开销还带来了额外的好处。  — 现在自动允许多个消费者了。
然而没有外部锁,把方法分成多个天生有细粒度的本质,这使他们会遭受竞态条件,现在吧方法结合起来就能安全的处理并发请求了。  (one reason why the authors of the SGI STL advocate against making things like std::vector thread-safe — 你需要外部锁去做许多共同的工作,让内部锁变得浪费资源), the combined function safely handles concurrent calls.

If multiple threads are popping entries from a full queue, then they just get serialized inside wait_and_pop, and everything works fine. If the queue is empty, then each thread in turn will block waiting on the condition variable. When a new entry is added to the queue, one of the threads will wake and take the value, whilst the others keep blocking. If more than one thread wakes (e.g. with a spurious wake-up), or a new thread calls wait_and_pop concurrently, thewhile loop ensures that only one thread will do the pop, and the others will wait.

Update: As commenter David notes below, using multiple consumers does have one problem: if there are several threads waiting when data is added, only one is woken. Though this is exactly what you want if only one item is pushed onto the queue, if multiple items are pushed then it would be desirable if more than one thread could wake. There are two solutions to this: use notify_all() instead of notify_one() when waking threads, or to callnotify_one() whenever any data is added to the queue, even if the queue is not currently empty. If all threads are notified then the extra threads will see it as a spurious wake and resume waiting if there isn‘t enough data for them. If we notify with every push() then only the right number of threads are woken. This is my preferred option: condition variable notify calls are pretty light-weight when there are no threads waiting. The revised code looks like this:

template<typename Data>class concurrent_queue{public:    void push(Data const& data)    {        boost::mutex::scoped_lock lock(the_mutex);        the_queue.push(data);        lock.unlock();        the_condition_variable.notify_one();    }    // rest as before};

There is one benefit that the separate functions give over the combined one — the ability to check for an empty queue, and do something else if the queue is empty. empty itself still works in the presence of multiple consumers, but the value that it returns is transitory — there is no guarantee that it will still apply by the time a thread calls wait_and_pop, whether it was true or false. For this reason it is worth adding an additional function: try_pop, which returnstrue if there was a value to retrieve (in which case it retrieves it), or false to indicate that the queue was empty.

template<typename Data>class concurrent_queue{public:    bool try_pop(Data& popped_value)    {        boost::mutex::scoped_lock lock(the_mutex);        if(the_queue.empty())        {            return false;        }                popped_value=http://www.mamicode.com/the_queue.front();>
通过移除front and pop 方法,我们这个简单而又单纯的实现,现在已经变成了一个可用的多生产者多消费者队列。



template<typename Data>class concurrent_queue{private:    std::queue<Data> the_queue;    mutable boost::mutex the_mutex;    boost::condition_variable the_condition_variable;public:    void push(Data const& data)    {        boost::mutex::scoped_lock lock(the_mutex);        the_queue.push(data);        lock.unlock();        the_condition_variable.notify_one();    }    bool empty() const    {        boost::mutex::scoped_lock lock(the_mutex);        return the_queue.empty();    }    bool try_pop(Data& popped_value)    {        boost::mutex::scoped_lock lock(the_mutex);        if(the_queue.empty())        {            return false;        }                popped_value=http://www.mamicode.com/the_queue.front();>

Posted by Anthony Williams

使用Condition Variables 实现一个线程安全队列