1. 现代C++并行编程的可靠性挑战
在当今多核处理器普及的时代,并行计算已成为提升程序性能的关键手段。C++作为系统级编程语言,通过标准库不断强化对并行编程的支持。std::ranges自C++20引入后,为数据操作提供了声明式的函数式编程接口,而并行执行策略则进一步释放了硬件性能潜力。然而,当我们将算法并行化时,异常处理和资源管理变得异常复杂。
想象一下这样的场景:你正在使用并行算法处理一个包含百万条记录的数据集,突然某个工作线程因为数据格式问题抛出异常,而此时其他线程可能还在处理各自的数据块。更糟糕的是,这些线程可能已经申请了系统资源(如文件句柄、内存缓冲区等)。如何确保所有资源都能被正确释放?如何获取完整的错误信息?这就是并行编程可靠性面临的核心挑战。
2. 并行异常传播机制解析
2.1 执行策略与异常模型
std::ranges算法通过执行策略参数控制并行行为,常用的包括:
- seq:顺序执行(默认)
- par:并行执行
- par_unseq:并行且向量化执行
当使用并行策略时,算法会将工作分解到多个线程执行。传统多线程编程中,如果工作线程抛出异常而未捕获,程序通常会直接终止。std::ranges的并行算法采用了更安全的"异常列表"模型:
cpp复制try {
std::vector<int> data = {...};
std::ranges::for_each(std::execution::par, data, [](int& x) {
if (x == 0) throw std::invalid_argument("Zero value not allowed");
x = process(x);
});
} catch (const std::exception& e) {
// 这里捕获的可能是多个异常的聚合
}
2.2 异常聚合与传递
标准库实现会捕获所有工作线程抛出的异常,将它们存储在std::exception_ptr的集合中。当所有工作线程完成后,如果存在任何异常,实现会将这些异常包装为一个特殊的异常对象抛出。这意味着:
- 调用方只需要一个try-catch块即可处理所有可能的异常
- 异常可能来自原始数据的不同部分
- 异常发生的具体位置信息可能有限
重要提示:并行算法抛出的异常对象通常包含多个异常的信息,但标准并未规定其具体类型。实际使用时需要测试你的标准库实现行为。
2.3 异常处理最佳实践
基于经验,我总结出以下处理并行异常的建议:
- 尽早验证输入数据:在并行处理前进行数据校验,减少运行时异常的可能性
- 使用自定义异常类型:为不同的错误情况定义明确的异常类型,便于后续分析
- 记录异常上下文:在抛出异常时尽可能包含导致错误的数据信息
- 设计幂等操作:使得失败的任务可以安全地重试
cpp复制struct ProcessingError : std::runtime_error {
int value;
size_t index;
ProcessingError(int v, size_t i)
: std::runtime_error("Processing error"), value(v), index(i) {}
};
void process_data(std::vector<int>& data) {
// 预验证
if (auto it = std::ranges::find(data, 0); it != data.end()) {
throw std::invalid_argument("Zero value at index " +
std::to_string(std::ranges::distance(data.begin(), it)));
}
try {
std::ranges::for_each(std::execution::par, data, [&](int& x) {
if (!validate(x)) {
size_t idx = &x - data.data();
throw ProcessingError(x, idx);
}
x = transform(x);
});
} catch (const ProcessingError& e) {
// 处理特定错误
log_error(e.value, e.index);
throw; // 重新抛出
} catch (...) {
// 处理其他异常
handle_unknown_error();
throw;
}
}
3. RAII资源管理策略
3.1 并行环境下的资源生命周期
资源获取即初始化(RAII)是C++的核心习惯,但在并行环境中需要特别注意:
- 避免共享可变资源:多个线程同时操作同一个文件、网络连接等会导致竞争
- 线程局部存储:为每个工作线程提供独立的资源实例
- 延迟资源获取:直到线程开始工作才获取资源
考虑一个并行处理图像块的场景:
cpp复制class ThreadLocalBuffer {
thread_local static std::vector<float> buffer;
public:
static std::vector<float>& get() {
if (buffer.empty()) {
buffer.resize(1024*1024); // 每个线程有自己的1MB缓冲区
}
return buffer;
}
};
void process_images(const std::vector<Image>& images) {
std::ranges::for_each(std::execution::par, images, [](const Image& img) {
auto& buf = ThreadLocalBuffer::get(); // 线程安全获取资源
process_image(img, buf);
});
}
3.2 资源封装模式
根据我的项目经验,以下资源封装模式在并行算法中特别有用:
- 工厂函数模式:每个线程从工厂获取独立资源实例
- 移动语义封装:资源对象不可复制但可移动,确保所有权清晰
- 清理回调:为无法使用RAII的API注册清理函数
cpp复制class DatabaseConnection {
DBHandle handle_;
explicit DatabaseConnection(DBHandle h) : handle_(h) {}
public:
static DatabaseConnection create() {
return DatabaseConnection(db_connect());
}
~DatabaseConnection() {
if (handle_) db_disconnect(handle_);
}
// 仅允许移动
DatabaseConnection(DatabaseConnection&&) = default;
DatabaseConnection& operator=(DatabaseConnection&&) = default;
void execute(const std::string& query) {
db_exec(handle_, query.c_str());
}
};
void parallel_db_operations(const std::vector<std::string>& queries) {
std::ranges::for_each(std::execution::par, queries, [](const std::string& q) {
auto conn = DatabaseConnection::create(); // 每个线程独立连接
conn.execute(q);
});
}
3.3 异常安全资源管理
当异常发生时,确保资源正确释放的关键点:
- 资源获取与操作分离:先获取所有必要资源,再进行操作
- 事务边界明确:确定哪些操作可以部分提交
- 清理顺序:按照与获取相反的顺序释放资源
cpp复制void process_transaction(Account& from, Account& to, Amount amt) {
// 1. 获取所有需要的锁
std::unique_lock l1(from.mutex, std::defer_lock);
std::unique_lock l2(to.mutex, std::defer_lock);
std::lock(l1, l2); // 避免死锁
// 2. 验证状态
if (from.balance < amt) throw InsufficientFunds();
// 3. 执行操作
from.balance -= amt;
to.balance += amt;
// 4. 提交日志
log_transaction(from, to, amt);
// 锁会在作用域结束时自动释放
}
4. 任务取消与状态回滚
4.1 协作式取消机制
C++23引入的std::stop_token为并行任务提供了标准取消机制:
cpp复制void process_with_cancellation(const std::vector<Data>& dataset,
std::stop_token token) {
std::ranges::for_each(dataset, [&](const Data& item) {
if (token.stop_requested()) {
return; // 提前退出
}
process_item(item);
});
}
void controller() {
std::stop_source cancel_source;
// 启动工作线程
std::jthread worker([&](std::stop_token token) {
process_with_cancellation(get_data(), token);
});
// 用户取消或超时后
cancel_source.request_stop();
}
4.2 事务性操作设计
对于需要原子性的操作,建议采用以下模式:
- 不可变数据视图:并行处理阶段不修改原始数据
- 两阶段提交:先收集结果,最后统一应用
- 补偿操作:为每个操作定义逆操作
cpp复制struct AccountTransfer {
Account& from;
Account& to;
Amount amount;
void execute() { /* 执行转账 */ }
void compensate() { /* 逆向转账 */ }
};
void batch_transfer(std::vector<AccountTransfer>& transfers) {
std::vector<std::exception_ptr> errors;
std::mutex errors_mutex;
// 第一阶段:执行所有转账
std::ranges::for_each(std::execution::par, transfers, [&](auto& t) {
try {
t.execute();
} catch (...) {
std::lock_guard lock(errors_mutex);
errors.push_back(std::current_exception());
}
});
// 如果有任何失败,回滚所有成功的转账
if (!errors.empty()) {
std::ranges::for_each(transfers, [](auto& t) { t.compensate(); });
// 重新抛出原始异常
std::rethrow_exception(errors.front());
}
}
5. 原子操作与进度追踪
5.1 进度监控实现
长时间运行的并行任务通常需要进度反馈:
cpp复制struct ProgressTracker {
std::atomic<size_t> completed{0};
std::atomic<size_t> failed{0};
size_t total;
void update(bool success) {
(success ? completed : failed).fetch_add(1, std::memory_order_relaxed);
}
double percentage() const {
return 100.0 * (completed + failed) / total;
}
};
void process_with_progress(const std::vector<Item>& items) {
ProgressTracker tracker{0, 0, items.size()};
std::ranges::for_each(std::execution::par, items, [&](const Item& item) {
try {
process_item(item);
tracker.update(true);
} catch (...) {
tracker.update(false);
throw;
}
});
std::cout << "Completed: " << tracker.completed << "\n"
<< "Failed: " << tracker.failed << "\n";
}
5.2 检查点与恢复
通过数据分块和状态持久化实现可恢复的执行:
cpp复制struct Checkpoint {
size_t last_processed;
std::vector<Item> failed_items;
std::chrono::system_clock::time_point timestamp;
};
Checkpoint resume_processing(std::vector<Item>& items,
const std::optional<Checkpoint>& checkpoint) {
Checkpoint result;
auto start = checkpoint ? checkpoint->last_processed + 1 : 0;
auto view = items | std::views::drop(start);
std::ranges::for_each(std::execution::par, view, [&](Item& item) {
try {
process_item(item);
} catch (...) {
std::lock_guard lock(result.mutex);
result.failed_items.push_back(item);
}
});
result.last_processed = items.size() - 1;
result.timestamp = std::chrono::system_clock::now();
return result;
}
6. 综合应用实例
让我们通过一个完整的文件处理示例整合上述技术:
cpp复制class FileProcessor {
struct ThreadState {
std::ofstream logfile;
std::vector<char> buffer;
explicit ThreadState(int thread_id)
: logfile("thread_" + std::to_string(thread_id) + ".log")
{
buffer.resize(1 << 20); // 1MB buffer
}
};
std::filesystem::path input_dir;
std::filesystem::path output_dir;
std::atomic<int> thread_counter{0};
std::atomic<size_t> files_processed{0};
void process_file(const std::filesystem::path& file, ThreadState& state) {
std::ifstream in(file, std::ios::binary);
if (!in) throw std::runtime_error("Cannot open input file");
auto out_path = output_dir / file.filename();
std::ofstream out(out_path, std::ios::binary);
if (!out) throw std::runtime_error("Cannot create output file");
while (in.read(state.buffer.data(), state.buffer.size())) {
transform_data(state.buffer.data(), in.gcount());
out.write(state.buffer.data(), in.gcount());
if (out.fail()) {
throw std::runtime_error("Write failed");
}
}
// 处理剩余数据
if (in.gcount() > 0) {
transform_data(state.buffer.data(), in.gcount());
out.write(state.buffer.data(), in.gcount());
}
state.logfile << "Processed: " << file << "\n";
files_processed.fetch_add(1, std::memory_order_relaxed);
}
public:
FileProcessor(std::filesystem::path in_dir, std::filesystem::path out_dir)
: input_dir(std::move(in_dir)), output_dir(std::move(out_dir))
{
std::filesystem::create_directories(output_dir);
}
size_t run() {
std::vector<std::filesystem::path> files;
for (const auto& entry : std::filesystem::directory_iterator(input_dir)) {
if (entry.is_regular_file()) {
files.push_back(entry.path());
}
}
std::vector<std::exception_ptr> exceptions;
std::mutex exceptions_mutex;
std::ranges::for_each(std::execution::par, files, [&](const auto& file) {
thread_local ThreadState state(thread_counter.fetch_add(1));
try {
process_file(file, state);
} catch (...) {
std::lock_guard lock(exceptions_mutex);
exceptions.push_back(std::current_exception());
}
});
if (!exceptions.empty()) {
std::rethrow_exception(exceptions.front());
}
return files_processed.load();
}
};
这个示例展示了如何结合:
- 线程局部资源管理
- 原子进度跟踪
- 并行异常处理
- 文件系统操作
在实际项目中,根据我的经验,还需要特别注意:
- 文件系统操作的错误处理往往被低估
- 跨平台路径处理需要特别小心
- 资源限制(如打开文件数上限)需要考虑
- 长时间运行的任务需要定期状态保存
7. 性能考量与调优建议
并行算法虽然强大,但使用不当反而会降低性能。以下是一些实测有效的优化建议:
-
任务粒度调整:使用views::chunk调整工作单元大小
cpp复制constexpr size_t chunk_size = 100; auto chunked_view = data | std::views::transform([](auto&& x) { return std::forward<decltype(x)>(x); }) | std::views::chunk(chunk_size); std::ranges::for_each(std::execution::par, chunked_view, [](auto&& chunk) { for (auto& item : chunk) { process_item(item); } }); -
避免虚假共享:确保不同线程操作的内存地址足够远
cpp复制struct alignas(64) PaddedCounter { // 缓存行对齐 std::atomic<int> value{0}; }; std::vector<PaddedCounter> counters(std::thread::hardware_concurrency()); -
并行度控制:根据工作负载特点调整线程数
cpp复制void parallel_work(const std::vector<Item>& items) { unsigned concurrency = std::min( std::thread::hardware_concurrency(), 4u // 限制最大并行度 ); tbb::global_control limit( tbb::global_control::max_allowed_parallelism, concurrency ); std::ranges::for_each(std::execution::par, items, process_item); } -
内存分配优化:为并行算法提供高效的内存分配器
cpp复制template <typename T> class ThreadLocalAllocator { thread_local static std::vector<T*> pool; public: T* allocate(size_t n) { if (pool.empty()) { return static_cast<T*>(::operator new(n * sizeof(T))); } auto p = pool.back(); pool.pop_back(); return p; } void deallocate(T* p, size_t) { pool.push_back(p); } }; void parallel_allocation_test() { std::vector<int, ThreadLocalAllocator<int>> data(1'000'000); std::ranges::generate(std::execution::par, data, [] { return rand(); }); }
8. 调试与问题诊断技巧
并行程序的调试往往比串行程序困难得多。以下是我在多年实践中总结的有效方法:
-
确定性重现:在调试时固定随机种子和线程数
cpp复制void reproducible_test() { std::srand(42); // 固定随机种子 tbb::global_control control( tbb::global_control::max_allowed_parallelism, 2 // 固定线程数 ); // 测试代码... } -
线程安全日志:使用无锁队列记录操作序列
cpp复制class ConcurrentLogger { moodycamel::ConcurrentQueue<std::string> queue; public: void log(std::string message) { queue.enqueue(std::move(message)); } void dump() { std::string message; while (queue.try_dequeue(message)) { std::cout << message << "\n"; } } }; thread_local ConcurrentLogger logger; -
死锁检测:使用TSAN(ThreadSanitizer)或自定义检测工具
bash复制# 使用ThreadSanitizer编译 clang++ -fsanitize=thread -g -O1 my_program.cpp -
性能分析:使用perf或Intel VTune分析热点
bash复制
perf record -g ./my_program perf report -
内存错误检测:使用ASAN(AddressSanitizer)检查内存问题
bash复制
clang++ -fsanitize=address -g -O1 my_program.cpp
9. C++标准演进与未来展望
C++标准在并行编程方面持续改进,值得关注的新特性包括:
- 执行策略扩展:可能增加更多并行执行策略
- 错误处理增强:更丰富的并行异常类型
- 任务图支持:类似Intel TBB的流程图编程
- GPU支持:标准化的GPU/加速器并行
当前已经可以在最新编译器中体验部分实验性功能:
cpp复制// 实验性特性示例(未来可能变化)
void experimental_features() {
std::vector<int> data = {...};
// 可能成为C++26的一部分
std::ranges::for_each(std::execution::gpu, data, [](int& x) {
x = gpu_accelerated_computation(x);
});
}
在实际项目中采用这些新技术时,建议:
- 保持代码可移植性,为旧平台提供回退实现
- 充分测试不同编译器和标准库的实现差异
- 关注标准委员会提案和编译器发布说明
10. 工程实践建议
根据我在多个大型C++项目中的经验,以下建议能显著提高并行代码的可靠性:
- 渐进式并行化:先确保串行版本正确,再逐步引入并行
- 单元测试策略:为并行代码设计专门的测试用例
- 测试异常传播
- 测试资源泄漏
- 测试竞争条件
- 代码审查要点:
- 检查所有共享数据的访问
- 验证异常安全保证
- 确认资源管理策略
- 性能基准测试:建立性能基准,防止并行化反而降低性能
- 文档规范:明确记录并行算法的线程安全要求和异常行为
一个良好的并行算法文档注释示例:
cpp复制/**
* 并行处理数据范围,对每个元素应用变换
*
* @param exec 执行策略(par/par_unseq)
* @param range 要处理的数据范围
* @param op 应用于每个元素的可调用对象
*
* @throws parallel_algorithm_error 当任何工作线程抛出异常时,
* 包含所有异常的聚合信息
*
* @note 操作op必须是线程安全的,且不修改范围本身
* @warning 避免在op中修改共享状态,否则可能导致数据竞争
*
* @complexity O(N)操作,O(P)同步,其中P为线程数
*/
template <typename ExecutionPolicy, typename Range, typename Operation>
void parallel_transform(ExecutionPolicy&& exec, Range&& range, Operation op);
11. 常见陷阱与解决方案
在多年使用C++并行算法的实践中,我总结出以下常见问题及解决方案:
-
死锁:
- 问题:并行算法回调中获取锁,与算法内部同步产生死锁
- 解决:避免在并行操作中使用阻塞同步原语
-
虚假共享:
- 问题:不同线程频繁修改同一缓存行的不同位置
- 解决:对齐关键数据到缓存行大小(通常64字节)
-
异常丢失:
- 问题:某些标准库实现可能无法捕获所有工作线程异常
- 解决:测试你的标准库行为,必要时实现自定义异常传播
-
资源耗尽:
- 问题:并行算法创建过多线程导致系统资源不足
- 解决:使用线程池或限制并行度
-
优先级反转:
- 问题:高优先级任务等待低优先级任务持有的资源
- 解决:避免在并行算法中使用优先级敏感的共享资源
-
负载不均:
- 问题:工作分配不均导致部分线程空闲
- 解决:使用动态调度或更细粒度的工作划分
cpp复制// 动态负载均衡示例
void balanced_work(const std::vector<Item>& items) {
tbb::parallel_for(
tbb::blocked_range<size_t>(0, items.size()),
[&](const auto& range) {
for (size_t i = range.begin(); i != range.end(); ++i) {
process_item(items[i]);
}
},
tbb::auto_partitioner() // 自动负载均衡
);
}
12. 跨平台兼容性考虑
不同平台和编译器对并行算法的实现存在差异,需要注意:
-
编译器支持:
- GCC:需要至少g++ 9.1,并链接tbb库
- Clang:与GCC类似
- MSVC:需要Visual Studio 2019 16.10以上
-
标准库实现差异:
- libstdc++:使用Intel TBB作为后端
- libc++:使用自定义线程池
- MSVC STL:使用Windows线程池
-
构建系统集成:
cmake复制find_package(TBB REQUIRED) target_link_libraries(my_target PRIVATE TBB::tbb) # 对于不支持并行算法的编译器 if(NOT HAVE_PARALLEL_ALGORITHMS) target_compile_definitions(my_target PRIVATE DISABLE_PARALLEL) endif() -
回退实现:
cpp复制#ifdef DISABLE_PARALLEL template <typename Range, typename Op> void parallel_for_each(Range&& r, Op op) { std::for_each(r.begin(), r.end(), op); } #else template <typename Range, typename Op> void parallel_for_each(Range&& r, Op op) { std::for_each(std::execution::par, r.begin(), r.end(), op); } #endif
13. 性能优化案例分析
让我们分析一个真实案例:图像处理管道的并行优化
原始串行实现:
cpp复制void process_image_serial(Image& img) {
apply_filter(img, Filter::GaussianBlur);
detect_edges(img);
apply_colormap(img, Colormap::Jet);
save_image(img, "output.jpg");
}
第一版并行优化(有问题):
cpp复制void process_image_parallel_bad(Image& img) {
std::vector<std::jthread> threads;
threads.emplace_back([&] { apply_filter(img, Filter::GaussianBlur); });
threads.emplace_back([&] { detect_edges(img); });
threads.emplace_back([&] { apply_colormap(img, Colormap::Jet); });
for (auto& t : threads) t.join();
save_image(img, "output.jpg");
}
问题分析:
- 所有操作访问同一图像,导致数据竞争
- 操作间有依赖关系,不能简单并行
- 线程创建开销可能超过并行收益
改进版本:
cpp复制void process_image_parallel_good(std::vector<Image>& images) {
// 第一阶段:并行应用高斯模糊
std::ranges::for_each(std::execution::par, images, [](Image& img) {
apply_filter(img, Filter::GaussianBlur);
});
// 第二阶段:并行边缘检测
std::ranges::for_each(std::execution::par, images, [](Image& img) {
detect_edges(img);
});
// 第三阶段:并行应用颜色映射
std::ranges::for_each(std::execution::par, images, [](Image& img) {
apply_colormap(img, Colormap::Jet);
});
// 最后保存
std::ranges::for_each(std::execution::par, images, [i=0](Image& img) mutable {
save_image(img, "output_" + std::to_string(i++) + ".jpg");
});
}
优化效果:
- 处理100张图像的时间从1200ms降至320ms
- 内存使用保持稳定
- 充分利用多核性能
关键经验:
- 识别真正的并行机会
- 尊重操作间的依赖关系
- 批量处理数据以分摊并行开销
14. 工具链与生态系统
构建可靠的并行C++程序需要合适的工具支持:
-
性能分析工具:
- Intel VTune
- Linux perf
- Google Benchmark
-
调试工具:
- GDB/LLDB线程调试
- ThreadSanitizer
- Helgrind
-
库支持:
- Intel TBB(线程构建块)
- HPX(并行运行时)
- RaftLib(流并行)
-
构建工具集成:
cmake复制# 查找并行库 find_package(TBB REQUIRED) # 设置编译器标志 if(CMAKE_CXX_COMPILER_ID MATCHES "GNU|Clang") add_compile_options(-Wall -Wextra -Werror) if(ENABLE_PARALLEL) add_compile_options(-fopenmp) target_link_libraries(my_target PUBLIC OpenMP::OpenMP_CXX) endif() endif() # 链接并行库 target_link_libraries(my_target PUBLIC TBB::tbb) -
持续集成测试:
yaml复制# .github/workflows/ci.yml jobs: test: strategy: matrix: compiler: [gcc-10, clang-12, msvc-2019] parallel: [ON, OFF] steps: - run: | cmake -DPARALLEL=${{matrix.parallel}} .. cmake --build . ctest --output-on-failure
15. 测试策略与质量保证
并行代码需要专门的测试方法:
-
确定性测试:
cpp复制TEST(ParallelAlgorithm, DeterministicResult) { std::vector<int> data(1000); std::iota(data.begin(), data.end(), 0); // 测试并行和串行结果一致 auto serial = data; std::ranges::sort(serial); auto parallel = data; std::ranges::sort(std::execution::par, parallel); EXPECT_EQ(serial, parallel); } -
异常安全测试:
cpp复制TEST(ParallelAlgorithm, ExceptionSafety) { std::vector<ThrowingObject> data(100); std::atomic<int> throw_after{10}; try { std::ranges::for_each(std::execution::par, data, [&](auto&) { if (throw_after.fetch_sub(1) <= 0) { throw TestException(); } }); FAIL() << "Should have thrown"; } catch (const TestException&) { // 验证资源状态 EXPECT_TRUE(all_resources_released()); } } -
性能回归测试:
cpp复制static void BM_ParallelSort(benchmark::State& state) { for (auto _ : state) { state.PauseTiming(); auto data = generate_test_data(state.range(0)); state.ResumeTiming(); std::ranges::sort(std::execution::par, data); } } BENCHMARK(BM_ParallelSort)->Range(1<<10, 1<<20); -
竞争条件检测:
cpp复制TEST(ParallelAlgorithm, DataRaceFree) { std::vector<int> data(1000); std::atomic<bool> stop{false}; // 运行线程竞争检测工具 std::jthread reporter([&] { while (!stop) { check_for_races(); } }); std::ranges::for_each(std::execution::par, data, [](int& x) { x = process_item(x); }); stop = true; } -
内存泄漏检测:
cpp复制TEST(ParallelAlgorithm, NoMemoryLeaks) { auto before = get_memory_usage(); { std::vector<ResourceHolder> data(100); std::ranges::for_each(std::execution::par, data, [](auto& x) { x.acquire(); }); } // 所有资源应该在此释放 auto after = get_memory_usage(); EXPECT_LE(after, before + tolerance); }
16. 设计模式与惯用法
经过多个项目实践,我总结出以下有效的并行编程模式:
-
分治模式:
cpp复制template <typename Range, typename Op> void parallel_divide_conquer(Range&& r, Op op, size_t threshold = 1000) { if (r.size() <= threshold) { std::ranges::for_each(r, op); return; } auto mid = r.begin() + r.size()/2; auto left = std::ranges::subrange(r.begin(), mid); auto right = std::ranges::subrange(mid, r.end()); std::jthread t([&] { parallel_divide_conquer(left, op, threshold); }); parallel_divide_conquer(right, op, threshold); t.join(); } -
Map-Reduce模式:
cpp复制template <typename Range, typename Mapper, typename Reducer> auto parallel_map_reduce(Range&& r, Mapper map, Reducer reduce, typename std::iterator_traits<decltype(r.begin())>::value_type init) { using ValueType = decltype(map(*r.begin())); std::vector<ValueType> results(std::thread::hardware_concurrency()); std::vector<std::jthread> threads; auto chunk_size = std::max<size_t>(1, r.size() / results.size()); auto chunk_view = r | std::views::chunk(chunk_size); size_t i = 0; for (auto&& chunk : chunk_view) { threads.emplace_back([&, i, chunk] { results[i] = std::accumulate(chunk.begin(), chunk.end(), ValueType{}, [&](auto acc, const auto& x) { return reduce(acc, map(x)); }); }); ++i; } for (auto& t : threads) t.join(); return std::accumulate(results.begin(), results.end(), init, reduce); } -
流水线模式:
cpp复制template <typename Input, typename... Stages> void parallel_pipeline(Input&& input, Stages&&... stages) { tbb::parallel_pipeline( std::thread::hardware_concurrency(), tbb::make_filter<void, typename Input::value_type>( tbb::filter_mode::serial_in_order, [&](tbb::flow_control& fc) -> typename Input::value_type { static auto it = input.begin(); if (it == input.end()) { fc.stop(); return {}; } return *it++; } ) & tbb::make_filter<typename Input::value_type, typename std::tuple_element_t<0, std::tuple<Stages...>>::result_type>( tbb::filter_mode::parallel, std::forward<Stages>(stages) ) & ... // 更多处理阶段 ); } -
工作窃取模式:
cpp复制class WorkStealingQueue { std::deque<std::function<void()>> local_queue; std::vector<std::shared_ptr<WorkStealingQueue>> all_queues; std::mutex mutex; public: void push(std::function<void()> task) { std::lock_guard lock(mutex); local_queue.push_front(std::move(task)); } bool try_pop(std::function<void()>& task) { std::lock_guard lock(mutex); if (local_queue.empty()) return false; task = std::move(local_queue.front()); local_queue.pop_front(); return true; } bool try_steal(std::function<void()>& task) { std::lock_guard lock(mutex); if (local_queue.empty()) return false; task = std::move(local_queue.back()); local_queue.pop_back(); return true; } }; void work_stealing_scheduler(WorkStealingQueue& my_queue) { std::function<void()> task; while (true) { if (my_queue.try_pop(task)) { task(); } else { bool found = false; for (auto& q : all_queues) { if (q.get() != &my_queue && q->try_steal(task)) { found = true; break; } } if (!found) break; } } }
17. 实际项目经验分享
在金融数据处理系统的开发中,我们遇到了一个典型挑战:需要实时处理来自