1. CPU 缓存架构与缓存一致性
1.1 为什么需要 CPU 缓存?
现代 CPU 的运算速度已经达到了惊人的 GHz 级别,而内存访问速度却远远跟不上这个节奏。想象一下,如果每次 CPU 需要数据都要跑到内存那里去取,就像让博尔特每次跑步都要从家里出发一样,效率会低得可怕。这就是为什么现代 CPU 都采用了多级缓存架构(L1/L2/L3)。
L1 缓存最小但最快,通常只有几十KB,访问延迟在1ns左右;L2 缓存稍大一些,可能有几百KB,延迟在3-5ns;L3 缓存更大,可能有几十MB,延迟在10-20ns。相比之下,访问主内存的延迟通常在50-100ns。这种层级设计就像是我们在办公桌上放最常用的文件(L1),抽屉里放次常用的(L2),书柜里放不太常用的(L3),而档案室就是主内存了。
1.2 写回策略与缓存不一致问题
CPU 缓存使用写回(Write-back)策略来提高性能。这意味着当 CPU 修改了缓存中的数据时,并不会立即将这些修改写回主内存,而是等到这个缓存行需要被替换出去时,才把修改过的数据写回内存。这就像我们做笔记时,先在草稿纸上修改,等整页都改完了再誊写到正式本子上。
但这种策略带来了缓存一致性问题。假设有两个CPU核心A和B,它们都缓存了同一个内存地址的数据。如果核心A修改了这个数据但还没写回内存,核心B看到的就还是旧值。这就好比两个同事在协作编辑同一个文档,A修改了但没保存,B看到的还是旧版本,这就出问题了。
1.3 缓存一致性协议:MESI 与总线嗅探
为了解决这个问题,硬件工程师们发明了缓存一致性协议,其中最著名的就是MESI协议。MESI是四个状态的缩写:
- Modified(已修改):这个缓存行中的数据已经被修改,与内存不一致,而且只有当前缓存有这个数据。
- Exclusive(独占):数据与内存一致,而且只有当前缓存有这个数据。
- Shared(共享):数据与内存一致,而且可能存在于多个缓存中。
- Invalid(无效):这个缓存行中的数据无效,不能使用。
总线嗅探机制是MESI协议的核心。每个CPU核心都会监听总线上的事务,就像会议室里每个人都竖起耳朵听别人在说什么。当某个核心要修改数据时,它会先通过总线告诉其他核心:"我要改这个数据了,你们都把对应的缓存行置为Invalid"。这样就能保证数据的一致性。
2. 原子操作:不可分割的执行单元
2.1 什么是原子操作?
原子操作就像是不可分割的"量子"操作 - 要么完全执行,要么完全不执行,没有中间状态。想象你在转账:扣款和入账必须同时完成,不能只扣款不入账,或者反过来。这就是原子性。
在计算机中,像简单的读取或写入一个对齐的字(word)通常是原子的,但像i++这样的操作实际上包含读取、修改、写入三个步骤,在多线程环境下就可能出问题。
2.2 为什么需要原子操作?
考虑一个简单的计数器场景:两个线程同时执行counter++。在汇编层面,这实际上是:
- 从内存读取counter到寄存器
- 寄存器加1
- 将寄存器值写回内存
如果没有原子性保证,两个线程可能交错执行这些步骤,导致最终结果不正确。比如两个线程都读取到counter=5,都加1后写回6,而实际上应该增加到7。
2.3 硬件如何支持原子操作?
现代CPU提供了专门的指令来支持原子操作。比如x86架构的LOCK前缀可以锁定总线,确保指令执行期间其他核心不能访问相同的内存地址。还有像CMPXCHG(比较并交换)这样的原子指令,直接由硬件保证其原子性。
在C++中,我们可以使用std::atomic模板来利用这些硬件特性。编译器会为不同的平台生成最优的原子操作指令。
3. Compare-And-Swap(CAS)原理与应用
3.1 CAS 定义
CAS是并发编程中的瑞士军刀,它的伪代码逻辑很简单:
cpp复制bool CAS(T* ptr, T expected, T desired) {
if (*ptr == expected) {
*ptr = desired;
return true;
}
return false;
}
但关键在于整个操作是原子的 - 比较和交换这两个步骤不会被其他线程打断。
3.2 C++ 中的 CAS 接口
C++11提供了两种CAS操作:
cpp复制std::atomic<int> val;
int expected = 0;
bool success = val.compare_exchange_strong(expected, 1);
compare_exchange_strong保证严格的CAS语义,而compare_exchange_weak允许虚假失败(在某些平台上性能更好),通常用在循环中:
cpp复制do {
expected = val.load();
desired = f(expected);
} while (!val.compare_exchange_weak(expected, desired));
3.3 CAS 的 ABA 问题及解决
ABA问题是CAS的一个经典陷阱:线程1读取变量值为A,线程2把A改成B又改回A,然后线程1执行CAS发现值还是A,就以为没变过。
解决方案主要有两种:
- 使用带版本号的指针,每次修改都递增版本号
- 使用双重CAS,同时比较值和版本号
在C++中,可以用std::atomicstd::shared_ptr来避免这个问题,因为shared_ptr的引用计数机制天然防止了ABA问题。
4. 内存序:从指令重排到可见性
4.1 为什么会有内存序问题?
现代编译器和CPU为了优化性能,会对指令进行重排序。只要不改变单线程的执行结果,这种重排是完全合法的。但在多线程环境下,这可能导致其他线程看到"不可能"的执行顺序。
比如:
cpp复制// 线程1
x = 1;
ready = true;
// 线程2
while (!ready);
assert(x == 1);
如果没有正确的内存序,线程2可能会看到ready为true但x还是0的情况。
4.2 C++ 内存模型与六种内存序
C++11定义了6种内存序:
- memory_order_relaxed:只保证原子性,不保证顺序
- memory_order_consume:已弃用,不建议使用
- memory_order_acquire:保证之后的读操作不会被重排到前面
- memory_order_release:保证之前的写操作不会被重排到后面
- memory_order_acq_rel:acquire + release
- memory_order_seq_cst:完全顺序一致,性能开销最大
最常用的模式是release-acquire配对:
cpp复制// 线程1
x.store(1, std::memory_order_release);
// 线程2
x.load(std::memory_order_acquire);
这样能确保线程1在release之前的所有写操作,对线程2在acquire之后都是可见的。
5. 锁的实现:从自旋锁到互斥锁
5.1 自旋锁(Spinlock)
自旋锁是最简单的锁实现,它通过忙等待(busy-waiting)来获取锁:
cpp复制class Spinlock {
std::atomic_flag flag = ATOMIC_FLAG_INIT;
public:
void lock() {
while (flag.test_and_set(std::memory_order_acquire)) {
// 自旋等待
}
}
void unlock() {
flag.clear(std::memory_order_release);
}
};
自旋锁适用于锁持有时间很短的场景,因为忙等待会消耗CPU资源。
5.2 互斥锁(Mutex)
操作系统提供的互斥锁更智能,当获取不到锁时会让线程休眠:
cpp复制class Mutex {
std::atomic<int> state{0}; // 0=unlocked, 1=locked
public:
void lock() {
if (state.exchange(1, std::memory_order_acquire)) {
// 锁已被占用,进入内核等待
syscall(SYS_futex, &state, FUTEX_WAIT, 1, nullptr, nullptr, 0);
}
}
void unlock() {
state.store(0, std::memory_order_release);
// 唤醒等待的线程
syscall(SYS_futex, &state, FUTEX_WAKE, 1, nullptr, nullptr, 0);
}
};
互斥锁适合锁持有时间较长的场景,因为它不会浪费CPU周期在忙等待上。
5.3 锁的代价与无锁编程
锁虽然好用,但也有代价:
- 阻塞会导致上下文切换开销
- 可能引发死锁、优先级反转等问题
- 降低并行度
因此在高性能场景下,无锁(lock-free)数据结构往往更受欢迎。无锁编程直接使用原子操作来协调线程,避免了锁的开销。但无锁算法设计起来更复杂,需要仔细处理内存序和ABA问题。
6. 案例:手撕线程安全的 shared_ptr
让我们实现一个简化版的线程安全shared_ptr:
cpp复制template <typename T>
class SharedPtr {
T* ptr;
std::atomic<size_t>* ref_count;
public:
// 构造函数
explicit SharedPtr(T* p = nullptr) : ptr(p), ref_count(p ? new std::atomic<size_t>(1) : nullptr) {}
// 拷贝构造
SharedPtr(const SharedPtr& other) : ptr(other.ptr), ref_count(other.ref_count) {
if (ref_count) ref_count->fetch_add(1, std::memory_order_relaxed);
}
// 析构函数
~SharedPtr() {
if (ref_count && ref_count->fetch_sub(1, std::memory_order_acq_rel) == 1) {
delete ptr;
delete ref_count;
}
}
// 其他必要方法...
};
关键点:
- 引用计数使用std::atomic<size_t>
- 拷贝构造使用memory_order_relaxed,因为只需要原子递增
- 析构使用memory_order_acq_rel,确保删除操作能看到所有之前的写操作
7. 性能优化与实战建议
在实际开发中,使用原子操作和锁时有几个重要建议:
-
测量优先:不要过早优化,先用简单的锁实现,通过性能分析找到真正的瓶颈。
-
选择合适的同步原语:
- 短临界区:考虑自旋锁或无锁算法
- 长临界区:使用互斥锁
- 读多写少:考虑读写锁
-
减少共享数据:
- 尽量使用线程本地存储(TLS)
- 使用不可变(immutable)数据结构
- 通过消息传递而非共享内存
-
内存序选择:
- 默认使用memory_order_seq_cst
- 只有在性能关键路径且完全理解后果时才使用更宽松的内存序
-
避免虚假共享:
- 将频繁写入的原子变量放在不同的缓存行
- 使用alignas(CACHE_LINE_SIZE)或编译器特定的属性
-
工具辅助:
- 使用ThreadSanitizer检测数据竞争
- 使用性能分析工具检测锁争用
8. 常见陷阱与调试技巧
在多线程编程中,有一些常见陷阱需要注意:
-
死锁:多个锁以不同顺序获取可能导致死锁。解决方案:
- 总是以固定顺序获取锁
- 使用std::lock或std::scoped_lock同时获取多个锁
-
活锁:线程不断重试但无法进展。比如两个线程同时检测到冲突并回退,然后又同时重试。解决方案:
- 引入随机退避
- 使用队列化请求
-
优先级反转:高优先级线程等待低优先级线程持有的锁。解决方案:
- 使用优先级继承协议
- 避免混合不同优先级的线程访问同一锁
调试技巧:
- 添加详细的日志,记录线程ID和时间戳
- 使用条件变量时总是检查谓词,避免虚假唤醒
- 对于难以复现的问题,考虑记录执行轨迹后离线分析
9. 现代C++中的并发工具
除了原子操作和锁,现代C++还提供了其他有用的并发工具:
- std::mutex和std::lock_guard:基本的互斥锁
cpp复制std::mutex mtx;
{
std::lock_guard<std::mutex> lock(mtx);
// 临界区
}
- std::shared_mutex:读写锁,允许多个读者或单个写者
cpp复制std::shared_mutex rw_lock;
// 读锁
{
std::shared_lock<std::shared_mutex> lock(rw_lock);
// 读取数据
}
// 写锁
{
std::unique_lock<std::shared_mutex> lock(rw_lock);
// 修改数据
}
- std::condition_variable:线程间通知机制
cpp复制std::condition_variable cv;
std::mutex cv_mtx;
bool ready = false;
// 等待线程
std::unique_lock<std::mutex> lock(cv_mtx);
cv.wait(lock, []{return ready;});
// 通知线程
{
std::lock_guard<std::mutex> lock(cv_mtx);
ready = true;
}
cv.notify_one();
- std::future和std::async:异步任务
cpp复制auto future = std::async(std::launch::async, []{
return compute_something();
});
// 其他工作...
auto result = future.get();
10. 无锁数据结构示例
最后,我们来看一个简单的无锁栈实现,展示如何正确使用原子操作:
cpp复制template <typename T>
class LockFreeStack {
struct Node {
T data;
Node* next;
};
std::atomic<Node*> head{nullptr};
public:
void push(const T& data) {
Node* new_node = new Node{data, nullptr};
new_node->next = head.load(std::memory_order_relaxed);
while (!head.compare_exchange_weak(new_node->next, new_node,
std::memory_order_release,
std::memory_order_relaxed)) {
// CAS失败,重试
}
}
bool pop(T& result) {
Node* old_head = head.load(std::memory_order_acquire);
while (old_head &&
!head.compare_exchange_weak(old_head, old_head->next,
std::memory_order_release,
std::memory_order_acquire)) {
// CAS失败,重试
}
if (!old_head) return false;
result = old_head->data;
delete old_head;
return true;
}
};
这个实现展示了无锁编程的几个关键点:
- 使用CAS循环处理并发修改
- 精心选择内存序:push使用release,pop使用acquire-release
- 需要处理内存回收问题(这里简单delete,实际可能需要更复杂的方案)
在实际项目中,无锁数据结构的设计要复杂得多,需要考虑ABA问题、内存回收、阻塞操作等。建议优先使用经过充分测试的库实现,如Boost.Lockfree或TBB中的并发容器。