记得我第一次写多线程程序时,遇到一个诡异的bug:计数器结果总是不对。明明两个线程各加100000次,结果却不是200000。这就是典型的竞态条件问题,也是互斥量要解决的核心问题。
在多线程环境中,像counter++这样的简单操作实际上会被拆解为多个机器指令。假设两个线程同时执行这个操作,可能会发生:
最终结果应该是2,但实际只增加了1。这就是更新丢失问题,互斥量的出现正是为了解决这类问题。
实现互斥量的关键在于原子操作。普通的bool变量无法保证"检查-修改"操作的原子性,这就是为什么我们需要std::atomic。
cpp复制std::atomic<bool> m_locked(false);
这个原子变量就是我们锁的核心状态标志。false表示锁可用,true表示锁被占用。
compare_exchange_weak是原子操作中的瑞士军刀,它实现了这样的逻辑:
cpp复制bool compare_exchange_weak(T& expected, T desired) {
if (*this == expected) {
*this = desired;
return true;
}
expected = *this;
return false;
}
这个操作是原子的,不会被其他线程打断。在我们的锁实现中:
cpp复制bool expected = false;
if (m_locked.compare_exchange_weak(
expected,
true,
std::memory_order_acquire)) {
return; // 获取锁成功
}
你可能注意到代码中使用了memory_order_acquire和memory_order_release。这不是可有可无的装饰,而是保证内存可见性的关键。
acquire语义:保证后续的读操作不会重排到这次原子操作之前release语义:保证前面的写操作不会重排到这次原子操作之后这样组合使用,就形成了同步关系,确保临界区内的修改对其他线程可见。
让我们再看一下完整的SimpleMutex类:
cpp复制class SimpleMutex {
public:
SimpleMutex() : m_locked(false) {}
void lock() {
while (true) {
bool expected = false;
if (m_locked.compare_exchange_weak(
expected,
true,
std::memory_order_acquire)) {
return;
}
}
}
void unlock() {
m_locked.store(false, std::memory_order_release);
}
private:
std::atomic<bool> m_locked;
};
这个实现虽然简单,但包含了互斥量的所有核心要素:
配套的测试代码展示了如何使用这个锁:
cpp复制SimpleMutex g_mutex;
int g_value = 0;
void ThreadFunc() {
for (int i = 0; i < 100000; ++i) {
g_mutex.lock();
++g_value;
g_mutex.unlock();
}
}
int main() {
std::thread t1(ThreadFunc);
std::thread t2(ThreadFunc);
t1.join();
t2.join();
std::cout << "Final value = " << g_value << std::endl;
return 0;
}
这个测试创建两个线程,每个都对共享变量g_value进行10万次加1操作。如果没有锁保护,结果通常会小于20万;有了锁保护,结果总是精确的20万。
我们实现的这种锁称为自旋锁,它的特点是:
优点:
缺点:
cpp复制while (...) {
if (!m_locked.compare_exchange_weak(...)) {
std::this_thread::yield();
continue;
}
...
}
cpp复制int spin_count = 0;
while (...) {
if (!m_locked.compare_exchange_weak(...)) {
if (++spin_count > 10) {
std::this_thread::sleep_for(
std::chrono::microseconds(1 << (spin_count-10)));
}
continue;
}
...
}
cpp复制bool try_lock() {
bool expected = false;
return m_locked.compare_exchange_weak(
expected,
true,
std::memory_order_acquire);
}
锁的粒度是指锁保护的代码范围大小。太粗的粒度(锁住大段代码)会降低并发性;太细的粒度(太多小锁)会增加复杂度并可能引发死锁。
经验法则:
手动调用lock/unlock容易忘记解锁,更安全的做法是使用RAII包装器:
cpp复制class LockGuard {
public:
explicit LockGuard(SimpleMutex& m) : mutex(m) { mutex.lock(); }
~LockGuard() { mutex.unlock(); }
LockGuard(const LockGuard&) = delete;
LockGuard& operator=(const LockGuard&) = delete;
private:
SimpleMutex& mutex;
};
// 使用示例
{
LockGuard lock(g_mutex); // 自动加锁
// 临界区代码
} // 自动解锁
死锁的四个必要条件:
预防策略:
读写锁允许多个读或单个写,适用于读多写少的场景:
cpp复制class ReadWriteLock {
public:
void read_lock() {
mutex.lock();
while (writer) {
mutex.unlock();
std::this_thread::yield();
mutex.lock();
}
++readers;
mutex.unlock();
}
void write_lock() {
mutex.lock();
while (readers > 0 || writer) {
mutex.unlock();
std::this_thread::yield();
mutex.lock();
}
writer = true;
mutex.unlock();
}
// 解锁实现类似
private:
SimpleMutex mutex;
int readers = 0;
bool writer = false;
};
条件变量允许线程等待特定条件成立:
cpp复制class ConditionVariable {
public:
void wait(SimpleMutex& mutex) {
std::unique_lock<std::mutex> lk(internal_mutex);
mutex.unlock();
cv.wait(lk);
mutex.lock();
}
void notify_one() { cv.notify_one(); }
void notify_all() { cv.notify_all(); }
private:
std::mutex internal_mutex;
std::condition_variable cv;
};
在某些场景下,无锁数据结构可能是更好的选择:
cpp复制template<typename T>
class LockFreeStack {
public:
void push(const T& value) {
Node* new_node = new Node(value);
new_node->next = head.load();
while (!head.compare_exchange_weak(
new_node->next,
new_node));
}
bool pop(T& value) {
Node* old_head = head.load();
while (old_head &&
!head.compare_exchange_weak(
old_head,
old_head->next));
if (!old_head) return false;
value = old_head->value;
delete old_head;
return true;
}
private:
struct Node {
T value;
Node* next;
Node(const T& v) : value(v), next(nullptr) {}
};
std::atomic<Node*> head{nullptr};
};
普通bool变量的读写不是原子的,两个线程可能同时看到锁可用状态,导致多个线程进入临界区。
适合:
不适合:
锁 convoy 是指多个线程频繁竞争同一个锁,导致性能下降。解决方法:
好的锁命名能提高代码可读性:
死锁检测:
性能分析:
调试输出:
cpp复制#define LOCK_DEBUG 1
void lock() {
#if LOCK_DEBUG
std::cout << std::this_thread::get_id()
<< " trying to lock" << std::endl;
#endif
// ... 正常lock实现
}
内存序语义在不同架构上可能有差异
自旋等待在不同CPU上效率不同
考虑缓存行对齐减少false sharing:
cpp复制alignas(64) std::atomic<bool> m_locked;
ARM等弱内存模型平台需要更谨慎的内存序使用
我们的SimpleMutex是教学实现,生产环境中应考虑:
更高效的实现:
更丰富的接口:
系统集成:
性能优化:
虽然理解原始锁实现很重要,但在实际项目中可以考虑:
std::unique_lock/std::lock_guard:
cpp复制std::mutex m;
{
std::lock_guard<std::mutex> lock(m);
// 临界区
} // 自动解锁
std::shared_mutex(C++17):
cpp复制std::shared_mutex sm;
// 读锁
{
std::shared_lock lock(sm);
// 多个读线程可以同时进入
}
// 写锁
{
std::unique_lock lock(sm);
// 只有一个写线程可以进入
}
std::scoped_lock(C++17)多锁安全获取:
cpp复制std::mutex m1, m2;
{
std::scoped_lock lock(m1, m2); // 自动避免死锁顺序
// 临界区
}
原子变量直接使用:
cpp复制std::atomic<int> counter{0};
counter.fetch_add(1, std::memory_order_relaxed);
理解这些高级抽象背后的原理,正是我们学习基础实现的价值所在。当你真正掌握了互斥量的底层机制,使用这些高级工具时就能更加得心应手,也能在出现问题时更好地调试和优化。