条件变量(condition_variable)是C++标准库中用于线程间同步的重要工具,它与互斥锁(mutex)配合使用,可以高效地实现线程间的等待/通知机制。在实际开发中,条件变量最常见的应用场景就是生产者-消费者模式。
在多线程编程中,我们经常会遇到这样的场景:一个线程需要等待某个条件成立才能继续执行。如果使用简单的忙等待(busy-waiting)方式:
cpp复制while (!condition) {
// 空循环等待
}
这种方式会持续占用CPU资源,效率极低。条件变量的出现就是为了解决这个问题,它允许线程在条件不满足时主动释放CPU资源,进入休眠状态,直到其他线程通知条件可能已经改变。
条件变量总是与互斥锁配合使用,这是因为:
这种设计模式被称为"监视器模式"(Monitor Pattern),是并发编程中的经典范式。
在生产者-消费者模式中,我们需要以下几个关键组件:
cpp复制std::mutex m; // 保护共享数据的互斥锁
std::condition_variable cv; // 用于线程间通信的条件变量
std::queue<int> q; // 共享数据队列
bool done = false; // 生产结束标志
生产者线程的主要职责是生成数据并放入队列,同时通知消费者线程:
cpp复制void producer() {
for (int i = 0; i < 10; i++) {
// 模拟生产耗时
std::this_thread::sleep_for(std::chrono::milliseconds(30));
{
std::lock_guard<std::mutex> lk(m); // 加锁保护队列
q.push(i); // 生产数据
} // 自动解锁
cv.notify_one(); // 通知一个消费者
}
{
std::lock_guard<std::mutex> lk(m); // 加锁保护done标志
done = true; // 设置生产结束标志
} // 自动解锁
cv.notify_all(); // 通知所有消费者
}
关键点说明:
lock_guard自动管理锁的生命周期notify_one()通知一个消费者notify_all()确保所有消费者都能收到通知消费者线程需要等待数据可用,然后处理数据:
cpp复制void consumer() {
std::unique_lock<std::mutex> lk(m); // 初始加锁
while (true) {
// 等待条件满足:队列不为空或生产结束
cv.wait(lk, [&] {
return !q.empty() || done;
});
// 处理队列中的所有数据
while (!q.empty()) {
int v = q.front();
q.pop();
lk.unlock(); // 处理数据时不需要持有锁
// 模拟数据处理耗时
std::this_thread::sleep_for(std::chrono::milliseconds(20));
lk.lock(); // 再次加锁以处理队列
}
// 检查是否应该退出
if (done) {
break;
}
}
}
关键点说明:
unique_lock而不是lock_guard,因为需要手动解锁wait()会自动释放锁并在唤醒后重新获取锁理解wait()的内部机制对正确使用条件变量至关重要:
cpp复制template <typename Predicate>
void wait(std::unique_lock<std::mutex>& lk, Predicate pred) {
while (!pred()) { // 检查条件
lk.unlock(); // 条件不满足,释放锁
// 进入等待状态,直到被notify
lk.lock(); // 被唤醒后重新获取锁
}
}
这种实现确保了:
虚假唤醒(spurious wakeup)是指线程在没有收到明确通知的情况下从wait()返回。这是操作系统调度的一种正常现象,因此我们必须:
wait()版本wait()返回就意味着条件一定满足示例中的[&] { return !q.empty() || done; }谓词就正确处理了虚假唤醒问题。
为了最大化并发性能,我们应该:
在消费者实现中,我们特意在处理数据时释放了锁:
cpp复制lk.unlock(); // 处理数据时不需要持有锁
// 耗时操作...
lk.lock(); // 再次加锁
notify_one():唤醒一个等待线程,适用于单消费者场景notify_all():唤醒所有等待线程,适用于多消费者场景在我们的例子中,生产结束时使用notify_all()确保所有消费者都能及时退出。
如果生产者和消费者优先级不同,可能会出现优先级反转问题。解决方案包括:
lock_guard/unique_lock)notifyCONDITION_VARIABLE与POSIX的pthread_cond_t行为略有不同扩展我们的示例,支持多个生产者和消费者:
cpp复制void multi_producer(int id) {
for (int i = 0; i < 10; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds(30));
{
std::lock_guard<std::mutex> lk(m);
q.push(id * 100 + i); // 用ID区分不同生产者的数据
}
cv.notify_all(); // 通知所有消费者
}
// 需要更复杂的完成检测机制
}
void multi_consumer(int id) {
std::unique_lock<std::mutex> lk(m);
while (/* 更复杂的退出条件 */) {
cv.wait(lk, [&] { return !q.empty() || /* 完成条件 */; });
// 处理数据...
}
}
有时我们不希望无限期等待,可以添加超时机制:
cpp复制void consumer_with_timeout() {
std::unique_lock<std::mutex> lk(m);
while (!done) {
// 等待最多100ms
if (cv.wait_for(lk, std::chrono::milliseconds(100),
[&] { return !q.empty() || done; })) {
// 条件满足,处理数据
while (!q.empty()) {
// ...
}
} else {
// 超时处理
std::cout << "Timeout occurred\n";
}
}
}
在某些高性能场景,可以结合原子操作减少锁竞争:
cpp复制std::atomic<bool> data_ready{false};
void optimized_producer() {
// 生产数据...
data_ready.store(true, std::memory_order_release);
cv.notify_one();
}
void optimized_consumer() {
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, [&] { return data_ready.load(std::memory_order_acquire); });
// 处理数据...
}
在生产代码中,完善的日志记录对调试并发问题至关重要。我们可以改进示例中的日志功能:
cpp复制class ThreadLogger {
public:
static void log(const std::string& message) {
static std::mutex log_mutex;
std::lock_guard<std::mutex> lock(log_mutex);
auto now = std::chrono::system_clock::now();
auto tid = std::this_thread::get_id();
std::time_t time = std::chrono::system_clock::to_time_t(now);
std::cout << std::put_time(std::localtime(&time), "%F %T")
<< " [" << tid << "] " << message << std::endl;
}
};
在真实项目中,我曾遇到条件变量性能问题,通过以下步骤解决:
健壮的生产代码需要考虑各种异常情况:
cpp复制void robust_consumer() {
try {
std::unique_lock<std::mutex> lk(m);
while (!done) {
if (cv.wait_for(lk, 100ms, [&] { return !q.empty() || done; })) {
// 处理数据...
} else {
// 超时处理...
}
}
} catch (const std::exception& e) {
ThreadLogger::log(std::string("Consumer error: ") + e.what());
}
}
对于简单的单向通知,可以考虑使用promise/future:
cpp复制std::promise<void> ready_promise;
std::future<void> ready_future = ready_promise.get_future();
void simple_producer() {
// 准备工作...
ready_promise.set_value(); // 通知准备就绪
}
void simple_consumer() {
ready_future.wait(); // 等待准备就绪
// 继续执行...
}
对于计算密集型任务,可以使用更高层次的抽象:
cpp复制auto future = std::async(std::launch::async, [] {
// 异步任务...
return result;
});
// 主线程可以继续其他工作...
auto result = future.get(); // 需要时获取结果
对于更复杂的并发需求,可以考虑:
为并发代码编写有效的测试很有挑战性,但必不可少:
cpp复制TEST(ProducerConsumerTest, BasicFlow) {
std::mutex m;
std::condition_variable cv;
std::queue<int> q;
bool done = false;
auto producer = [&] {
std::lock_guard<std::mutex> lk(m);
q.push(42);
done = true;
cv.notify_one();
};
auto consumer = [&] {
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, [&] { return done; });
ASSERT_FALSE(q.empty());
EXPECT_EQ(q.front(), 42);
};
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();
}
模拟高负载场景:
使用工具如:
利用条件变量实现线程安全的观察者模式:
cpp复制class Subject {
std::mutex m;
std::condition_variable cv;
std::vector<std::function<void()>> observers;
bool changed = false;
public:
void register_observer(std::function<void()> observer) {
std::lock_guard<std::mutex> lk(m);
observers.push_back(observer);
}
void notify_observers() {
std::unique_lock<std::mutex> lk(m);
changed = true;
cv.notify_all();
cv.wait(lk, [this] { return !changed; });
}
void run() {
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, [this] { return changed; });
for (auto& observer : observers) {
observer();
}
changed = false;
cv.notify_all();
}
};
实现多阶段并行计算:
cpp复制class Barrier {
std::mutex m;
std::condition_variable cv;
int count;
int generation = 0;
const int threshold;
public:
explicit Barrier(int n) : threshold(n), count(n) {}
void wait() {
std::unique_lock<std::mutex> lk(m);
int gen = generation;
if (--count == 0) {
generation++;
count = threshold;
cv.notify_all();
} else {
cv.wait(lk, [this, gen] { return gen != generation; });
}
}
};
基于条件变量构建读写锁:
cpp复制class ReadWriteLock {
std::mutex m;
std::condition_variable cv;
int readers = 0;
bool writer = false;
public:
void read_lock() {
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, [this] { return !writer; });
readers++;
}
void read_unlock() {
std::lock_guard<std::mutex> lk(m);
if (--readers == 0) {
cv.notify_one();
}
}
void write_lock() {
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, [this] { return !writer && readers == 0; });
writer = true;
}
void write_unlock() {
std::lock_guard<std::mutex> lk(m);
writer = false;
cv.notify_all();
}
};
在实际项目中,我遇到过一个典型问题:消费者线程偶尔会错过生产者的通知。经过排查发现是因为在修改done标志和调用notify之间有一个极小的窗口期,导致消费者可能在这期间检查条件并进入等待。解决方案是确保修改共享状态和通知操作在同一个锁保护区域内:
cpp复制// 错误实现
{
std::lock_guard<std::mutex> lk(m);
done = true;
} // 解锁
cv.notify_all(); // 在这之间消费者可能检查条件并进入等待
// 正确实现
{
std::lock_guard<std::mutex> lk(m);
done = true;
cv.notify_all(); // 在持有锁的情况下通知
}
这个经验告诉我,在多线程编程中,原子性不仅限于单个变量的修改,还包括逻辑上相关的多个操作。