在C++并发编程中,直接使用std::thread创建线程会遇到几个典型问题:首先,线程创建和销毁的开销很大,特别是在需要频繁执行短任务的场景下;其次,无限制地创建线程会导致系统资源耗尽;最后,缺乏统一的任务调度机制会使代码难以维护。
我曾在实际项目中遇到过这样的场景:一个网络服务需要处理大量短时请求,最初采用每个请求创建一个线程的方式,结果在并发量达到2000时系统直接崩溃。后来改用线程池方案,用4个核心线程就稳定支撑了5000+的QPS。
固定数量worker线程是最常见的选择,主要考虑:
cpp复制std::vector<std::thread> workers_;
for(size_t i=0; i<thread_count; ++i){
workers_.emplace_back([this]{ worker_loop(); });
}
任务队列需要满足:
cpp复制std::queue<std::function<void()>> tasks_;
std::mutex mtx_;
std::condition_variable cv_;
关键点:任务必须存储为void()可调用对象,这样才能统一处理各种函数类型
通过std::packaged_task将任意可调用对象包装为异步任务:
cpp复制auto task = std::make_shared<std::packaged_task<int()>>(
[]{ return 42; }
);
std::future<int> future = task->get_future();
当任务抛出异常时,future.get()会重新抛出:
cpp复制try {
auto result = future.get();
} catch(const std::exception& e) {
// 处理任务抛出的异常
}
| 策略类型 | 行为表现 | 适用场景 |
|---|---|---|
| Abort | 直接抛出异常 | 需要快速失败 |
| CallerRuns | 提交线程自己执行 | 保证任务不丢失 |
| Discard | 静默丢弃任务 | 允许丢任务 |
| Block | 阻塞等待队列空间 | 要求强一致性 |
在金融交易系统中,我推荐使用CallerRuns策略,因为:
cpp复制case RejectPolicy::CallerRuns:
lock.unlock();
(*task)(); // 当前线程执行
return future;
cpp复制enum class State {
Running, // 正常运行
Stopping, // 开始关闭
Stopped // 完全停止
};
std::atomic<State> state_;
cpp复制void shutdown() {
state_ = State::Stopping;
cv_.notify_all(); // 唤醒所有等待线程
for(auto& t : workers_) {
if(t.joinable()) t.join();
}
}
任务执行必须放在锁外:
cpp复制{
std::unique_lock<std::mutex> lock(mtx_);
task = std::move(tasks_.front());
tasks_.pop();
} // 锁作用域结束
task(); // 在锁外执行
必须使用predicate wait:
cpp复制cv_.wait(lock, [this]{
return !tasks_.empty() || state_ != State::Running;
});
cpp复制class ThreadPool {
public:
ThreadPool(size_t thread_count,
size_t max_queue_size,
RejectPolicy policy = RejectPolicy::Abort)
: state_(State::Running),
max_queue_size_(max_queue_size),
reject_policy_(policy)
{
// 创建工作线程
for(size_t i=0; i<thread_count; ++i){
workers_.emplace_back([this]{ worker_loop(); });
}
}
~ThreadPool() { shutdown(); }
// ... 其他成员函数
};
cpp复制void worker_loop() {
while(true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock, [this]{
return !tasks_.empty() || state_ != State::Running;
});
if(tasks_.empty() && state_ != State::Running){
return; // 关闭时退出
}
task = std::move(tasks_.front());
tasks_.pop();
not_full_cv_.notify_one(); // 通知生产者
}
task(); // 执行任务
}
}
建议添加以下统计:
cpp复制std::atomic<size_t> completed_tasks_{0};
std::atomic<size_t> rejected_tasks_{0};
// 在任务执行后
completed_tasks_++;
// 在拒绝任务时
rejected_tasks_++;
根据负载自动增减线程:
cpp复制void adjust_threads(size_t new_count) {
if(new_count > workers_.size()) {
// 增加线程
} else {
// 减少线程
}
}
改用priority_queue:
cpp复制struct Task {
int priority;
std::function<void()> func;
bool operator<(const Task& other) const {
return priority < other.priority;
}
};
std::priority_queue<Task> tasks_;
在实现这个线程池的过程中,最让我印象深刻的是条件变量的使用技巧。最初版本因为没有正确处理虚假唤醒,导致在高并发场景下会出现任务丢失。后来通过predicate wait模式彻底解决了这个问题,这也让我深刻理解了多线程编程中"永远不要相信单一状态"的原则。