多线程编程是现代软件开发中不可或缺的核心技能,特别是在高性能计算、服务器开发和系统编程领域。作为一名长期奋战在C++开发一线的工程师,我深知多线程编程既是提升程序性能的利器,也是容易引入复杂问题的双刃剑。本文将基于四个典型的生产者-消费者模型案例,由浅入深地剖析C++多线程编程的核心技术与最佳实践。
生产者-消费者模型是多线程编程中最经典的同步问题之一。在这个模型中,生产者线程负责生成数据并放入共享缓冲区,而消费者线程则从缓冲区取出数据进行处理。两者通过同步机制协调工作,避免数据竞争和资源冲突。
在POSIX线程库中,我们主要使用以下同步原语:
让我们先看一个完整的基础实现,随后逐步解析关键部分:
cpp复制#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#define BUF_SIZE 3
int g_buf[BUF_SIZE];
int g_idx = 0;
pthread_mutex_t g_mutex;
pthread_cond_t g_cond_prod;
pthread_cond_t g_cond_cons;
void* producer(void* arg) {
for (int i = 0; i < 5; i++) {
pthread_mutex_lock(&g_mutex);
while (g_idx == BUF_SIZE) {
pthread_cond_wait(&g_cond_prod, &g_mutex);
}
g_buf[g_idx++] = i;
printf("生产者[%ld]:放入%d,缓冲区数量=%d\n", pthread_self(), i, g_idx);
pthread_cond_signal(&g_cond_cons);
pthread_mutex_unlock(&g_mutex);
sleep(1);
}
return NULL;
}
void* consumer(void* arg) {
for (int i = 0; i < 5; i++) {
pthread_mutex_lock(&g_mutex);
while (g_idx == 0) {
pthread_cond_wait(&g_cond_cons, &g_mutex);
}
int val = g_buf[--g_idx];
printf("消费者[%ld]:取出%d,缓冲区数量=%d\n", pthread_self(), val, g_idx);
pthread_cond_signal(&g_cond_prod);
pthread_mutex_unlock(&g_mutex);
sleep(1);
}
return NULL;
}
int main() {
pthread_t tid_prod, tid_cons;
pthread_mutex_init(&g_mutex, NULL);
pthread_cond_init(&g_cond_prod, NULL);
pthread_cond_init(&g_cond_cons, NULL);
pthread_create(&tid_prod, NULL, producer, NULL);
pthread_create(&tid_cons, NULL, consumer, NULL);
pthread_join(tid_prod, NULL);
pthread_join(tid_cons, NULL);
pthread_mutex_destroy(&g_mutex);
pthread_cond_destroy(&g_cond_prod);
pthread_cond_destroy(&g_cond_cons);
return 0;
}
条件变量使用时必须注意以下几点:
cpp复制while (g_idx == BUF_SIZE) {
pthread_cond_wait(&g_cond_prod, &g_mutex);
}
使用while循环是为了防止虚假唤醒(spurious wakeup),即线程可能在没有收到明确信号的情况下被唤醒。通过循环重新检查条件,可以确保条件真正满足后再继续执行。
所有同步原语都必须正确初始化和销毁,否则可能导致资源泄漏或未定义行为:
cpp复制// 初始化
pthread_mutex_init(&g_mutex, NULL);
pthread_cond_init(&g_cond_prod, NULL);
pthread_cond_init(&g_cond_cons, NULL);
// 销毁
pthread_mutex_destroy(&g_mutex);
pthread_cond_destroy(&g_cond_prod);
pthread_cond_destroy(&g_cond_cons);
通过sleep函数模拟生产和消费的耗时操作,可以更直观地观察线程间的交互:
cpp复制// 生产者
sleep(1); // 模拟生产耗时
// 消费者
sleep(1); // 模拟消费耗时
在实际应用中,这些sleep应该被实际的生产和消费操作所替代。
提示:在多生产者或多消费者场景下,应该使用pthread_cond_broadcast而非pthread_cond_signal,以避免某些线程被永久阻塞。
信号量(Semaphore)是一种更为通用的同步机制,它可以用来控制对共享资源的访问数量。与条件变量不同,信号量本身维护了一个计数器,不需要额外的条件判断。
POSIX信号量主要操作:
考虑一个银行有3个服务窗口,同时来了10个客户办理业务的场景。我们可以使用信号量来实现窗口资源的分配:
cpp复制#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <semaphore.h>
#define NUM_WINDOWS 3
#define NUM_CUSTOMERS 10
sem_t sem_windows;
void* customer(void* arg) {
int id = *(int*)arg;
free(arg);
printf("--- 客户 %d 到达银行,正在排队...\n", id);
sem_wait(&sem_windows);
printf("+++ 客户 %d 抢到了窗口,正在办理业务...\n", id);
sleep(2);
printf("<<< 客户 %d 办理完毕,离开窗口\n", id);
sem_post(&sem_windows);
return NULL;
}
int main() {
pthread_t tids[NUM_CUSTOMERS];
sem_init(&sem_windows, 0, NUM_WINDOWS);
for (int i = 0; i < NUM_CUSTOMERS; i++) {
int* p_id = malloc(sizeof(int));
*p_id = i;
pthread_create(&tids[i], NULL, customer, p_id);
usleep(100000);
}
for (int i = 0; i < NUM_CUSTOMERS; i++) {
pthread_join(tids[i], NULL);
}
sem_destroy(&sem_windows);
return 0;
}
注意:信号量虽然功能强大,但在简单场景下使用互斥锁和条件变量组合可能更直观,代码也更易维护。
C++提供了更高级的抽象机制,我们可以将同步原语封装成类,利用RAII(Resource Acquisition Is Initialization)技术自动管理资源生命周期。同时,使用模板可以实现通用的数据结构。
cpp复制#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <semaphore.h>
#include <unistd.h>
class Semaphore {
private:
sem_t _sem;
public:
Semaphore(int value) { sem_init(&_sem, 0, value); }
~Semaphore() { sem_destroy(&_sem); }
void wait() { sem_wait(&_sem); }
void signal() { sem_post(&_sem); }
};
template <typename T>
class RingQueue {
private:
std::vector<T> _buffer;
int _capacity;
int _head;
int _tail;
std::mutex _mtx;
Semaphore _sem_empty;
Semaphore _sem_data;
public:
RingQueue(int cap) : _capacity(cap), _buffer(cap), _head(0), _tail(0),
_sem_empty(cap), _sem_data(0) {}
void push(const T& data) {
_sem_empty.wait();
{
std::lock_guard<std::mutex> lock(_mtx);
_buffer[_tail] = data;
_tail = (_tail + 1) % _capacity;
}
_sem_data.signal();
}
void pop(T* out_data) {
_sem_data.wait();
{
std::lock_guard<std::mutex> lock(_mtx);
*out_data = _buffer[_head];
_head = (_head + 1) % _capacity;
}
_sem_empty.signal();
}
};
RingQueue<int> g_queue(5);
void producer() {
int i = 0;
while (true) {
std::cout << "生产者 [" << std::this_thread::get_id() << "] 生产: " << i << std::endl;
g_queue.push(i++);
sleep(1);
}
}
void consumer() {
while (true) {
int data;
g_queue.pop(&data);
std::cout << " >>> 消费者 [" << std::this_thread::get_id() << "] 消费: " << data << std::endl;
sleep(2);
}
}
int main() {
std::thread t_prod(producer);
std::thread t_cons(consumer);
t_prod.join();
t_cons.join();
return 0;
}
提示:在单生产者单消费者(SPSC)场景下,可以移除互斥锁,仅使用信号量即可保证线程安全,因为生产者和消费者不会同时修改同一变量。
无锁(lock-free)编程是一种高性能并发编程技术,它通过原子操作和内存顺序控制来实现线程安全,避免了传统锁带来的性能开销和潜在死锁问题。
C++11引入了
cpp复制#include <vector>
#include <atomic>
#include <thread>
#include <iostream>
template <typename T>
class SPSCLockFreeQueue {
private:
std::vector<T> _buffer;
int _capacity;
std::atomic<int> _head;
std::atomic<int> _tail;
public:
SPSCLockFreeQueue(int cap) : _capacity(cap + 1), _buffer(cap + 1),
_head(0), _tail(0) {}
bool push(const T& data) {
int current_tail = _tail.load(std::memory_order_relaxed);
int next_tail = (current_tail + 1) % _capacity;
if (next_tail == _head.load(std::memory_order_acquire)) {
return false;
}
_buffer[current_tail] = data;
_tail.store(next_tail, std::memory_order_release);
return true;
}
bool pop(T* out_data) {
int current_head = _head.load(std::memory_order_relaxed);
if (current_head == _tail.load(std::memory_order_acquire)) {
return false;
}
*out_data = _buffer[current_head];
_head.store((current_head + 1) % _capacity, std::memory_order_release);
return true;
}
};
在SPSC队列中:
选择建议:
死锁预防:
性能调优:
调试工具:
C++11/14/17/20引入了一系列现代并发编程特性:
在实际项目中,应该根据需求选择合适的抽象层次,平衡性能、安全性和开发效率。