1. C++多线程开发基础入门
多线程编程是现代软件开发中不可或缺的技能,特别是在高性能计算、游戏开发和服务器编程等领域。C++11标准引入了原生的线程支持,使得多线程开发变得更加便捷和安全。
1.1 第一个多线程程序
让我们从一个简单的多线程示例开始:
cpp复制#include <iostream>
#include <thread>
#include <chrono>
// 线程函数
void threadFunction(int id) {
std::cout << "线程 " << id << " 开始执行" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "线程 " << id << " 结束执行" << std::endl;
}
int main() {
std::cout << "主线程开始,创建3个子线程" << std::endl;
// 创建并启动线程
std::thread t1(threadFunction, 1);
std::thread t2(threadFunction, 2);
std::thread t3(threadFunction, 3);
// 等待线程完成
t1.join();
t2.join();
t3.join();
std::cout << "所有线程执行完毕" << std::endl;
return 0;
}
这个程序展示了多线程编程的几个关键点:
- 使用
std::thread创建线程 - 线程函数可以接受参数
- 使用
join()等待线程结束 - 线程间的执行顺序是不确定的
注意:在实际开发中,线程函数的异常处理非常重要。如果线程函数抛出异常且未被捕获,程序将调用
std::terminate终止。
1.2 线程管理的基本操作
cpp复制#include <thread>
#include <iostream>
void worker() {
std::cout << "工作线程ID: " << std::this_thread::get_id() << std::endl;
}
int main() {
// 获取硬件支持的并发线程数
unsigned int n = std::thread::hardware_concurrency();
std::cout << "硬件支持的最大并发线程数: " << n << std::endl;
// 创建线程
std::thread t(worker);
// 线程ID
std::cout << "主线程ID: " << std::this_thread::get_id() << std::endl;
std::cout << "子线程ID: " << t.get_id() << std::endl;
// 检查线程是否可join
if (t.joinable()) {
t.join(); // 等待线程完成
}
// 分离线程(主线程不等待)
// t.detach(); // 谨慎使用!
return 0;
}
关键知识点:
hardware_concurrency()返回硬件支持的线程并发数get_id()获取线程唯一标识符joinable()检查线程是否可等待detach()分离线程(慎用)
警告:分离线程(detach)后,主线程无法再控制子线程。如果主线程先退出,可能导致子线程异常终止。
2. 线程同步与数据共享
2.1 竞态条件与数据竞争
cpp复制#include <iostream>
#include <thread>
#include <vector>
// 有数据竞争的错误示例
int counter = 0;
void increment() {
for (int i = 0; i < 100000; ++i) {
counter++; // 非原子操作,存在数据竞争
}
}
int main() {
std::thread t1(increment);
std::thread t2(increment);
t1.join();
t2.join();
// 结果可能不是200000
std::cout << "计数器值: " << counter << std::endl;
return 0;
}
这个例子展示了典型的数据竞争问题。两个线程同时修改同一个变量counter,由于++操作不是原子的,最终结果可能小于预期的200000。
2.2 互斥锁(Mutex)的使用
cpp复制#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
int counter = 0;
std::mutex mtx;
void safeIncrement() {
for (int i = 0; i < 100000; ++i) {
std::lock_guard<std::mutex> lock(mtx); // 自动加锁解锁
counter++;
// lock_guard离开作用域,自动释放锁
}
}
void tryLockExample() {
std::timed_mutex timed_mtx;
for (int i = 0; i < 5; ++i) {
// 尝试获取锁,非阻塞
if (timed_mtx.try_lock()) {
std::cout << "线程 " << std::this_thread::get_id()
<< " 获取到锁" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
timed_mtx.unlock();
break;
} else {
std::cout << "线程 " << std::this_thread::get_id()
<< " 未能获取锁,等待重试" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
}
}
int main() {
std::thread t1(safeIncrement);
std::thread t2(safeIncrement);
t1.join();
t2.join();
std::cout << "安全的计数器值: " << counter << std::endl;
// 尝试锁示例
std::thread t3(tryLockExample);
std::thread t4(tryLockExample);
t3.join();
t4.join();
return 0;
}
关键点:
std::lock_guard实现RAII风格的锁管理try_lock()非阻塞尝试获取锁std::timed_mutex支持超时锁定
经验:优先使用
lock_guard而非手动lock()/unlock(),避免忘记释放锁导致死锁。
3. 高级同步机制
3.1 条件变量
cpp复制#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
std::mutex mtx;
std::condition_variable cv;
std::queue<int> dataQueue;
bool finished = false;
// 生产者线程
void producer(int items) {
for (int i = 0; i < items; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::lock_guard<std::mutex> lock(mtx);
dataQueue.push(i);
std::cout << "生产: " << i << std::endl;
cv.notify_one(); // 通知一个等待的消费者
}
{
std::lock_guard<std::mutex> lock(mtx);
finished = true;
}
cv.notify_all(); // 通知所有消费者
}
// 消费者线程
void consumer(int id) {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
// 等待条件:队列不为空或生产结束
cv.wait(lock, []{
return !dataQueue.empty() || finished;
});
if (finished && dataQueue.empty()) {
break;
}
if (!dataQueue.empty()) {
int value = dataQueue.front();
dataQueue.pop();
lock.unlock(); // 提前解锁,减少锁持有时间
std::cout << "消费者 " << id << " 消费: " << value << std::endl;
}
}
std::cout << "消费者 " << id << " 结束" << std::endl;
}
int main() {
std::thread prod(producer, 10);
std::thread cons1(consumer, 1);
std::thread cons2(consumer, 2);
prod.join();
cons1.join();
cons2.join();
return 0;
}
条件变量使用要点:
- 必须与互斥锁配合使用
wait()会自动释放锁并在唤醒时重新获取- 使用谓词防止虚假唤醒
notify_one()唤醒一个等待线程,notify_all()唤醒所有
3.2 原子操作
cpp复制#include <iostream>
#include <thread>
#include <atomic>
#include <vector>
std::atomic<int> atomicCounter(0);
std::atomic<bool> ready(false);
void atomicIncrement() {
// 等待信号
while (!ready.load(std::memory_order_acquire)) {
std::this_thread::yield(); // 让出CPU时间片
}
for (int i = 0; i < 100000; ++i) {
atomicCounter.fetch_add(1, std::memory_order_relaxed);
}
}
void testAtomicOperations() {
std::atomic<int> value(10);
// 原子操作示例
int expected = 10;
bool success = value.compare_exchange_strong(expected, 20);
std::cout << "CAS操作: " << (success ? "成功" : "失败")
<< ", 当前值: " << value.load() << std::endl;
// 原子标志测试
std::atomic_flag flag = ATOMIC_FLAG_INIT;
// 测试并设置
bool was_set = flag.test_and_set();
std::cout << "第一次test_and_set: " << was_set << std::endl;
flag.clear();
was_set = flag.test_and_set();
std::cout << "清除后test_and_set: " << was_set << std::endl;
}
int main() {
const int num_threads = 4;
std::vector<std::thread> threads;
// 启动线程
for (int i = 0; i < num_threads; ++i) {
threads.emplace_back(atomicIncrement);
}
// 让所有线程开始执行
ready.store(true, std::memory_order_release);
// 等待所有线程完成
for (auto& t : threads) {
t.join();
}
std::cout << "原子计数器最终值: " << atomicCounter.load() << std::endl;
// 测试其他原子操作
testAtomicOperations();
return 0;
}
原子操作优势:
- 无需锁即可实现线程安全
- 性能优于互斥锁
- 提供多种内存顺序选项
4. 高级多线程模式
4.1 线程池实现
cpp复制#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
class ThreadPool {
public:
ThreadPool(size_t numThreads) : stop(false) {
for (size_t i = 0; i < numThreads; ++i) {
workers.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queueMutex);
condition.wait(lock, [this] {
return stop || !tasks.empty();
});
if (stop && tasks.empty()) {
return;
}
task = std::move(tasks.front());
tasks.pop();
}
task();
}
});
}
}
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queueMutex);
if (stop) {
throw std::runtime_error("线程池已停止");
}
tasks.emplace([task]() { (*task)(); });
}
condition.notify_one();
return res;
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queueMutex);
stop = true;
}
condition.notify_all();
for (std::thread& worker : workers) {
worker.join();
}
}
private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queueMutex;
std::condition_variable condition;
bool stop;
};
// 使用示例
int main() {
ThreadPool pool(4);
std::vector<std::future<int>> results;
// 提交任务到线程池
for (int i = 0; i < 8; ++i) {
results.emplace_back(
pool.enqueue([i] {
std::cout << "任务 " << i << " 在线程 "
<< std::this_thread::get_id() << " 执行" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
return i * i;
})
);
}
// 获取结果
for (auto& result : results) {
std::cout << "结果: " << result.get() << std::endl;
}
return 0;
}
线程池关键设计:
- 固定数量的工作线程
- 任务队列管理
- 优雅关闭机制
- 支持返回值的任务提交
4.2 读写锁(C++14及以上)
cpp复制#include <iostream>
#include <thread>
#include <shared_mutex>
#include <vector>
#include <chrono>
class ThreadSafeData {
private:
mutable std::shared_mutex mutex_;
int data_;
public:
ThreadSafeData(int init = 0) : data_(init) {}
// 读取操作 - 共享锁
int read() const {
std::shared_lock<std::shared_mutex> lock(mutex_);
std::cout << "读取: " << data_ << " 线程ID: "
<< std::this_thread::get_id() << std::endl;
return data_;
}
// 写入操作 - 独占锁
void write(int value) {
std::unique_lock<std::shared_mutex> lock(mutex_);
std::cout << "写入: " << value << " 线程ID: "
<< std::this_thread::get_id() << std::endl;
data_ = value;
}
// 增量操作
void increment() {
std::unique_lock<std::shared_mutex> lock(mutex_);
++data_;
std::cout << "增量到: " << data_ << " 线程ID: "
<< std::this_thread::get_id() << std::endl;
}
};
int main() {
ThreadSafeData data(0);
std::vector<std::thread> threads;
// 创建5个读线程
for (int i = 0; i < 5; ++i) {
threads.emplace_back([&data, i] {
for (int j = 0; j < 3; ++j) {
data.read();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
});
}
// 创建2个写线程
for (int i = 0; i < 2; ++i) {
threads.emplace_back([&data, i] {
for (int j = 0; j < 2; ++j) {
data.write(i * 10 + j);
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
});
}
// 创建1个增量线程
threads.emplace_back([&data] {
for (int i = 0; i < 4; ++i) {
data.increment();
std::this_thread::sleep_for(std::chrono::milliseconds(150));
}
});
// 等待所有线程完成
for (auto& t : threads) {
t.join();
}
std::cout << "最终值: " << data.read() << std::endl;
return 0;
}
读写锁特点:
- 多个读线程可以同时访问
- 写线程独占访问
- 读多写少场景性能优势明显
5. C++20新特性
5.1 信号量(C++20)
cpp复制#include <iostream>
#include <thread>
#include <semaphore>
#include <vector>
// 生产者-消费者模型使用信号量
std::counting_semaphore<10> emptySlots(10); // 空槽位信号量
std::counting_semaphore<10> fullSlots(0); // 满槽位信号量
std::mutex bufferMutex;
std::queue<int> buffer;
bool done = false;
void producer(int id) {
for (int i = 0; i < 5; ++i) {
emptySlots.acquire(); // 等待空槽位
{
std::lock_guard<std::mutex> lock(bufferMutex);
buffer.push(i);
std::cout << "生产者 " << id << " 生产: " << i << std::endl;
}
fullSlots.release(); // 增加满槽位
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void consumer(int id) {
while (!done || !buffer.empty()) {
fullSlots.acquire(); // 等待满槽位
if (done && buffer.empty()) {
fullSlots.release();
break;
}
int value;
{
std::lock_guard<std::mutex> lock(bufferMutex);
if (!buffer.empty()) {
value = buffer.front();
buffer.pop();
std::cout << "消费者 " << id << " 消费: " << value << std::endl;
}
}
emptySlots.release(); // 增加空槽位
}
}
int main() {
const int numProducers = 3;
const int numConsumers = 2;
std::vector<std::thread> producers;
std::vector<std::thread> consumers;
// 启动生产者
for (int i = 0; i < numProducers; ++i) {
producers.emplace_back(producer, i);
}
// 启动消费者
for (int i = 0; i < numConsumers; ++i) {
consumers.emplace_back(consumer, i);
}
// 等待生产者完成
for (auto& p : producers) {
p.join();
}
// 设置完成标志
done = true;
// 释放所有信号量以唤醒等待的消费者
for (int i = 0; i < numConsumers; ++i) {
fullSlots.release();
}
// 等待消费者完成
for (auto& c : consumers) {
c.join();
}
std::cout << "生产消费完成" << std::endl;
return 0;
}
信号量特点:
- 比条件变量更直观的资源计数
- 适合控制对有限数量资源的访问
- C++20标准化,之前需要平台特定实现
6. 最佳实践与性能优化
6.1 避免死锁的准则
cpp复制#include <iostream>
#include <thread>
#include <mutex>
// 死锁示例
std::mutex mtx1, mtx2;
void deadlockThread1() {
std::lock_guard<std::mutex> lock1(mtx1);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::lock_guard<std::mutex> lock2(mtx2); // 可能死锁
std::cout << "线程1完成" << std::endl;
}
void deadlockThread2() {
std::lock_guard<std::mutex> lock2(mtx2);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::lock_guard<std::mutex> lock1(mtx1); // 可能死锁
std::cout << "线程2完成" << std::endl;
}
// 避免死锁的方法1:按固定顺序加锁
void safeThread1() {
std::lock(mtx1, mtx2); // 同时锁定多个互斥量
std::lock_guard<std::mutex> lock1(mtx1, std::adopt_lock);
std::lock_guard<std::mutex> lock2(mtx2, std::adopt_lock);
std::cout << "安全线程1完成" << std::endl;
}
void safeThread2() {
std::lock(mtx1, mtx2); // 相同的锁定顺序
std::lock_guard<std::mutex> lock1(mtx1, std::adopt_lock);
std::lock_guard<std::mutex> lock2(mtx2, std::adopt_lock);
std::cout << "安全线程2完成" << std::endl;
}
// 避免死锁的方法2:使用std::scoped_lock(C++17)
void scopedLockExample() {
std::scoped_lock lock(mtx1, mtx2); // 自动管理多个锁
std::cout << "使用scoped_lock安全加锁" << std::endl;
}
int main() {
// 可能产生死锁
// std::thread t1(deadlockThread1);
// std::thread t2(deadlockThread2);
// t1.join();
// t2.join();
// 安全版本
std::thread t3(safeThread1);
std::thread t4(safeThread2);
t3.join();
t4.join();
// C++17 scoped_lock
std::thread t5(scopedLockExample);
t5.join();
return 0;
}
避免死锁的方法:
- 总是按相同顺序获取多个锁
- 使用
std::lock同时锁定多个互斥量 - 使用C++17的
std::scoped_lock - 避免在持有锁时调用未知代码
- 使用锁层次结构
6.2 性能优化技巧
cpp复制#include <iostream>
#include <thread>
#include <atomic>
#include <vector>
#include <chrono>
// 伪共享问题示例
struct BadAlignment {
int a; // 可能和b在同一个缓存行
int b;
};
// 解决伪共享
struct alignas(64) GoodAlignment { // 64字节对齐,通常是缓存行大小
int a; // 独占一个缓存行
};
struct alignas(64) GoodAlignment2 {
int b; // 独占另一个缓存行
};
void falseSharingTest() {
const int iterations = 100000000;
// 伪共享情况
BadAlignment bad;
auto start = std::chrono::high_resolution_clock::now();
std::thread t1([&bad, iterations] {
for (int i = 0; i < iterations; ++i) {
bad.a++;
}
});
std::thread t2([&bad, iterations] {
for (int i = 0; i < iterations; ++i) {
bad.b++;
}
});
t1.join();
t2.join();
auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
std::cout << "伪共享耗时: " << duration.count() << "ms" << std::endl;
// 避免伪共享
GoodAlignment good1;
GoodAlignment2 good2;
start = std::chrono::high_resolution_clock::now();
std::thread t3([&good1, iterations] {
for (int i = 0; i < iterations; ++i) {
good1.a++;
}
});
std::thread t4([&good2, iterations] {
for (int i = 0; i < iterations; ++i) {
good2.b++;
}
});
t3.join();
t4.join();
end = std::chrono::high_resolution_clock::now();
duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
std::cout << "避免伪共享耗时: " << duration.count() << "ms" << std::endl;
}
int main() {
falseSharingTest();
return 0;
}
性能优化要点:
- 避免伪共享(False Sharing)
- 减少锁的争用
- 使用无锁数据结构
- 合理设置线程数量
- 使用线程本地存储(TLS)
7. 调试与测试
7.1 线程安全的单元测试
cpp复制#include <iostream>
#include <thread>
#include <vector>
#include <cassert>
#include <future>
class ThreadSafeCounter {
private:
std::mutex mtx;
int count;
public:
ThreadSafeCounter() : count(0) {}
void increment() {
std::lock_guard<std::mutex> lock(mtx);
++count;
}
int get() const {
std::lock_guard<std::mutex> lock(mtx);
return count;
}
};
void testThreadSafety() {
ThreadSafeCounter counter;
const int numThreads = 10;
const int incrementsPerThread = 1000;
std::vector<std::future<void>> futures;
// 启动多个线程同时增加计数器
for (int i = 0; i < numThreads; ++i) {
futures.emplace_back(std::async(std::launch::async,
[&counter, incrementsPerThread] {
for (int j = 0; j < incrementsPerThread; ++j) {
counter.increment();
}
}
));
}
// 等待所有线程完成
for (auto& future : futures) {
future.wait();
}
// 验证结果
int expected = numThreads * incrementsPerThread;
int actual = counter.get();
std::cout << "期望值: " << expected << std::endl;
std::cout << "实际值: " << actual << std::endl;
assert(actual == expected);
std::cout << "测试通过!" << std::endl;
}
int main() {
try {
testThreadSafety();
} catch (const std::exception& e) {
std::cerr << "测试失败: " << e.what() << std::endl;
return 1;
}
return 0;
}
多线程测试要点:
- 设计并发测试场景
- 验证线程安全性
- 检查资源竞争
- 验证正确性和性能
- 使用断言验证预期结果
8. 实际开发经验分享
在多线程开发实践中,我总结了以下几点经验:
-
锁的粒度:锁的粒度要尽可能小,但也要保证原子性。过大的锁范围会降低并发性能,过小的锁范围可能导致数据不一致。
-
异常安全:确保在异常情况下锁能被正确释放。使用RAII风格的锁管理类(如
lock_guard)可以避免忘记释放锁。 -
性能分析:使用性能分析工具(如perf、VTune)识别热点和锁争用。我曾遇到一个案例,通过减少锁争用将性能提升了3倍。
-
调试技巧:
- 给线程命名(平台相关)方便调试
- 使用条件变量时添加超时防止死锁
- 记录锁的获取顺序帮助诊断死锁
-
资源管理:
- 线程是昂贵的资源,避免频繁创建销毁
- 使用线程池管理工作线程
- 注意线程局部存储(TLS)的使用
-
现代C++特性:
- 优先使用
std::async而非直接创建线程 - 使用
std::future获取异步结果 - C++20的协程可以简化某些异步模式
- 优先使用
多线程编程既是一门科学也是一门艺术。掌握基本原理后,真正的挑战在于如何在保证正确性的前提下最大化性能。每个多线程程序都有其独特的特点,需要根据具体场景选择合适的设计模式和同步机制。