1. 项目概述
数据库连接池是后端开发中一个经典的基础组件,特别是在高并发场景下尤为重要。每次请求都新建和销毁数据库连接会产生巨大的性能开销,而连接池通过预先建立并维护一定数量的数据库连接,实现了连接的复用,从而显著提升系统性能。
在C++项目中实现MySQL连接池,需要考虑线程安全、连接管理、超时处理等多个关键点。本文将基于C++11标准,从零开始构建一个生产可用的MySQL连接池,并附上完整核心代码实现。
2. 核心设计思路
2.1 连接池的基本原理
连接池的核心思想是"空间换时间"。启动时预先建立N个数据库连接放入池中,当应用需要连接时从池中获取,使用完毕后归还而不是直接关闭。这种方式避免了频繁创建和销毁连接的开销。
一个健壮的连接池需要具备以下能力:
- 连接的生命周期管理
- 连接的健康检查
- 线程安全的连接获取和释放
- 连接数动态调整
- 超时和异常处理
2.2 关键技术选型
我们选择以下技术栈实现连接池:
- C++11标准:使用现代C++特性如智能指针、线程库等
- MySQL C Connector:官方C接口,兼容性好
- RAII模式:确保资源自动释放
- 双检锁:实现线程安全的懒加载单例
3. 核心代码实现
3.1 连接池类定义
cpp复制class ConnectionPool {
public:
// 获取连接池单例
static ConnectionPool* getInstance();
// 初始化连接池
bool init(const std::string& host,
const std::string& user,
const std::string& pwd,
const std::string& dbName,
int port = 3306,
int maxConn = 8);
// 获取一个连接
std::shared_ptr<MYSQL> getConnection();
// 释放连接(实际是归还到池中)
void releaseConnection(MYSQL* conn);
private:
ConnectionPool() = default;
~ConnectionPool();
// 创建新连接
MYSQL* createNewConnection();
// 销毁连接
void destroyConnection(MYSQL* conn);
// 连接池扩容
void expandPool(int size);
// 连接池缩容
void reducePool(int size);
// 定时器检查连接状态
void checkConnection();
std::string host_;
std::string user_;
std::string pwd_;
std::string dbName_;
int port_;
int maxConn_;
std::list<MYSQL*> connList_; // 空闲连接
std::list<MYSQL*> usedList_; // 使用中的连接
std::mutex mutex_;
std::condition_variable cond_;
static ConnectionPool* instance_;
static std::mutex instanceMutex_;
};
3.2 连接池初始化实现
cpp复制bool ConnectionPool::init(const std::string& host,
const std::string& user,
const std::string& pwd,
const std::string& dbName,
int port,
int maxConn) {
host_ = host;
user_ = user;
pwd_ = pwd;
dbName_ = dbName;
port_ = port;
maxConn_ = maxConn;
// 初始创建一半的连接
for (int i = 0; i < maxConn_ / 2; ++i) {
MYSQL* conn = createNewConnection();
if (conn) {
connList_.push_back(conn);
} else {
return false;
}
}
// 启动检查线程
std::thread(&ConnectionPool::checkConnection, this).detach();
return true;
}
3.3 获取连接实现
cpp复制std::shared_ptr<MYSQL> ConnectionPool::getConnection() {
std::unique_lock<std::mutex> lock(mutex_);
// 等待可用连接
while (connList_.empty()) {
if (usedList_.size() < maxConn_) {
// 可以创建新连接
MYSQL* newConn = createNewConnection();
if (newConn) {
usedList_.push_back(newConn);
return std::shared_ptr<MYSQL>(newConn,
[this](MYSQL* conn) { releaseConnection(conn); });
}
}
// 等待连接释放
if (cond_.wait_for(lock, std::chrono::milliseconds(500)) ==
std::cv_status::timeout) {
// 超时返回空指针
return nullptr;
}
}
// 获取空闲连接
MYSQL* conn = connList_.front();
connList_.pop_front();
usedList_.push_back(conn);
return std::shared_ptr<MYSQL>(conn,
[this](MYSQL* conn) { releaseConnection(conn); });
}
4. 关键实现细节
4.1 连接的健康检查
连接池需要定期检查连接的健康状态,避免使用已经失效的连接:
cpp复制void ConnectionPool::checkConnection() {
while (true) {
std::this_thread::sleep_for(std::chrono::minutes(5));
std::lock_guard<std::mutex> lock(mutex_);
// 检查空闲连接
auto it = connList_.begin();
while (it != connList_.end()) {
if (mysql_ping(*it) != 0) {
mysql_close(*it);
it = connList_.erase(it);
} else {
++it;
}
}
// 检查使用中的连接
it = usedList_.begin();
while (it != usedList_.end()) {
if (mysql_ping(*it) != 0) {
mysql_close(*it);
it = usedList_.erase(it);
} else {
++it;
}
}
// 补充连接至初始数量
while (connList_.size() + usedList_.size() < maxConn_ / 2) {
MYSQL* conn = createNewConnection();
if (conn) {
connList_.push_back(conn);
} else {
break;
}
}
}
}
4.2 连接创建与销毁
cpp复制MYSQL* ConnectionPool::createNewConnection() {
MYSQL* conn = mysql_init(nullptr);
if (!conn) {
return nullptr;
}
if (!mysql_real_connect(conn, host_.c_str(), user_.c_str(),
pwd_.c_str(), dbName_.c_str(),
port_, nullptr, 0)) {
mysql_close(conn);
return nullptr;
}
// 设置自动重连
my_bool reconnect = 1;
mysql_options(conn, MYSQL_OPT_RECONNECT, &reconnect);
return conn;
}
void ConnectionPool::destroyConnection(MYSQL* conn) {
if (conn) {
mysql_close(conn);
}
}
5. 使用示例
5.1 初始化连接池
cpp复制ConnectionPool* pool = ConnectionPool::getInstance();
if (!pool->init("127.0.0.1", "root", "password", "testdb")) {
std::cerr << "Failed to init connection pool" << std::endl;
return -1;
}
5.2 执行查询
cpp复制auto conn = pool->getConnection();
if (!conn) {
std::cerr << "Failed to get connection" << std::endl;
return;
}
if (mysql_query(conn.get(), "SELECT * FROM users")) {
std::cerr << "Query failed: " << mysql_error(conn.get()) << std::endl;
return;
}
MYSQL_RES* result = mysql_store_result(conn.get());
if (!result) {
std::cerr << "Store result failed: " << mysql_error(conn.get()) << std::endl;
return;
}
// 处理结果集...
mysql_free_result(result);
6. 性能优化技巧
6.1 连接数动态调整
可以根据系统负载动态调整连接池大小:
cpp复制void ConnectionPool::adjustPoolSize(int newSize) {
std::lock_guard<std::mutex> lock(mutex_);
if (newSize > maxConn_) {
// 扩容
expandPool(newSize - maxConn_);
} else if (newSize < maxConn_) {
// 缩容
reducePool(maxConn_ - newSize);
}
maxConn_ = newSize;
}
6.2 连接预热
在系统启动时预先建立所有连接,避免首次请求延迟:
cpp复制void ConnectionPool::warmUp() {
std::lock_guard<std::mutex> lock(mutex_);
while (connList_.size() + usedList_.size() < maxConn_) {
MYSQL* conn = createNewConnection();
if (conn) {
connList_.push_back(conn);
} else {
break;
}
}
}
7. 常见问题与解决方案
7.1 连接泄漏
症状:连接池中的连接逐渐减少,最终耗尽。
解决方案:
- 使用智能指针管理连接,确保连接最终会被归还
- 实现连接的最大生命周期,超时强制回收
- 添加连接泄漏检测日志
7.2 连接失效
症状:从池中获取的连接执行查询失败。
解决方案:
- 实现连接的健康检查机制
- 获取连接时进行ping测试
- 设置MySQL自动重连选项
7.3 性能瓶颈
症状:高并发时获取连接等待时间过长。
解决方案:
- 优化连接池参数(初始连接数、最大连接数)
- 实现连接的分级管理(如读写分离)
- 考虑使用更高效的并发控制机制
8. 完整代码实现
以下是连接池的完整实现代码:
cpp复制// ConnectionPool.h
#pragma once
#include <mysql/mysql.h>
#include <memory>
#include <string>
#include <list>
#include <mutex>
#include <condition_variable>
class ConnectionPool {
public:
static ConnectionPool* getInstance();
bool init(const std::string& host,
const std::string& user,
const std::string& pwd,
const std::string& dbName,
int port = 3306,
int maxConn = 8);
std::shared_ptr<MYSQL> getConnection();
void releaseConnection(MYSQL* conn);
void adjustPoolSize(int newSize);
void warmUp();
private:
ConnectionPool() = default;
~ConnectionPool();
MYSQL* createNewConnection();
void destroyConnection(MYSQL* conn);
void expandPool(int size);
void reducePool(int size);
void checkConnection();
std::string host_;
std::string user_;
std::string pwd_;
std::string dbName_;
int port_;
int maxConn_;
std::list<MYSQL*> connList_;
std::list<MYSQL*> usedList_;
std::mutex mutex_;
std::condition_variable cond_;
static ConnectionPool* instance_;
static std::mutex instanceMutex_;
};
// ConnectionPool.cpp
#include "ConnectionPool.h"
#include <thread>
#include <chrono>
ConnectionPool* ConnectionPool::instance_ = nullptr;
std::mutex ConnectionPool::instanceMutex_;
ConnectionPool* ConnectionPool::getInstance() {
if (instance_ == nullptr) {
std::lock_guard<std::mutex> lock(instanceMutex_);
if (instance_ == nullptr) {
instance_ = new ConnectionPool();
}
}
return instance_;
}
bool ConnectionPool::init(const std::string& host,
const std::string& user,
const std::string& pwd,
const std::string& dbName,
int port,
int maxConn) {
host_ = host;
user_ = user;
pwd_ = pwd;
dbName_ = dbName;
port_ = port;
maxConn_ = maxConn;
for (int i = 0; i < maxConn_ / 2; ++i) {
MYSQL* conn = createNewConnection();
if (conn) {
connList_.push_back(conn);
} else {
return false;
}
}
std::thread(&ConnectionPool::checkConnection, this).detach();
return true;
}
std::shared_ptr<MYSQL> ConnectionPool::getConnection() {
std::unique_lock<std::mutex> lock(mutex_);
while (connList_.empty()) {
if (usedList_.size() < maxConn_) {
MYSQL* newConn = createNewConnection();
if (newConn) {
usedList_.push_back(newConn);
return std::shared_ptr<MYSQL>(newConn,
[this](MYSQL* conn) { releaseConnection(conn); });
}
}
if (cond_.wait_for(lock, std::chrono::milliseconds(500)) ==
std::cv_status::timeout) {
return nullptr;
}
}
MYSQL* conn = connList_.front();
connList_.pop_front();
usedList_.push_back(conn);
return std::shared_ptr<MYSQL>(conn,
[this](MYSQL* conn) { releaseConnection(conn); });
}
void ConnectionPool::releaseConnection(MYSQL* conn) {
if (!conn) return;
std::lock_guard<std::mutex> lock(mutex_);
auto it = std::find(usedList_.begin(), usedList_.end(), conn);
if (it != usedList_.end()) {
usedList_.erase(it);
connList_.push_back(conn);
cond_.notify_one();
} else {
destroyConnection(conn);
}
}
void ConnectionPool::adjustPoolSize(int newSize) {
std::lock_guard<std::mutex> lock(mutex_);
if (newSize > maxConn_) {
expandPool(newSize - maxConn_);
} else if (newSize < maxConn_) {
reducePool(maxConn_ - newSize);
}
maxConn_ = newSize;
}
void ConnectionPool::warmUp() {
std::lock_guard<std::mutex> lock(mutex_);
while (connList_.size() + usedList_.size() < maxConn_) {
MYSQL* conn = createNewConnection();
if (conn) {
connList_.push_back(conn);
} else {
break;
}
}
}
MYSQL* ConnectionPool::createNewConnection() {
MYSQL* conn = mysql_init(nullptr);
if (!conn) {
return nullptr;
}
if (!mysql_real_connect(conn, host_.c_str(), user_.c_str(),
pwd_.c_str(), dbName_.c_str(),
port_, nullptr, 0)) {
mysql_close(conn);
return nullptr;
}
my_bool reconnect = 1;
mysql_options(conn, MYSQL_OPT_RECONNECT, &reconnect);
return conn;
}
void ConnectionPool::destroyConnection(MYSQL* conn) {
if (conn) {
mysql_close(conn);
}
}
void ConnectionPool::expandPool(int size) {
for (int i = 0; i < size; ++i) {
MYSQL* conn = createNewConnection();
if (conn) {
connList_.push_back(conn);
} else {
break;
}
}
}
void ConnectionPool::reducePool(int size) {
for (int i = 0; i < size && !connList_.empty(); ++i) {
MYSQL* conn = connList_.front();
connList_.pop_front();
destroyConnection(conn);
}
}
void ConnectionPool::checkConnection() {
while (true) {
std::this_thread::sleep_for(std::chrono::minutes(5));
std::lock_guard<std::mutex> lock(mutex_);
auto it = connList_.begin();
while (it != connList_.end()) {
if (mysql_ping(*it) != 0) {
mysql_close(*it);
it = connList_.erase(it);
} else {
++it;
}
}
it = usedList_.begin();
while (it != usedList_.end()) {
if (mysql_ping(*it) != 0) {
mysql_close(*it);
it = usedList_.erase(it);
} else {
++it;
}
}
while (connList_.size() + usedList_.size() < maxConn_ / 2) {
MYSQL* conn = createNewConnection();
if (conn) {
connList_.push_back(conn);
} else {
break;
}
}
}
}
ConnectionPool::~ConnectionPool() {
for (auto conn : connList_) {
destroyConnection(conn);
}
for (auto conn : usedList_) {
destroyConnection(conn);
}
}
9. 高级功能扩展
9.1 连接分片
对于大型系统,可以考虑实现连接分片,将连接按业务类型分组管理:
cpp复制class ShardedConnectionPool {
public:
ShardedConnectionPool(int shards = 4) : shards_(shards) {
pools_.resize(shards_);
for (auto& pool : pools_) {
pool = std::make_unique<ConnectionPool>();
}
}
std::shared_ptr<MYSQL> getConnection(const std::string& key) {
size_t shard = std::hash<std::string>{}(key) % shards_;
return pools_[shard]->getConnection();
}
private:
int shards_;
std::vector<std::unique_ptr<ConnectionPool>> pools_;
};
9.2 读写分离
扩展连接池支持读写分离:
cpp复制class RWConnectionPool {
public:
bool init(const std::string& rwConfig...) {
// 初始化读写连接池
}
std::shared_ptr<MYSQL> getReadConnection() {
return readPool_->getConnection();
}
std::shared_ptr<MYSQL> getWriteConnection() {
return writePool_->getConnection();
}
private:
std::unique_ptr<ConnectionPool> readPool_;
std::unique_ptr<ConnectionPool> writePool_;
};
9.3 连接池监控
添加监控接口,实时获取连接池状态:
cpp复制struct PoolStatus {
size_t totalConnections;
size_t idleConnections;
size_t activeConnections;
size_t waitingRequests;
};
class MonitorableConnectionPool : public ConnectionPool {
public:
PoolStatus getStatus() const {
std::lock_guard<std::mutex> lock(mutex_);
return {
connList_.size() + usedList_.size(),
connList_.size(),
usedList_.size(),
waitingRequests_
};
}
private:
std::atomic<size_t> waitingRequests_{0};
};
10. 性能测试与调优
10.1 基准测试
使用以下方法测试连接池性能:
cpp复制void benchmark(ConnectionPool* pool, int threads, int iterations) {
std::vector<std::thread> workers;
std::atomic<int> success{0};
std::atomic<int> failed{0};
auto start = std::chrono::high_resolution_clock::now();
for (int i = 0; i < threads; ++i) {
workers.emplace_back([&]() {
for (int j = 0; j < iterations; ++j) {
auto conn = pool->getConnection();
if (conn) {
if (mysql_query(conn.get(), "SELECT 1") == 0) {
++success;
} else {
++failed;
}
} else {
++failed;
}
}
});
}
for (auto& t : workers) {
t.join();
}
auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
std::cout << "Threads: " << threads
<< ", Iterations: " << iterations
<< ", Success: " << success
<< ", Failed: " << failed
<< ", Time: " << duration.count() << "ms"
<< ", QPS: " << (success * 1000.0 / duration.count()) << std::endl;
}
10.2 调优建议
根据测试结果调整以下参数:
- 初始连接数:建议设置为最大连接数的50%
- 最大连接数:根据系统资源和并发量设置
- 获取连接超时时间:根据业务容忍度设置
- 连接检查间隔:根据网络稳定性设置
11. 生产环境注意事项
- 连接泄漏检测:定期检查连接池状态,确保没有连接泄漏
- 监控告警:监控连接池的关键指标(活跃连接数、等待数等)
- 优雅关闭:在程序退出时正确关闭连接池
- 配置分离:将连接池参数配置化,便于动态调整
- 日志记录:记录关键操作和异常情况
12. 替代方案比较
除了自研连接池,还可以考虑以下方案:
- ORM内置连接池:如使用ODB、SOCI等ORM框架自带的连接池
- 第三方连接池库:如libzdb、cppconn等
- 代理中间件:如使用ProxySQL管理连接
自研连接池的优势在于:
- 完全可控,可深度定制
- 无额外依赖
- 性能优化空间大
13. 完整项目结构
建议的项目目录结构:
code复制mysql-connection-pool/
├── include/
│ └── ConnectionPool.h
├── src/
│ └── ConnectionPool.cpp
├── test/
│ ├── benchmark.cpp
│ └── unit_test.cpp
├── CMakeLists.txt
└── README.md
CMake配置示例:
cmake复制cmake_minimum_required(VERSION 3.10)
project(mysql_connection_pool)
set(CMAKE_CXX_STANDARD 11)
find_package(MySQL REQUIRED)
add_library(connection_pool
src/ConnectionPool.cpp
include/ConnectionPool.h)
target_include_directories(connection_pool
PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include
PRIVATE ${MYSQL_INCLUDE_DIR})
target_link_libraries(connection_pool
PRIVATE ${MYSQL_LIBRARY}
Threads::Threads)
add_executable(benchmark test/benchmark.cpp)
target_link_libraries(benchmark connection_pool)
14. 编译与集成
编译命令:
bash复制mkdir build && cd build
cmake ..
make
集成到现有项目:
- 将ConnectionPool.h和ConnectionPool.cpp添加到项目
- 链接MySQL客户端库
- 在代码中初始化并使用连接池
15. 单元测试示例
使用Google Test框架编写单元测试:
cpp复制#include "ConnectionPool.h"
#include <gtest/gtest.h>
class ConnectionPoolTest : public ::testing::Test {
protected:
void SetUp() override {
pool_ = ConnectionPool::getInstance();
ASSERT_TRUE(pool_->init("127.0.0.1", "root", "password", "testdb"));
}
ConnectionPool* pool_;
};
TEST_F(ConnectionPoolTest, BasicOperation) {
auto conn = pool_->getConnection();
ASSERT_NE(conn, nullptr);
EXPECT_EQ(mysql_query(conn.get(), "SELECT 1"), 0);
}
TEST_F(ConnectionPoolTest, ConcurrentAccess) {
const int kThreads = 10;
std::vector<std::thread> threads;
std::atomic<int> success{0};
for (int i = 0; i < kThreads; ++i) {
threads.emplace_back([&]() {
auto conn = pool_->getConnection();
if (conn && mysql_query(conn.get(), "SELECT 1") == 0) {
++success;
}
});
}
for (auto& t : threads) {
t.join();
}
EXPECT_EQ(success, kThreads);
}