首页 > 代码库 > 使用Condition Variables 实现一个线程安全队列
使用Condition Variables 实现一个线程安全队列
使用Condition Variables 实现一个线程安全队列
Basic Thread Safety with a Mutex 使用mutex实现简单的线程安全
template<typename Data>class concurrent_queue{private: std::queue<Data> the_queue; mutable boost::mutex the_mutex;public: void push(const Data& data) { boost::mutex::scoped_lock lock(the_mutex); the_queue.push(data); } bool empty() const { boost::mutex::scoped_lock lock(the_mutex); return the_queue.empty(); } Data& front() { boost::mutex::scoped_lock lock(the_mutex); return the_queue.front(); } Data const& front() const { boost::mutex::scoped_lock lock(the_mutex); return the_queue.front(); } void pop() { boost::mutex::scoped_lock lock(the_mutex); the_queue.pop(); }};
front
和pop会互相竞争。
但是对于一个消费者的系统就不是问题了。 假如队列是空的话多个线程有可能无事可做进入一个等待循环:while(some_queue.empty()) { boost::this_thread::sleep(boost::posix_time::milliseconds(50)); }
尽管sleep相较于忙等待避免了大量cpu资源的浪费,这个设计还是有些不足。首先线程必须每隔50ms(或者其他间隔)唤醒一次用来锁定mutex、检查队列、解锁mutex、强制上下文切换。 其次,睡眠的间隔时间相当于强加了一个限制给响应时间:数据被加到队列后到线程响应的响应时间。— 0ms到50ms都有可能,平均是25ms。
使用Condition Variable等待
concurrent_queue里实现了个成员方法
:template<typename Data>class concurrent_queue{private: boost::condition_variable the_condition_variable;public: void wait_for_data() { boost::mutex::scoped_lock lock(the_mutex); while(the_queue.empty()) { the_condition_variable.wait(lock); } } void push(Data const& data) { boost::mutex::scoped_lock lock(the_mutex); bool const was_empty=the_queue.empty(); the_queue.push(data); if(was_empty) { the_condition_variable.notify_one(); } } // rest as before};
当你执行唤醒操作的时候要小心
template<typename Data>class concurrent_queue{public: void push(Data const& data) { boost::mutex::scoped_lock lock(the_mutex); bool const was_empty=the_queue.empty(); the_queue.push(data); lock.unlock(); // unlock the mutex if(was_empty) { the_condition_variable.notify_one(); } } // rest as before};
减少锁的开销
wait_for_data
, front
以及pop
全都要锁mutex,消费者还是会快速交替调用锁操作。 吧wait和pop整合为一个操作可以减少加锁解锁操作:template<typename Data>class concurrent_queue{public: void wait_and_pop(Data& popped_value) { boost::mutex::scoped_lock lock(the_mutex); while(the_queue.empty()) { the_condition_variable.wait(lock); } popped_value=http://www.mamicode.com/the_queue.front();>使用引用参数而不是函数返回值来传递结果是为了避免抛出异常引发的安全问题,使用返回值的话如果拷贝构造函数有异常抛出,那么数据被移除了但是也丢失了。然而使用这种方式就不会(参见 Herb Sutter‘s Guru Of The Week #8 的讨论)有时需要boost::optional来避免神马问题,NND没太看懂是啥。
This does, of course, require that an instanceData
can be created by the calling code in order to receive the result, which is not always the case. In those cases, it might be worth using something likeboost::optional
to avoid this requirement.Handling multiple consumers处理多个消费者
wait_and_pop
不仅移掉了锁的间接开销还带来了额外的好处。 — 现在自动允许多个消费者了。然而没有外部锁,把方法分成多个天生有细粒度的本质,这使他们会遭受竞态条件,现在吧方法结合起来就能安全的处理并发请求了。 (one reason why the authors of the SGI STL advocate against making things likestd::vector
thread-safe — 你需要外部锁去做许多共同的工作,让内部锁变得浪费资源), the combined function safely handles concurrent calls.If multiple threads are popping entries from a full queue, then they just get serialized inside
wait_and_pop
, and everything works fine. If the queue is empty, then each thread in turn will block waiting on the condition variable. When a new entry is added to the queue, one of the threads will wake and take the value, whilst the others keep blocking. If more than one thread wakes (e.g. with a spurious wake-up), or a new thread callswait_and_pop
concurrently, thewhile
loop ensures that only one thread will do thepop
, and the others will wait.Update: As commenter David notes below, using multiple consumers does have one problem: if there are several threads waiting when data is added, only one is woken. Though this is exactly what you want if only one item is pushed onto the queue, if multiple items are pushed then it would be desirable if more than one thread could wake. There are two solutions to this: use
notify_all()
instead ofnotify_one()
when waking threads, or to callnotify_one()
whenever any data is added to the queue, even if the queue is not currently empty. If all threads are notified then the extra threads will see it as a spurious wake and resume waiting if there isn‘t enough data for them. If we notify with everypush()
then only the right number of threads are woken. This is my preferred option: condition variable notify calls are pretty light-weight when there are no threads waiting. The revised code looks like this:template<typename Data>class concurrent_queue{public: void push(Data const& data) { boost::mutex::scoped_lock lock(the_mutex); the_queue.push(data); lock.unlock(); the_condition_variable.notify_one(); } // rest as before};There is one benefit that the separate functions give over the combined one — the ability to check for an empty queue, and do something else if the queue is empty.
empty
itself still works in the presence of multiple consumers, but the value that it returns is transitory — there is no guarantee that it will still apply by the time a thread callswait_and_pop
, whether it wastrue
orfalse
. For this reason it is worth adding an additional function:try_pop
, which returnstrue
if there was a value to retrieve (in which case it retrieves it), orfalse
to indicate that the queue was empty.template<typename Data>class concurrent_queue{public: bool try_pop(Data& popped_value) { boost::mutex::scoped_lock lock(the_mutex); if(the_queue.empty()) { return false; } popped_value=http://www.mamicode.com/the_queue.front();>通过移除front
andpop
方法,我们这个简单而又单纯的实现,现在已经变成了一个可用的多生产者多消费者队列。最终方案
多生产者多消费者队列的最终方案:
template<typename Data>class concurrent_queue{private: std::queue<Data> the_queue; mutable boost::mutex the_mutex; boost::condition_variable the_condition_variable;public: void push(Data const& data) { boost::mutex::scoped_lock lock(the_mutex); the_queue.push(data); lock.unlock(); the_condition_variable.notify_one(); } bool empty() const { boost::mutex::scoped_lock lock(the_mutex); return the_queue.empty(); } bool try_pop(Data& popped_value) { boost::mutex::scoped_lock lock(the_mutex); if(the_queue.empty()) { return false; } popped_value=http://www.mamicode.com/the_queue.front();>Posted by Anthony Williams
使用Condition Variables 实现一个线程安全队列