十年前我刚接触C++并发编程时,面对POSIX线程接口手足无措的日子还历历在目。如今在双核处理器已成历史、16核服务器遍地开花的时代,不会并发编程的C++开发者就像拿着狙击枪却只会当棍棒使用。现代应用对性能的压榨已经到了毫秒必争的地步——游戏引擎需要同时处理物理模拟和AI决策,金融系统要并行处理数百万笔交易,Web服务器更要应对海量并发请求。
C++11标准引入的线程库彻底改变了我们编写并发代码的方式。不再需要面对晦涩的pthread_create参数,不再需要手动管理线程生命周期。但这也带来了新的挑战:如何避免数据竞争?怎样设计无锁数据结构?什么时候该用原子操作?这些问题的答案,都将在本文通过实际案例一一揭晓。
创建线程看似简单,但魔鬼藏在细节里。以下是一个典型的生产者-消费者模型实现:
cpp复制std::vector<std::thread> workers;
std::queue<Data> data_queue;
std::mutex queue_mutex;
std::condition_variable cond_var;
// 生产者线程
void producer() {
while (has_data()) {
Data data = prepare_data();
{
std::lock_guard<std::mutex> lock(queue_mutex);
data_queue.push(std::move(data));
}
cond_var.notify_one();
}
}
// 消费者线程
void consumer() {
while (true) {
std::unique_lock<std::mutex> lock(queue_mutex);
cond_var.wait(lock, []{ return !data_queue.empty(); });
Data data = std::move(data_queue.front());
data_queue.pop();
lock.unlock();
process(data);
}
}
关键经验:永远在锁的保护下操作共享数据,但锁的持有时间要尽可能短。我曾在项目中见过一个持有锁进行文件I/O的操作,直接导致系统吞吐量下降90%。
原子类型(std::atomic)是避免数据竞争的利器,但理解其内存模型至关重要。考虑这个常见的双重检查锁定模式:
cpp复制class Singleton {
public:
static Singleton& instance() {
Singleton* tmp = instance_.load(std::memory_order_acquire);
if (!tmp) {
std::lock_guard<std::mutex> lock(mutex_);
tmp = instance_.load(std::memory_order_relaxed);
if (!tmp) {
tmp = new Singleton;
instance_.store(tmp, std::memory_order_release);
}
}
return *tmp;
}
private:
static std::atomic<Singleton*> instance_;
static std::mutex mutex_;
};
这里使用了memory_order_acquire和memory_order_release来建立正确的happens-before关系。在实际性能测试中,这种实现比纯互斥锁版本快3-5倍。
无锁数据结构能极大提升并发性能,但编写正确的无锁代码如同走钢丝。下面是一个简单的无锁栈实现:
cpp复制template<typename T>
class lock_free_stack {
private:
struct node {
T data;
node* next;
node(T const& data_): data(data_) {}
};
std::atomic<node*> head;
public:
void push(T const& data) {
node* const new_node = new node(data);
new_node->next = head.load();
while(!head.compare_exchange_weak(new_node->next, new_node));
}
bool pop(T& result) {
node* old_head = head.load();
while(old_head &&
!head.compare_exchange_weak(old_head, old_head->next));
if(!old_head) return false;
result = old_head->data;
delete old_head;
return true;
}
};
血泪教训:无锁编程中ABA问题是最隐蔽的坑。我曾花费两周时间追踪一个只在百万次操作后才会出现的bug,最终发现是经典的ABA问题。解决方案是使用带标签指针或风险指针。
一个工业级线程池需要考虑任务窃取、优雅关闭等复杂场景。以下是核心架构:
cpp复制class ThreadPool {
public:
explicit ThreadPool(size_t thread_count = std::thread::hardware_concurrency()) {
for(size_t i = 0; i < thread_count; ++i) {
workers_.emplace_back([this] {
for(;;) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queue_mutex_);
condition_.wait(lock,
[this]{ return stop_ || !tasks_.empty(); });
if(stop_ && tasks_.empty()) return;
task = std::move(tasks_.front());
tasks_.pop();
}
task();
}
});
}
}
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<return_type> res = task->get_future();
{
std::lock_guard<std::mutex> lock(queue_mutex_);
if(stop_) throw std::runtime_error("enqueue on stopped ThreadPool");
tasks_.emplace([task](){ (*task)(); });
}
condition_.notify_one();
return res;
}
~ThreadPool() {
{
std::lock_guard<std::mutex> lock(queue_mutex_);
stop_ = true;
}
condition_.notify_all();
for(std::thread &worker: workers_)
worker.join();
}
private:
std::vector<std::thread> workers_;
std::queue<std::function<void()>> tasks_;
std::mutex queue_mutex_;
std::condition_variable condition_;
bool stop_ = false;
};
实测表明,这种实现比简单创建线程快20倍以上,特别是在短任务密集的场景。
使用perf工具分析锁竞争:
bash复制perf record -g -p <pid> -- sleep 30
perf report -n --stdio
常见优化策略:
x86和ARM的内存模型差异会导致微妙的问题。考虑这个例子:
cpp复制// 线程1
x = 1;
ready = true;
// 线程2
while(!ready);
assert(x == 1); // 在x86上总是成立,但在ARM上可能失败!
解决方案是始终使用正确的内存序:
cpp复制// 线程1
x.store(1, std::memory_order_relaxed);
ready.store(true, std::memory_order_release);
// 线程2
while(!ready.load(std::memory_order_acquire));
assert(x.load(std::memory_order_relaxed) == 1);
C++20引入的协程彻底改变了异步编程范式:
cpp复制task<int> compute_value() {
int result = co_await async_operation();
co_return result * 2;
}
task<void> test() {
int val = co_await compute_value();
std::cout << "Value: " << val << '\n';
}
C++17的并行算法可以轻松利用多核:
cpp复制std::vector<int> data(1000000);
std::sort(std::execution::par, data.begin(), data.end());
实测在16核机器上,并行排序比单线程快12倍。
cpp复制struct alignas(64) CacheLineAligned {
int data;
};
优先级反转:高优先级线程等待低优先级线程持有的锁。解决方案是使用优先级继承互斥锁。
死锁:严格按照固定顺序获取多个锁,或使用std::scoped_lock。
线程泄漏:确保所有线程在析构函数中join或detach。
异常安全:任务抛异常导致线程池崩溃。解决方案是捕获所有异常:
cpp复制task = [task]() {
try { (*task)(); }
catch(...) { /* 记录日志 */ }
};
在金融交易系统开发中,我曾遇到一个因错误使用memory_order_relaxed导致的bug,导致每秒损失数万元。最终通过严格的代码审查和TSAN工具发现了问题。这让我明白:并发编程中,正确性永远比性能更重要。