在分布式系统开发中,远程过程调用(RPC)是实现服务间通信的核心技术。最近我用C++实现了一个轻量级的JSON-RPC框架,基于muduo网络库和JsonCpp,支持同步/异步调用、服务注册发现和发布订阅功能。这个项目最让我自豪的是其简洁的设计和不到3000行的核心代码量,却能实现完整的RPC功能。
这个框架特别适合中小规模的分布式系统,比如物联网设备管理、微服务架构中的内部通信等场景。与gRPC等工业级方案相比,它的优势在于:
在技术选型阶段,我对比了三种主流方案:
最终选择JSON主要基于以下考虑:
实测表明,在参数小于1KB的情况下,JSON与Protobuf的序列化性能差异在20%以内,而开发效率提升显著。
muduo是陈硕开发的一个优秀的C++网络库,其核心优势在于:
特别值得一提的是它的"one loop per thread"模型,完美匹配我们的需求。下面是一个典型的事件循环示例:
cpp复制EventLoop loop;
TcpServer server(&loop, InetAddress(8080), "RpcServer");
server.setConnectionCallback([](const TcpConnectionPtr& conn) {
if (conn->connected()) {
LOG_INFO << "New connection: " << conn->peerAddress();
}
});
server.start();
loop.loop();
框架采用经典的三层架构:
| 层级 | 组件 | 职责 |
|---|---|---|
| 抽象层 | Protocol、Transport | 定义接口规范 |
| 具象层 | JsonProtocol、MuduoTransport | 具体实现 |
| 业务层 | RpcServer、Registry | 业务逻辑 |
这种设计使得网络库和协议格式可以灵活替换。比如要改用Boost.Asio,只需实现Transport接口即可。
我们采用LV(Length-Value)格式解决TCP粘包问题:
code复制+----------+----------+------------+
| 4字节长度 | 4字节类型 | N字节消息体 |
+----------+----------+------------+
关键实现代码:
cpp复制void onMessage(const TcpConnectionPtr& conn, Buffer* buf) {
while (buf->readableBytes() >= kHeaderLen) {
int32_t len = buf->peekInt32();
if (buf->readableBytes() < len + kHeaderLen) break;
buf->retrieve(kHeaderLen);
string message(buf->peek(), len);
buf->retrieve(len);
handleMessage(conn, message);
}
}
采用策略模式实现多态处理:
cpp复制class Dispatcher {
public:
using Handler = std::function<void(const MessagePtr&)>;
void registerHandler(int type, Handler handler) {
handlers_[type] = handler;
}
void dispatch(const MessagePtr& msg) {
auto it = handlers_.find(msg->type());
if (it != handlers_.end()) {
it->second(msg);
}
}
private:
std::unordered_map<int, Handler> handlers_;
};
实现方法调用映射:
cpp复制class Router {
public:
template <typename Func>
void registerMethod(const string& name, Func func) {
methods_[name] = [func](const Json::Value& params) {
return detail::invoke(func, params);
};
}
Json::Value call(const string& method, const Json::Value& params) {
auto it = methods_.find(method);
if (it == methods_.end()) {
throw RpcException("Method not found");
}
return it->second(params);
}
private:
std::unordered_map<string, std::function<Json::Value(Json::Value)>> methods_;
};
利用C++11的future/promise实现异步调用:
cpp复制class AsyncCall {
public:
AsyncCall() : promise_(std::make_shared<std::promise<Json::Value>>()) {}
std::future<Json::Value> getFuture() { return promise_->get_future(); }
void setResult(Json::Value result) { promise_->set_value(result); }
private:
std::shared_ptr<std::promise<Json::Value>> promise_;
};
基于ZooKeeper的灵感实现简易服务发现:
cpp复制class ServiceDiscovery {
public:
void registerService(const string& name, const Endpoint& ep) {
std::lock_guard<std::mutex> lock(mutex_);
services_[name].insert(ep);
}
std::set<Endpoint> discover(const string& name) {
std::lock_guard<std::mutex> lock(mutex_);
return services_[name];
}
private:
std::unordered_map<string, std::set<Endpoint>> services_;
std::mutex mutex_;
};
实现简单的TCP连接复用:
cpp复制class ConnectionPool {
public:
TcpConnectionPtr get(const Endpoint& ep) {
std::lock_guard<std::mutex> lock(mutex_);
auto& pool = pools_[ep];
if (!pool.empty()) {
auto conn = pool.back();
pool.pop_back();
return conn;
}
return createConnection(ep);
}
void release(const TcpConnectionPtr& conn) {
std::lock_guard<std::mutex> lock(mutex_);
pools_[conn->peerAddress()].push_back(conn);
}
private:
std::unordered_map<Endpoint, std::vector<TcpConnectionPtr>> pools_;
std::mutex mutex_;
};
预分配内存减少动态分配:
cpp复制Json::CharReaderBuilder builder;
builder.settings_["collectComments"] = false;
builder.settings_["maxStackDepth"] = 100;
std::unique_ptr<Json::CharReader> reader(builder.newCharReader());
char* end;
Json::Value root;
if (!reader->parse(jsonStr, jsonStr + strlen(jsonStr), &root, &end)) {
throw ParseError("Invalid JSON");
}
使用Valgrind检测常见问题:
bash复制valgrind --leak-check=full ./rpc_server
常见问题模式:
使用gperftools进行CPU profiling:
bash复制pprof --web ./rpc_server prof.data
常见性能热点:
服务端实现:
cpp复制Router router;
router.registerMethod("add", [](int a, int b) { return a + b; });
router.registerMethod("multiply", [](int a, int b) { return a * b; });
RpcServer server(&router);
server.start(8080);
客户端调用:
cpp复制RpcClient client("localhost", 8080);
auto result = client.call<int>("add", 2, 3); // 返回5
异步调用示例:
cpp复制auto future = client.asyncCall<int>("multiply", 4, 5);
// ...其他工作...
cout << future.get() << endl; // 输出20
这个框架还有很大的改进空间:
我在实际使用中发现,对于100节点以下的中小规模集群,这个框架已经能很好地满足需求。它的简洁性使得定制化开发非常方便,比如我们曾基于它实现了专门的物联网设备管理协议。