现代C++多线程编程始于C++11标准,它彻底改变了以往依赖平台特定API(如POSIX线程或Windows线程API)的局面。作为一名长期使用C++进行并发开发的工程师,我见证了这一转变带来的巨大便利。让我们从最基础的部分开始,逐步深入多线程开发的各个层面。
创建线程在C++11中变得异常简单。std::thread类的构造函数接受一个可调用对象(函数、lambda表达式或函数对象)作为参数。下面是一个最基本的示例:
cpp复制#include <iostream>
#include <thread>
void helloFunction() {
std::cout << "Hello from thread!" << std::endl;
}
int main() {
std::thread t(helloFunction); // 创建并启动线程
t.join(); // 等待线程结束
return 0;
}
在实际项目中,我通常会遵循以下几个线程管理原则:
重要提示:忘记join()或detach()会导致std::terminate被调用,程序异常终止。这是新手最常见的错误之一。
每个线程都有唯一的标识符,可以通过std::this_thread::get_id()获取。了解硬件支持的并发线程数对合理设计线程数量很重要:
cpp复制#include <thread>
#include <iostream>
int main() {
unsigned int cores = std::thread::hardware_concurrency();
std::cout << "This system has " << cores
<< " hardware threads available." << std::endl;
std::thread::id main_id = std::this_thread::get_id();
std::cout << "Main thread ID: " << main_id << std::endl;
return 0;
}
根据我的经验,线程数量通常设置为硬件并发数+1是个不错的起点,但最佳数量需要通过基准测试确定。
向线程传递参数看似简单,实则暗藏玄机。C++线程的参数传递本质上是按值传递的,即使你指定了引用:
cpp复制void modify(int& x) { x = 42; }
int main() {
int value = 0;
std::thread t(modify, value); // 看似传递引用,实际是值传递
t.join();
std::cout << value << std::endl; // 输出0,未被修改
}
正确传递引用需要使用std::ref包装器:
cpp复制std::thread t(modify, std::ref(value)); // 真正传递引用
此外,传递指针时要特别注意生命周期问题。我曾在一个项目中遇到线程访问已释放内存的bug,就是因为传递了局部变量的指针。
当多个线程访问共享资源时,同步问题就变得至关重要。C++提供了多种同步原语,各有适用场景。
互斥锁是最基础的同步机制。C++11提供了多种mutex类型:
std::mutex:基本互斥锁std::recursive_mutex:可重入互斥锁std::timed_mutex:带超时功能的互斥锁std::recursive_timed_mutex:可重入且带超时使用std::lock_guard可以自动管理锁的生命周期:
cpp复制#include <mutex>
#include <thread>
std::mutex mtx;
int shared_data = 0;
void safe_increment() {
std::lock_guard<std::mutex> lock(mtx);
++shared_data; // 临界区操作
}
int main() {
std::thread t1(safe_increment);
std::thread t2(safe_increment);
t1.join(); t2.join();
std::cout << shared_data << std::endl; // 正确输出2
}
在实际项目中,我总结出几个mutex使用原则:
条件变量(std::condition_variable)用于线程间的通知机制,常与互斥锁配合使用。典型的生产者-消费者模式实现:
cpp复制#include <queue>
#include <mutex>
#include <condition_variable>
std::queue<int> data_queue;
std::mutex mtx;
std::condition_variable cv;
void producer() {
for(int i=0; i<10; ++i) {
std::lock_guard<std::mutex> lock(mtx);
data_queue.push(i);
cv.notify_one(); // 通知消费者
}
}
void consumer() {
while(true) {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, []{ return !data_queue.empty(); });
int value = data_queue.front();
data_queue.pop();
lock.unlock();
if(value == 9) break; // 结束条件
std::cout << value << std::endl;
}
}
条件变量使用时容易犯的错误:
wait会自动释放锁,唤醒后重新获取对于简单的计数器等场景,原子操作(std::atomic)比互斥锁更高效:
cpp复制#include <atomic>
#include <thread>
std::atomic<int> counter(0);
void increment() {
for(int i=0; i<100000; ++i) {
counter.fetch_add(1, std::memory_order_relaxed);
}
}
int main() {
std::thread t1(increment);
std::thread t2(increment);
t1.join(); t2.join();
std::cout << counter << std::endl; // 正确输出200000
}
原子操作的内存顺序参数很重要:
memory_order_relaxed:只保证原子性,不保证顺序memory_order_acquire/release:实现获取-释放语义memory_order_seq_cst:最严格的顺序一致性(默认)在我的性能测试中,合理使用relaxed内存顺序可以带来2-3倍的性能提升,但会增加代码复杂度。
掌握了基础同步机制后,我们可以构建更复杂、更高效的多线程模式。
线程池是避免频繁创建销毁线程开销的经典模式。以下是简化实现:
cpp复制#include <vector>
#include <queue>
#include <thread>
#include <functional>
#include <future>
class ThreadPool {
public:
ThreadPool(size_t threads) : stop(false) {
for(size_t i=0; i<threads; ++i) {
workers.emplace_back([this] {
while(true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queue_mutex);
condition.wait(lock, [this] {
return stop || !tasks.empty();
});
if(stop && tasks.empty()) return;
task = std::move(tasks.front());
tasks.pop();
}
task();
}
});
}
}
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
if(stop) throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task](){ (*task)(); });
}
condition.notify_one();
return res;
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for(std::thread &worker: workers)
worker.join();
}
private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
线程池使用技巧:
C++14引入了std::shared_mutex,实现读写锁模式:
cpp复制#include <shared_mutex>
#include <map>
class ThreadSafeDictionary {
public:
std::string lookup(const std::string& key) const {
std::shared_lock<std::shared_mutex> lock(mutex);
auto it = data.find(key);
return it != data.end() ? it->second : "";
}
void update(const std::string& key, const std::string& value) {
std::unique_lock<std::shared_mutex> lock(mutex);
data[key] = value;
}
private:
mutable std::shared_mutex mutex;
std::map<std::string, std::string> data;
};
读写锁适用场景:
伪共享是性能的隐形杀手,它发生在多个线程频繁修改位于同一缓存行的不同变量时:
cpp复制struct BadStructure {
int a; // 线程1频繁修改
int b; // 线程2频繁修改
// 可能位于同一缓存行
};
struct GoodStructure {
alignas(64) int a; // 独占缓存行
alignas(64) int b; // 独占另一个缓存行
};
在我的性能测试中,解决伪共享后性能提升可达5-10倍。检测伪共享可以使用perf等工具观察缓存未命中率。
C++20引入了一些强大的新特性,进一步简化了并发编程。
C++20标准库终于加入了信号量:
cpp复制#include <semaphore>
#include <thread>
std::counting_semaphore<10> sem(3); // 最大计数10,初始3
void worker(int id) {
sem.acquire();
std::cout << "Thread " << id << " working..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
sem.release();
}
int main() {
std::thread t1(worker, 1);
std::thread t2(worker, 2);
std::thread t3(worker, 3);
std::thread t4(worker, 4);
t1.join(); t2.join(); t3.join(); t4.join();
}
信号量适合控制对有限资源的访问,如数据库连接池。
C++20协程为异步编程提供了新范式:
cpp复制#include <coroutine>
#include <iostream>
struct Task {
struct promise_type {
Task get_return_object() { return {}; }
std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void return_void() {}
void unhandled_exception() {}
};
};
Task myCoroutine() {
std::cout << "Coroutine started" << std::endl;
co_await std::suspend_always{};
std::cout << "Coroutine resumed" << std::endl;
}
虽然协程标准库支持还不完善,但它代表了异步编程的未来方向。
根据我多年的多线程开发经验,总结出以下最佳实践:
std::async、线程池,而非直接操作线程多线程调试技巧:
让我们综合运用所学知识,实现一个高性能的并发队列。
cpp复制#include <memory>
#include <mutex>
template<typename T>
class ConcurrentQueue {
private:
struct Node {
std::shared_ptr<T> data;
std::unique_ptr<Node> next;
};
std::unique_ptr<Node> head;
Node* tail;
std::mutex head_mutex;
std::mutex tail_mutex;
std::condition_variable data_cond;
Node* get_tail() {
std::lock_guard<std::mutex> tail_lock(tail_mutex);
return tail;
}
std::unique_ptr<Node> pop_head() {
std::unique_ptr<Node> old_head = std::move(head);
head = std::move(old_head->next);
return old_head;
}
public:
ConcurrentQueue() : head(new Node), tail(head.get()) {}
void push(T new_value) {
std::shared_ptr<T> new_data(
std::make_shared<T>(std::move(new_value)));
std::unique_ptr<Node> p(new Node);
{
std::lock_guard<std::mutex> tail_lock(tail_mutex);
tail->data = new_data;
Node* const new_tail = p.get();
tail->next = std::move(p);
tail = new_tail;
}
data_cond.notify_one();
}
std::shared_ptr<T> try_pop() {
std::lock_guard<std::mutex> head_lock(head_mutex);
if(head.get() == get_tail()) {
return std::shared_ptr<T>();
}
return pop_head()->data;
}
std::shared_ptr<T> wait_and_pop() {
std::unique_lock<std::mutex> head_lock(head_mutex);
data_cond.wait(head_lock, [this] {
return head.get() != get_tail();
});
return pop_head()->data;
}
};
这个实现的特点:
对于极致性能场景,可以考虑无锁队列。以下是基于原子操作的简化版:
cpp复制#include <atomic>
template<typename T>
class LockFreeQueue {
private:
struct Node {
std::shared_ptr<T> data;
std::atomic<Node*> next;
Node() : next(nullptr) {}
};
std::atomic<Node*> head;
std::atomic<Node*> tail;
public:
LockFreeQueue() {
Node* dummy = new Node;
head.store(dummy);
tail.store(dummy);
}
void push(T new_value) {
std::shared_ptr<T> new_data(std::make_shared<T>(new_value));
Node* new_node = new Node;
Node* old_tail = tail.load();
while(true) {
Node* next = old_tail->next.load();
if(!next) {
if(old_tail->next.compare_exchange_weak(next, new_node)) {
tail.compare_exchange_weak(old_tail, new_node);
old_tail->data = new_data;
return;
}
} else {
tail.compare_exchange_weak(old_tail, next);
}
}
}
std::shared_ptr<T> pop() {
Node* old_head = head.load();
while(true) {
Node* next = old_head->next.load();
if(!next) return std::shared_ptr<T>();
if(head.compare_exchange_weak(old_head, next)) {
std::shared_ptr<T> res = next->data;
delete old_head;
return res;
}
}
}
};
无锁编程的挑战:
在实际项目中,我通常先使用有锁实现,确认性能瓶颈后再考虑无锁方案。
多线程程序的调试和测试是极具挑战性的工作,需要特殊的方法和工具。
Thread Sanitizer (TSAN):
GCC和Clang提供的强大数据竞争检测工具。编译时添加-fsanitize=thread选项即可启用。
死锁检测技术:
日志记录技巧:
多线程单元测试需要特殊考虑:
cpp复制#include <gtest/gtest.h>
#include <thread>
#include <vector>
TEST(ThreadSafeStackTest, ConcurrentPushPop) {
ThreadSafeStack<int> stack;
const int kThreadCount = 10;
const int kIterations = 1000;
auto pusher = [&stack] {
for(int i=0; i<kIterations; ++i) {
stack.push(i);
}
};
auto popper = [&stack] {
for(int i=0; i<kIterations; ++i) {
try {
stack.pop();
} catch(...) {}
}
};
std::vector<std::thread> threads;
for(int i=0; i<kThreadCount; ++i) {
threads.emplace_back(i%2 ? pusher : popper);
}
for(auto& t : threads) {
t.join();
}
EXPECT_GE(stack.size(), 0);
}
测试要点:
C++标准委员会仍在不断改进多线程支持,了解这些趋势有助于我们提前准备。
C++17引入了并行算法,C++23将进一步扩展:
cpp复制#include <algorithm>
#include <execution>
#include <vector>
void parallelSort() {
std::vector<int> data = {...};
std::sort(std::execution::par, data.begin(), data.end());
}
随着GPU和专用加速器的普及,C++也在增强异构计算支持:
经过多年多线程项目开发,我总结出一些宝贵的经验教训:
常见陷阱及规避方法:
最后,记住Donald Knuth的名言:"过早优化是万恶之源"。在多线程编程中尤其如此,先确保正确性,再考虑优化性能。