在当今多核处理器普及的时代,掌握并发编程已经成为C++开发者的必备技能。我清晰地记得第一次尝试用std::thread创建多个线程时的场景——程序运行结果完全不符合预期,数据竞争导致的结果每次运行都不一样。正是这些痛苦的调试经历让我意识到,系统性地学习C++并发知识有多么重要。
《C++并发实践》第二版这本书之所以值得反复研读,是因为它不仅覆盖了C++11到C++20的现代并发特性,更重要的是提供了大量工程实践中的解决方案。与第一版相比,第二版新增了关于内存模型、原子操作和并行算法的深入讨论,这些都是构建高性能并发系统的关键要素。
现代CPU的乱序执行和缓存体系使得内存访问顺序变得复杂。我曾在一个项目中遇到这样的问题:两个线程通过原子bool变量进行同步,但偶尔还是会出现数据不一致的情况。后来发现是因为没有正确理解内存顺序(memory_order)导致的。
C++标准定义了6种内存顺序:
cpp复制std::atomic<bool> ready{false};
int data = 0;
// 线程1
data = 42;
ready.store(true, std::memory_order_release);
// 线程2
while(!ready.load(std::memory_order_acquire));
assert(data == 42); // 这个断言永远不会失败
原子操作不仅仅是简单的计数器递增。在实际项目中,我经常用它们来实现无锁数据结构。比如一个多生产者单消费者的队列:
cpp复制template<typename T>
class MPSCQueue {
struct Node {
T data;
std::atomic<Node*> next;
};
std::atomic<Node*> head;
std::atomic<Node*> tail;
public:
void push(T value) {
Node* new_node = new Node{std::move(value), nullptr};
Node* old_tail = tail.exchange(new_node, std::memory_order_acq_rel);
old_tail->next.store(new_node, std::memory_order_release);
}
bool pop(T& value) {
Node* old_head = head.load(std::memory_order_relaxed);
if(old_head == nullptr) return false;
head.store(old_head->next.load(std::memory_order_acquire),
std::memory_order_relaxed);
value = std::move(old_head->data);
delete old_head;
return true;
}
};
重要提示:无锁编程虽然性能高,但调试难度极大。建议先用标准库提供的并发容器,确实有性能瓶颈再考虑自实现。
创建线程看起来简单,但有很多细节需要注意。我曾经在一个服务中直接创建了大量线程,结果导致系统资源耗尽。现在我会遵循这些原则:
cpp复制class ThreadGuard {
std::thread t;
public:
explicit ThreadGuard(std::thread t_) : t(std::move(t_)) {
if(!t.joinable()) throw std::logic_error("No thread");
}
~ThreadGuard() {
if(t.joinable()) t.join();
}
ThreadGuard(const ThreadGuard&)=delete;
ThreadGuard& operator=(const ThreadGuard&)=delete;
};
void worker() { /*...*/ }
int main() {
ThreadGuard t(std::thread(worker));
// 线程会在作用域结束时自动join
}
条件变量(condition_variable)是线程同步的重要工具,但也是最容易用错的原语之一。常见的错误包括:
正确的使用模式:
cpp复制std::mutex mtx;
std::condition_variable cv;
bool ready = false;
// 等待线程
{
std::unique_lock<std::mutex> lk(mtx);
cv.wait(lk, []{ return ready; });
// 处理事件
}
// 通知线程
{
std::lock_guard<std::mutex> lk(mtx);
ready = true;
}
cv.notify_one();
C++17引入了并行算法,可以显著提升数据处理性能。在我的一个图像处理项目中,使用并行排序将性能提升了3倍:
cpp复制#include <execution>
#include <algorithm>
std::vector<Image> process_images(std::vector<Image>& images) {
// 并行排序
std::sort(std::execution::par, images.begin(), images.end());
// 并行变换
std::for_each(std::execution::par, images.begin(), images.end(),
[](Image& img) {
apply_filter(img);
});
return images;
}
需要注意的几点:
在多核编程中,一个常见的性能杀手是伪共享。我曾在优化一个高频交易系统时发现,看似并行的计数器更新实际上比单线程还慢,原因就是不同核心的缓存行冲突。
解决方案:
cpp复制struct alignas(64) Counter {
std::atomic<int> value;
};
Counter counters[4]; // 每个计数器位于不同的缓存行
ThreadSanitizer(TSan)是检测并发问题的利器。在编译时添加-fsanitize=thread选项,运行时就能发现潜在的数据竞争。
我曾经用TSan发现了一个隐藏很深的问题:一个看似线程安全的单例实现,实际上在构造函数中访问了非线程安全的全局变量。
bash复制g++ -fsanitize=thread -g -O1 test.cpp -o test
./test
并发代码的测试需要特殊方法:
一个简单的压力测试框架:
cpp复制void concurrent_test() {
std::vector<std::thread> threads;
SharedResource resource;
for(int i=0; i<10; ++i) {
threads.emplace_back([&] {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(1,100);
for(int j=0; j<1000; ++j) {
std::this_thread::sleep_for(
std::chrono::microseconds(dis(gen)));
resource.access();
}
});
}
for(auto& t : threads) t.join();
}
std::jthread是对std::thread的改进,主要特点:
cpp复制void worker(std::stop_token st) {
while(!st.stop_requested()) {
// 执行工作
std::this_thread::sleep_for(100ms);
}
}
int main() {
std::jthread t(worker);
// ...
t.request_stop(); // 优雅停止线程
// 不需要显式join
}
C++20引入了协程支持,为异步编程提供了新范式。虽然标准库的协程工具还很基础,但第三方库如cppcoro已经提供了丰富功能。
cpp复制#include <cppcoro/task.hpp>
#include <cppcoro/sync_wait.hpp>
#include <cppcoro/static_thread_pool.hpp>
cppcoro::task<int> compute_value(StaticThreadPool& pool) {
co_await pool.schedule();
auto result = some_expensive_computation();
co_return result;
}
int main() {
StaticThreadPool pool;
int result = sync_wait(compute_value(pool));
}
在实际项目中,我发现协程特别适合IO密集型任务,可以避免回调地狱,同时保持高性能。
设计线程安全数据结构时,锁粒度是关键考量。我通常遵循这些原则:
一个线程安全的队列实现:
cpp复制template<typename T>
class ThreadSafeQueue {
mutable std::mutex mtx;
std::queue<T> data;
std::condition_variable cv;
public:
void push(T value) {
std::lock_guard<std::mutex> lk(mtx);
data.push(std::move(value));
cv.notify_one();
}
bool try_pop(T& value) {
std::lock_guard<std::mutex> lk(mtx);
if(data.empty()) return false;
value = std::move(data.front());
data.pop();
return true;
}
void wait_and_pop(T& value) {
std::unique_lock<std::mutex> lk(mtx);
cv.wait(lk, [this]{ return !data.empty(); });
value = std::move(data.front());
data.pop();
}
};
无锁数据结构可以提供更好的伸缩性,但设计复杂度大大增加。最常见的无锁模式:
一个无锁栈的简单实现:
cpp复制template<typename T>
class LockFreeStack {
struct Node {
T data;
Node* next;
};
std::atomic<Node*> head;
public:
void push(const T& value) {
Node* new_node = new Node{value, nullptr};
new_node->next = head.load(std::memory_order_relaxed);
while(!head.compare_exchange_weak(new_node->next, new_node,
std::memory_order_release, std::memory_order_relaxed));
}
bool pop(T& value) {
Node* old_head = head.load(std::memory_order_relaxed);
while(old_head &&
!head.compare_exchange_weak(old_head, old_head->next,
std::memory_order_acquire, std::memory_order_relaxed));
if(!old_head) return false;
value = std::move(old_head->data);
delete old_head;
return true;
}
};
实际项目中,建议直接使用boost.lockfree或TBB中的无锁容器,它们经过了充分测试。
在日志系统中,我使用多生产者单消费者模式取得了很好的效果。关键优化点:
cpp复制class LogSystem {
MPSCQueue<std::string> queue;
std::atomic<bool> running{true};
std::thread consumer;
void consume() {
std::vector<std::string> batch;
while(running || !queue.empty()) {
std::string msg;
if(queue.pop(msg)) {
batch.push_back(std::move(msg));
if(batch.size() >= 100) {
write_to_disk(batch);
batch.clear();
}
} else {
std::this_thread::yield();
}
}
if(!batch.empty()) write_to_disk(batch);
}
public:
LogSystem() : consumer(&LogSystem::consume, this) {}
~LogSystem() {
running = false;
consumer.join();
}
void log(const std::string& msg) {
queue.push(msg);
}
};
对于计算密集型任务,我通常采用工作窃取(work stealing)策略。C++17的并行算法内部就使用了这种模式。
一个简单的任务池实现:
cpp复制class ThreadPool {
std::vector<std::thread> workers;
std::deque<std::function<void()>> tasks;
std::mutex queue_mutex;
std::condition_variable cv;
bool stop = false;
public:
ThreadPool(size_t threads) {
for(size_t i=0; i<threads; ++i) {
workers.emplace_back([this] {
while(true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queue_mutex);
cv.wait(lock, [this]{ return stop || !tasks.empty(); });
if(stop && tasks.empty()) return;
task = std::move(tasks.front());
tasks.pop_front();
}
task();
}
});
}
}
template<class F>
void enqueue(F&& f) {
{
std::lock_guard<std::mutex> lock(queue_mutex);
tasks.emplace_back(std::forward<F>(f));
}
cv.notify_one();
}
~ThreadPool() {
{
std::lock_guard<std::mutex> lock(queue_mutex);
stop = true;
}
cv.notify_all();
for(auto& worker : workers)
worker.join();
}
};
Linux下的perf工具是分析并发程序性能的利器。我常用这些命令:
bash复制# 统计缓存命中率
perf stat -e cache-references,cache-misses ./my_program
# 生成火焰图
perf record -g ./my_program
perf script | stackcollapse-perf.pl | flamegraph.pl > flame.svg
通过分析发现,一个常见的性能问题是过度同步。比如在一个高频交易系统中,将互斥锁改为读写锁后,吞吐量提升了40%。
不同内存顺序的原子操作性能差异很大。在我的基准测试中(x86架构):
| 操作类型 | 时钟周期(近似) |
|---|---|
| relaxed原子操作 | 10-20 |
| acquire/release原子操作 | 20-30 |
| seq_cst原子操作 | 50-100 |
| 互斥锁操作 | 100-200 |
因此,在性能关键路径上,应该选择合适的内存顺序,而不是总是使用默认的memory_order_seq_cst。
Windows和Linux的线程优先级模型不同,需要封装抽象层:
cpp复制void set_thread_priority(std::thread& t, Priority p) {
auto native = t.native_handle();
#ifdef _WIN32
int win_prio;
switch(p) {
case Priority::Low: win_prio = THREAD_PRIORITY_BELOW_NORMAL; break;
case Priority::Normal: win_prio = THREAD_PRIORITY_NORMAL; break;
case Priority::High: win_prio = THREAD_PRIORITY_ABOVE_NORMAL; break;
}
SetThreadPriority(native, win_prio);
#else
int policy;
sched_param param;
pthread_getschedparam(native, &policy, ¶m);
switch(p) {
case Priority::Low: param.sched_priority = sched_get_priority_min(policy); break;
case Priority::Normal: param.sched_priority = (sched_get_priority_min(policy) +
sched_get_priority_max(policy))/2; break;
case Priority::High: param.sched_priority = sched_get_priority_max(policy); break;
}
pthread_setschedparam(native, policy, ¶m);
#endif
}
在多线程程序中处理信号需要特别注意:
cpp复制void signal_handler_thread() {
sigset_t set;
sigemptyset(&set);
sigaddset(&set, SIGINT);
sigaddset(&set, SIGTERM);
int sig;
while(true) {
if(sigwait(&set, &sig) == 0) {
if(sig == SIGINT || sig == SIGTERM) {
// 清理资源
exit(0);
}
}
}
}
int main() {
// 主线程阻塞所有信号
sigset_t set;
sigfillset(&set);
pthread_sigmask(SIG_BLOCK, &set, nullptr);
// 创建信号处理线程
std::thread sig_thread(signal_handler_thread);
// 其他线程...
}
C++23和未来的标准可能会引入更多并发特性,比如:
在我最近参与的一个项目中,我们实验性地使用了C++20的协程与并行算法结合,实现了非常优雅的异步数据处理流水线。虽然目前还需要一些样板代码,但未来的C++版本很可能会让并发编程变得更加简单和安全。
根据我的学习经验,建议按照这个路径深入:
除了《C++并发实践》第二版外,这些资源也很有价值:
实践建议:
最后分享一个我在调试并发问题时的小技巧:给每个线程分配独特的ID,并在日志中输出,这样可以更清晰地跟踪线程间的交互。