1. C++并发编程概述
现代C++(C++11及后续版本)引入了一套完整的并发编程工具库,彻底改变了以往需要依赖平台特定API(如POSIX线程或Windows线程)的局面。这套标准库提供了从基础线程管理到高级同步原语的全套工具,让开发者能够编写可移植、高效且安全的多线程程序。
在单核CPU时代,多线程主要解决的是I/O阻塞问题;而在多核处理器成为主流的今天,并发编程更重要的意义在于充分利用硬件并行计算能力。C++标准委员会显然意识到了这一点,从C++11开始系统地引入了现代并发编程支持,并在后续标准中不断完善。
提示:虽然C++11之前的版本也可以通过平台相关API实现多线程,但这类代码往往难以维护和移植。标准库的引入让C++真正拥有了跨平台的并发能力。
2. 线程管理基础
2.1 std::thread核心接口
std::thread是C++线程库的基础类,封装了操作系统原生线程的创建和管理功能。其构造函数支持多种形式:
cpp复制// 默认构造(不关联任何线程)
std::thread() noexcept;
// 初始化构造(立即启动新线程)
template <class Fn, class... Args>
explicit thread(Fn&& fn, Args&&... args);
// 移动构造(转移线程所有权)
thread(thread&& x) noexcept;
// 禁止拷贝构造
thread(const thread&) = delete;
实际创建线程时,最常用的是初始化构造函数。它接受一个可调用对象(函数、lambda表达式、函数对象等)和一系列参数:
cpp复制void print_message(const std::string& msg) {
std::cout << msg << std::endl;
}
int main() {
std::thread t(print_message, "Hello Concurrent World!");
t.join();
return 0;
}
2.2 线程生命周期管理
C++线程对象有明确的生命周期规则,违反这些规则会导致程序异常:
- join():阻塞当前线程直到目标线程执行完毕
- detach():分离线程,使其成为守护线程(后台运行)
- 析构行为:如果线程既未join也未detach,析构时将调用std::terminate()
cpp复制void worker() {
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Worker thread finished" << std::endl;
}
int main() {
std::thread t(worker);
// 必须选择以下一种处理方式
t.join(); // 等待线程结束
// t.detach(); // 或分离线程
return 0;
}
警告:忘记处理joinable的线程是常见错误。可以使用RAII包装器自动处理:
cpp复制class ThreadGuard {
std::thread& t;
public:
explicit ThreadGuard(std::thread& t_) : t(t_) {}
~ThreadGuard() {
if(t.joinable()) {
t.join();
}
}
ThreadGuard(const ThreadGuard&)=delete;
ThreadGuard& operator=(const ThreadGuard&)=delete;
};
2.3 线程标识与当前线程操作
std::this_thread命名空间提供对当前线程的操作:
cpp复制// 获取当前线程ID
std::thread::id this_id = std::this_thread::get_id();
// 主动让出CPU时间片
std::this_thread::yield();
// 线程睡眠
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::this_thread::sleep_until(wakeup_time);
线程ID(std::thread::id)支持比较和哈希操作,可用于创建线程相关的数据结构:
cpp复制std::unordered_map<std::thread::id, std::string> thread_names;
void register_thread(const std::string& name) {
thread_names[std::this_thread::get_id()] = name;
}
3. 同步机制与互斥锁
3.1 互斥锁类型比较
C++提供了多种互斥锁类型以适应不同场景:
| 锁类型 | 特性 | 适用场景 |
|---|---|---|
| mutex | 基本互斥锁 | 一般同步需求 |
| recursive_mutex | 可重入锁 | 递归调用或可能重复加锁的场景 |
| timed_mutex | 带超时的互斥锁 | 需要避免长时间阻塞的场景 |
| recursive_timed_mutex | 可重入+超时 | 复杂递归场景下的超时控制 |
基本用法示例:
cpp复制std::mutex mtx;
int shared_data = 0;
void increment() {
mtx.lock();
++shared_data; // 临界区
mtx.unlock();
}
3.2 RAII锁管理
手动管理锁的获取和释放容易出错,C++提供了两种RAII风格的锁包装器:
- lock_guard:简单的作用域锁
- unique_lock:更灵活的锁管理(支持延迟锁定、条件变量等)
cpp复制// lock_guard简单示例
{
std::lock_guard<std::mutex> lock(mtx);
// 自动加锁,作用域结束自动解锁
shared_data += 42;
}
// unique_lock高级用法
std::unique_lock<std::mutex> lock(mtx, std::defer_lock);
// ...其他不需要锁的操作...
lock.lock(); // 显式加锁
// ...临界区操作...
lock.unlock(); // 可以手动提前解锁
3.3 死锁预防
多锁场景下容易发生死锁,C++提供了std::lock和std::try_lock来安全地获取多个锁:
cpp复制std::mutex mtx1, mtx2;
// 不安全的方式(可能死锁)
// thread1: lock mtx1 then mtx2
// thread2: lock mtx2 then mtx1
// 安全的方式
std::lock(mtx1, mtx2); // 同时锁定,避免死锁
std::lock_guard<std::mutex> lock1(mtx1, std::adopt_lock);
std::lock_guard<std::mutex> lock2(mtx2, std::adopt_lock);
经验法则:总是以固定顺序获取多个锁,或者使用std::lock一次性获取所有锁。
4. 条件变量与线程同步
4.1 条件变量基础
条件变量(condition_variable)允许线程等待特定条件成立,是线程间通信的重要机制:
cpp复制std::mutex mtx;
std::condition_variable cv;
bool ready = false;
void worker_thread() {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, []{ return ready; }); // 等待ready变为true
// 执行后续工作
}
void notify_thread() {
{
std::lock_guard<std::mutex> lock(mtx);
ready = true;
}
cv.notify_one(); // 唤醒等待线程
}
4.2 生产者-消费者模式
条件变量的经典应用场景:
cpp复制std::queue<int> data_queue;
std::mutex queue_mutex;
std::condition_variable data_cond;
void producer() {
for(int i=0; i<10; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
{
std::lock_guard<std::mutex> lock(queue_mutex);
data_queue.push(i);
}
data_cond.notify_one();
}
}
void consumer() {
while(true) {
std::unique_lock<std::mutex> lock(queue_mutex);
data_cond.wait(lock, []{ return !data_queue.empty(); });
int val = data_queue.front();
data_queue.pop();
lock.unlock();
std::cout << "Consumed: " << val << std::endl;
}
}
4.3 虚假唤醒与超时等待
条件变量可能因系统原因出现虚假唤醒,因此总是使用谓词版本wait:
cpp复制// 不推荐(可能虚假唤醒)
cv.wait(lock);
// 推荐(安全)
cv.wait(lock, predicate);
带超时的等待接口:
cpp复制std::cv_status status = cv.wait_for(lock, std::chrono::seconds(1));
if(status == std::cv_status::timeout) {
// 处理超时
}
// 或者使用谓词版本
bool result = cv.wait_for(lock, std::chrono::seconds(1), predicate);
5. 原子操作与无锁编程
5.1 std::atomic基础
原子类型保证操作的不可分割性,无需显式锁:
cpp复制std::atomic<int> counter(0);
void increment() {
for(int i=0; i<1000; ++i) {
++counter; // 原子操作
}
}
5.2 内存顺序与同步
C++提供了多种内存顺序模型,控制原子操作的内存可见性:
cpp复制std::atomic<int> data(0);
std::atomic<bool> ready(false);
void producer() {
data.store(42, std::memory_order_relaxed);
ready.store(true, std::memory_order_release);
}
void consumer() {
while(!ready.load(std::memory_order_acquire));
std::cout << data.load(std::memory_order_relaxed) << std::endl;
}
5.3 CAS操作与无锁数据结构
比较交换(Compare-And-Swap)是构建无锁算法的基础:
cpp复制template<typename T>
class lock_free_stack {
struct node {
T data;
node* next;
};
std::atomic<node*> head;
public:
void push(const T& data) {
node* new_node = new node{data, head.load()};
while(!head.compare_exchange_weak(new_node->next, new_node));
}
// ...其他接口...
};
6. 异步任务与Future/Promise
6.1 std::async与std::future
std::async启动异步任务,返回std::future获取结果:
cpp复制int compute() {
std::this_thread::sleep_for(std::chrono::seconds(1));
return 42;
}
int main() {
std::future<int> result = std::async(std::launch::async, compute);
std::cout << "Waiting for result..." << std::endl;
std::cout << "Result: " << result.get() << std::endl;
return 0;
}
6.2 std::promise与值传递
std::promise允许显式设置异步结果:
cpp复制void worker(std::promise<int> result_promise) {
std::this_thread::sleep_for(std::chrono::seconds(1));
result_promise.set_value(42);
}
int main() {
std::promise<int> promise;
std::future<int> result = promise.get_future();
std::thread t(worker, std::move(promise));
std::cout << "Result: " << result.get() << std::endl;
t.join();
return 0;
}
6.3 std::packaged_task任务包装
std::packaged_task将可调用对象与future绑定:
cpp复制std::packaged_task<int()> task([](){
std::this_thread::sleep_for(std::chrono::seconds(1));
return 42;
});
std::future<int> result = task.get_future();
std::thread t(std::move(task));
t.detach();
std::cout << "Result: " << result.get() << std::endl;
7. 线程池与高级并发模式
7.1 简单线程池实现
结合条件变量和任务队列实现线程池:
cpp复制class ThreadPool {
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queue_mutex;
std::condition_variable condition;
bool stop = false;
public:
explicit ThreadPool(size_t threads) {
for(size_t i=0; i<threads; ++i) {
workers.emplace_back([this] {
while(true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queue_mutex);
condition.wait(lock, [this]{ return stop || !tasks.empty(); });
if(stop && tasks.empty()) return;
task = std::move(tasks.front());
tasks.pop();
}
task();
}
});
}
}
template<class F>
void enqueue(F&& f) {
{
std::unique_lock<std::mutex> lock(queue_mutex);
tasks.emplace(std::forward<F>(f));
}
condition.notify_one();
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for(auto& worker : workers)
worker.join();
}
};
7.2 并行算法实现
利用线程池实现并行for_each:
cpp复制template<typename Iterator, typename Func>
void parallel_for_each(Iterator first, Iterator last, Func f, size_t num_threads) {
ThreadPool pool(num_threads);
for(; first != last; ++first) {
pool.enqueue([=]{ f(*first); });
}
}
8. 性能优化与调试技巧
8.1 锁粒度优化
减小锁的粒度可以提高并发性能:
cpp复制// 不好的做法:大粒度锁
std::mutex big_lock;
void process_data() {
std::lock_guard<std::mutex> lock(big_lock);
// 很多不相关的操作...
}
// 好的做法:细粒度锁
struct Data {
std::mutex mtx;
int value;
};
void process_data(Data& d1, Data& d2) {
{
std::lock_guard<std::mutex> lock1(d1.mtx);
// 操作d1...
}
{
std::lock_guard<std::mutex> lock2(d2.mtx);
// 操作d2...
}
}
8.2 避免锁争用
减少锁的持有时间和竞争:
- 使用读写锁(C++14的shared_mutex)
- 使用无锁数据结构
- 采用线程本地存储
- 使用原子操作替代锁
8.3 线程安全调试技巧
- TSAN(ThreadSanitizer):检测数据竞争
- 死锁检测工具:如gdb的deadlock命令
- 日志记录:关键操作的时序记录
- 断言检查:不变式验证
bash复制# 使用TSAN编译
g++ -fsanitize=thread -g -O1 program.cpp
9. C++20并发新特性
9.1 std::jthread
自动join的线程类:
cpp复制void worker() {
std::cout << "Working..." << std::endl;
}
int main() {
std::jthread t(worker); // 自动join
return 0;
}
9.2 std::latch与std::barrier
新的同步原语:
cpp复制std::latch completion_latch(3); // 需要3次count_down
void worker() {
// ...工作...
completion_latch.count_down();
}
int main() {
std::jthread t1(worker), t2(worker), t3(worker);
completion_latch.wait(); // 等待所有worker完成
return 0;
}
9.3 std::atomic_ref
对非原子变量的原子访问:
cpp复制int normal_var = 0;
std::atomic_ref<int> atomic_var(normal_var);
void increment() {
++atomic_var; // 原子操作
}
10. 实战案例分析
10.1 多线程日志系统
线程安全的日志系统实现:
cpp复制class Logger {
std::ofstream log_file;
std::mutex write_mutex;
public:
Logger(const std::string& filename) : log_file(filename) {}
void log(const std::string& message) {
auto now = std::chrono::system_clock::now();
auto time = std::chrono::system_clock::to_time_t(now);
std::lock_guard<std::mutex> lock(write_mutex);
log_file << std::put_time(std::localtime(&time), "%F %T")
<< " - " << message << std::endl;
}
};
10.2 并行快速排序
利用多线程加速排序:
cpp复制template<typename RandomIt>
void parallel_quick_sort(RandomIt first, RandomIt last, ThreadPool& pool) {
if(first == last) return;
auto const pivot = *std::prev(last);
auto const middle1 = std::partition(first, last, [=](auto const& x){ return x < pivot; });
auto const middle2 = std::partition(middle1, last, [=](auto const& x){ return !(pivot < x); });
auto left_part = [=, &pool] {
parallel_quick_sort(first, middle1, pool);
};
auto right_part = [=, &pool] {
parallel_quick_sort(middle2, last, pool);
};
if(std::distance(first, middle1) > 1000) {
pool.enqueue(left_part);
} else {
left_part();
}
right_part();
}
10.3 线程安全缓存
带过期机制的线程安全缓存:
cpp复制template<typename Key, typename Value>
class ThreadSafeCache {
struct Entry {
Value value;
std::chrono::system_clock::time_point expiry;
};
std::unordered_map<Key, Entry> cache;
mutable std::shared_mutex mutex;
std::chrono::seconds default_ttl;
void cleanup() {
auto now = std::chrono::system_clock::now();
for(auto it = cache.begin(); it != cache.end(); ) {
if(it->second.expiry <= now) {
it = cache.erase(it);
} else {
++it;
}
}
}
public:
ThreadSafeCache(std::chrono::seconds ttl) : default_ttl(ttl) {}
void set(const Key& key, const Value& value,
std::chrono::seconds ttl = std::chrono::seconds(0)) {
if(ttl.count() == 0) ttl = default_ttl;
auto expiry = std::chrono::system_clock::now() + ttl;
std::unique_lock lock(mutex);
cache[key] = {value, expiry};
if(cache.size() % 10 == 0) cleanup();
}
bool get(const Key& key, Value& value) {
std::shared_lock lock(mutex);
auto it = cache.find(key);
if(it == cache.end()) return false;
if(it->second.expiry <= std::chrono::system_clock::now()) {
lock.unlock();
std::unique_lock unique_lock(mutex);
cache.erase(it);
return false;
}
value = it->second.value;
return true;
}
};
11. 常见问题与解决方案
11.1 死锁问题排查
常见死锁场景及解决方案:
- 锁顺序不一致:统一获取锁的顺序
- 递归锁误用:避免不必要的递归锁
- 未释放锁:使用RAII管理锁
- 条件变量误用:总是使用谓词版本wait
11.2 性能瓶颈分析
多线程程序性能问题诊断:
- 锁争用:使用profiler检测热点锁
- 缓存失效:优化数据局部性
- 线程过多:合理设置线程池大小
- 虚假共享:使用缓存行对齐
11.3 内存模型问题
常见内存可见性问题:
- 原子操作误用:正确选择内存顺序
- 数据竞争:确保共享数据有适当同步
- 指令重排序:使用内存屏障控制
12. 最佳实践总结
经过多年C++并发编程实践,我总结了以下经验法则:
- 优先使用高级抽象:如std::async、并行算法等
- 最小化共享数据:减少同步需求
- 使用RAII管理资源:避免资源泄漏
- 测试多线程场景:特别关注边界条件
- 渐进式优化:先保证正确性,再考虑性能
- 利用现代工具:TSAN、死锁检测器等
- 保持代码简单:复杂并发逻辑难以维护
并发编程是C++中最具挑战性的领域之一,但也是最能体现程序员功力的地方。掌握这些工具和技术,你将能够构建高效、可靠的多线程应用,充分利用现代硬件的计算能力。