生产者消费者问题(Producer-Consumer Problem)是多线程编程中的经典案例,它描述了两个角色——生产者和消费者共享固定大小缓冲区时的协作关系。生产者负责生成数据放入缓冲区,消费者则从缓冲区取出数据进行处理。这个模型在现实中有广泛的应用场景,比如消息队列系统、任务调度框架、数据流水线处理等。
在这个模型中,核心挑战在于如何确保:
互斥锁(Mutex)是最基本的线程同步原语,它提供了对共享资源的独占访问机制。从底层实现来看,现代操作系统的互斥锁通常结合了用户空间的原子操作和内核空间的等待队列:
在Linux中,pthread_mutex_t的典型实现经历了从futex到更高效机制的演进。Windows平台的CRITICAL_SECTION也有类似的优化路径。
在生产者消费者模型中,互斥锁主要保护两类操作:
cpp复制pthread_mutex_t buffer_mutex;
// 生产者线程
void producer() {
pthread_mutex_lock(&buffer_mutex);
// 检查缓冲区是否满
// 向缓冲区添加数据
pthread_mutex_unlock(&buffer_mutex);
}
// 消费者线程
void consumer() {
pthread_mutex_lock(&buffer_mutex);
// 检查缓冲区是否空
// 从缓冲区取出数据
pthread_mutex_unlock(&buffer_mutex);
}
这种基础实现存在明显问题:当缓冲区满时,生产者会不断循环检查(忙等待),浪费CPU资源;同样,消费者在空缓冲区时也会出现类似情况。
条件变量(Condition Variable)解决的核心问题是"有条件的等待"。与互斥锁配合使用时,它提供了三个关键操作:
在Linux的pthread实现中,条件变量内部维护了一个等待队列。当调用pthread_cond_wait时,线程会被放入这个队列并释放关联的互斥锁。
正确的条件变量使用需要遵循"三重检查"模式:
cpp复制pthread_mutex_t mutex;
pthread_cond_t cond;
bool condition = false;
// 等待方
pthread_mutex_lock(&mutex);
while (!condition) { // 必须用while而不是if
pthread_cond_wait(&cond, &mutex);
}
// 处理条件满足的情况
pthread_mutex_unlock(&mutex);
// 通知方
pthread_mutex_lock(&mutex);
condition = true;
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
条件变量的核心在于它总是与某个共享状态(如"缓冲区非空")相关联。互斥锁确保了对这个共享状态的检查与修改是原子的。没有互斥锁的保护,可能会出现以下竞态条件:
pthread_cond_wait的原子性体现在它同时完成两个操作:
这两个操作是不可分割的,否则会出现信号丢失或死锁的情况。现代操作系统通过内核级的同步原语保证这一原子性。
条件变量可能因为系统信号或其他原因出现虚假唤醒(spurious wakeup)。互斥锁与while循环的组合提供了防御这种异常情况的机制:
cpp复制while (!condition) {
pthread_cond_wait(&cond, &mutex);
}
这种模式确保即使发生虚假唤醒,线程也会重新检查条件是否真正满足。
cpp复制#define BUFFER_SIZE 10
int buffer[BUFFER_SIZE];
int count = 0, in = 0, out = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;
pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
void *producer(void *arg) {
for (int i = 0; i < 100; ++i) {
pthread_mutex_lock(&mutex);
while (count == BUFFER_SIZE) {
pthread_cond_wait(¬_full, &mutex);
}
buffer[in] = i;
in = (in + 1) % BUFFER_SIZE;
count++;
pthread_cond_signal(¬_empty);
pthread_mutex_unlock(&mutex);
}
return NULL;
}
void *consumer(void *arg) {
for (int i = 0; i < 100; ++i) {
pthread_mutex_lock(&mutex);
while (count == 0) {
pthread_cond_wait(¬_empty, &mutex);
}
int item = buffer[out];
out = (out + 1) % BUFFER_SIZE;
count--;
pthread_cond_signal(¬_full);
pthread_mutex_unlock(&mutex);
printf("Consumed: %d\n", item);
}
return NULL;
}
cpp复制// 错误示例
pthread_mutex_lock(&mutex);
condition = true;
pthread_mutex_unlock(&mutex);
// 忘记调用 pthread_cond_signal
cpp复制// 线程1
pthread_mutex_lock(&mutexA);
pthread_mutex_lock(&mutexB);
// 线程2
pthread_mutex_lock(&mutexB);
pthread_mutex_lock(&mutexA);
info threads和thread apply all bt命令perf工具分析锁等待时间__attribute__((aligned(64)))对齐关键变量C++11引入了标准线程库,提供了更类型安全的接口:
cpp复制std::mutex mtx;
std::condition_variable cv;
bool ready = false;
// 等待方
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck, []{ return ready; });
// 通知方
{
std::lock_guard<std::mutex> lck(mtx);
ready = true;
}
cv.notify_one();
在实际工程中,选择同步机制时需要权衡开发效率、性能需求和可维护性。对于大多数应用场景,条件变量+互斥锁的组合仍然是平衡各方面需求的可靠选择。