在C++多线程编程中,条件变量(std::condition_variable)是线程同步的重要工具之一。它允许线程在某个条件不满足时主动进入等待状态,直到其他线程通知条件可能已经改变。这种机制从根本上解决了"忙等待"(busy-waiting)带来的CPU资源浪费问题。
想象一个典型的生产者-消费者场景:消费者线程需要从队列中取出数据,但当队列为空时,它不应该不断轮询检查队列状态(这会浪费CPU周期),而是应该进入休眠状态,直到有数据可用。条件变量正是为解决这类问题而设计的。
条件变量的核心价值体现在三个方面:
条件变量通常与互斥锁(std::mutex)和共享状态变量配合使用,形成以下标准模式:
等待方(消费者):
通知方(生产者):
这种模式确保了线程安全的同时,也保证了高效的线程协作。
cpp复制void wait(std::unique_lock<std::mutex>& lock);
这是最基本的等待形式,它执行以下操作:
注意:使用基本wait时必须处理虚假唤醒问题,通常需要配合循环检查条件
cpp复制template<class Predicate>
void wait(std::unique_lock<std::mutex>& lock, Predicate pred);
这是推荐的使用方式,它内部已经处理了虚假唤醒问题。谓词pred应该是一个返回bool的可调用对象,在持有锁的情况下检查条件是否满足。
内部实现等价于:
cpp复制while(!pred()) {
wait(lock);
}
cpp复制void notify_one() noexcept;
唤醒一个正在等待的线程(如果有)。被唤醒的线程是不确定的,由系统调度决定。
cpp复制void notify_all() noexcept;
唤醒所有正在等待的线程。当多个线程可能同时满足条件时使用。
cpp复制template<class Rep, class Period>
std::cv_status wait_for(std::unique_lock<std::mutex>& lock,
const std::chrono::duration<Rep, Period>& rel_time);
template<class Rep, class Period, class Predicate>
bool wait_for(std::unique_lock<std::mutex>& lock,
const std::chrono::duration<Rep, Period>& rel_time,
Predicate pred);
在相对时间内等待条件满足。带谓词的版本返回bool表示谓词最终是否满足。
cpp复制template<class Clock, class Duration>
std::cv_status wait_until(std::unique_lock<std::mutex>& lock,
const std::chrono::time_point<Clock, Duration>& abs_time);
template<class Clock, class Duration, class Predicate>
bool wait_until(std::unique_lock<std::mutex>& lock,
const std::chrono::time_point<Clock, Duration>& abs_time,
Predicate pred);
与wait_for类似,但使用绝对时间点作为超时条件。
让我们通过一个完整示例展示条件变量的典型用法:
cpp复制#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
std::mutex mtx;
std::condition_variable cv;
std::queue<int> data_queue;
void producer() {
for(int i = 0; i < 10; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::lock_guard<std::mutex> lock(mtx);
data_queue.push(i);
cv.notify_one();
std::cout << "Produced: " << i << std::endl;
}
}
void consumer() {
while(true) {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, []{ return !data_queue.empty(); });
int data = data_queue.front();
data_queue.pop();
lock.unlock();
std::cout << "Consumed: " << data << std::endl;
if(data == 9) break;
}
}
int main() {
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();
return 0;
}
当有多个消费者时,需要注意:
虚假唤醒是指等待的线程在没有收到任何通知的情况下被唤醒。这是出于性能考虑的设计特性,而非bug。处理虚假唤醒的方法:
cpp复制// 正确方式
cv.wait(lock, []{ return condition; });
// 等价的手动处理方式
while(!condition) {
cv.wait(lock);
}
在线程池实现中,条件变量用于工作线程等待任务到达:
cpp复制class ThreadPool {
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queue_mutex;
std::condition_variable condition;
bool stop = false;
public:
ThreadPool(size_t threads) {
for(size_t i = 0; i < threads; ++i) {
workers.emplace_back([this] {
while(true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queue_mutex);
condition.wait(lock, [this] {
return stop || !tasks.empty();
});
if(stop && tasks.empty()) return;
task = std::move(tasks.front());
tasks.pop();
}
task();
}
});
}
}
template<class F>
void enqueue(F&& f) {
{
std::lock_guard<std::mutex> lock(queue_mutex);
tasks.emplace(std::forward<F>(f));
}
condition.notify_one();
}
~ThreadPool() {
{
std::lock_guard<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for(auto& worker : workers) {
worker.join();
}
}
};
更复杂的使用场景,多个生产者和消费者通过有限缓冲区通信:
cpp复制class BoundedBuffer {
std::vector<int> buffer;
size_t capacity;
size_t front = 0;
size_t rear = 0;
size_t count = 0;
std::mutex mtx;
std::condition_variable not_full;
std::condition_variable not_empty;
public:
BoundedBuffer(size_t size) : capacity(size), buffer(size) {}
void produce(int item) {
std::unique_lock<std::mutex> lock(mtx);
not_full.wait(lock, [this] { return count < capacity; });
buffer[rear] = item;
rear = (rear + 1) % capacity;
++count;
lock.unlock();
not_empty.notify_one();
}
int consume() {
std::unique_lock<std::mutex> lock(mtx);
not_empty.wait(lock, [this] { return count > 0; });
int item = buffer[front];
front = (front + 1) % capacity;
--count;
lock.unlock();
not_full.notify_one();
return item;
}
};
条件变量:
自旋锁:
条件变量:
信号量:
条件变量:
future/promise:
虽然std::condition_variable在C++标准中定义,但在不同平台上实现可能有差异:
Windows:
Linux:
通用建议:
使用双重检查锁定:
cpp复制if(condition) {
std::lock_guard<std::mutex> lock(mtx);
if(condition) {
// 处理
}
}
减小临界区范围:
延迟通知:
条件通知:
超时等待:
优先级等待:
死锁:
活锁:
资源耗尽:
日志记录:
工具支持:
防御性编程:
C++20引入了一些与条件变量相关的新特性:
std::atomic::wait:
std::counting_semaphore:
std::latch和std::barrier:
尽管有这些新工具,条件变量仍然是复杂同步场景的首选方案。