rpcx 框架文档
一个高性能、轻量级的 Go 语言 RPC 框架
什么是 rpcx
rpcx 是一个分布式的 Go 语言 RPC 框架,支持多种协议、多种注册中心、多种负载均衡策略。它简单易用、性能优越,是构建微服务的理想选择。
相较于 gRPC,rpcx 更轻量、更灵活,学习曲线更平缓。相较于标准库的 net/rpc,rpcx 提供了更丰富的服务治理功能。
核心优势
高性能
基于 TCP 的高性能通信,支持连接池复用,比标准库 net/rpc 性能提升显著
插件化
丰富的插件系统,支持服务发现、熔断、限流、监控等扩展
多协议
支持 TCP、QUIC、KCP、WebSocket、HTTP 等多种传输协议
服务治理
内置服务发现、负载均衡、故障转移、熔断降级等企业级特性
快速开始
安装
go get -u github.com/smallnest/rpcx/...
定义服务
首先定义服务的参数和返回值结构,以及服务方法:
package service
type Args struct {
A int
B int
}
type Reply struct {
C int
}
type Arith int
func (t *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error {
reply.C = args.A * args.B
return nil
}
服务端
package main
import (
"context"
"github.com/smallnest/rpcx/server"
)
func main() {
s := server.NewServer()
s.RegisterName("Arith", new(Arith), "")
s.Serve("tcp", "localhost:8972")
}
客户端
package main
import (
"context"
"github.com/smallnest/rpcx/client"
)
func main() {
d, _ := client.NewPeer2PeerDiscovery("tcp@localhost:8972", "")
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
args := &Args{A: 10, B: 20}
reply := &Reply{}
err := xclient.Call(context.Background(), "Mul", args, reply)
// reply.C = 200
}
核心特性总览
rpcx 提供了丰富的企业级 RPC 特性,以下按功能分类:
服务发现与注册
| 注册中心 | 插件 | 说明 |
|---|---|---|
| etcd v3 | serverplugin.EtcdV3RegisterPlugin | 推荐使用,支持服务健康检查 |
| Consul | ConsulRegisterPlugin | 支持服务网格 |
| ZooKeeper | ZooKeeperRegisterPlugin | 经典注册中心 |
| Redis | RedisRegisterPlugin | 使用 Redis 作为注册中心 |
| Nacos | NacosRegisterPlugin | 阿里开源的注册中心 |
| mDNS | NewMDNSRegisterPlugin | 本地网络服务发现 |
负载均衡策略
| 策略 | 选择器 | 说明 |
|---|---|---|
| 随机选择 | RandomSelect | 随机选择一个服务实例 |
| 轮询 | RoundRobinSelect | 依次轮询所有实例 |
| 加权轮询 | WeightedRoundRobin | 按权重分配请求 |
| 一致性哈希 | ConsistentHash | 相同参数请求到同一实例 |
| 地理位置 | GeoSelector | 按地理位置就近选择 |
| 网络延迟 | PingSelector | 基于 ICMP 延迟选择 |
故障处理模式
| 模式 | 说明 |
|---|---|
| Failfast | 快速失败,立即返回错误 |
| Failover | 故障转移,自动重试其他实例 |
| Failtry | 失败重试,重试当前实例 |
| Failbackup | 备用调用,同时调用多个实例取最快响应 |
服务发现
服务发现是微服务架构的核心组件。rpcx 支持多种主流注册中心。
使用 etcd 作为注册中心
服务端:
import "github.com/rpcxio/rpcx-etcd/serverplugin"
s := server.NewServer()
r := &serverplugin.EtcdV3RegisterPlugin{
ServiceAddress: "tcp@localhost:8972",
EtcdServers: []string{"localhost:2379"},
BasePath: "/rpcx_test",
UpdateInterval: time.Minute,
}
err := r.Start()
if err != nil {
log.Fatal(err)
}
s.Plugins.Add(r)
客户端:
d := client.NewEtcdV3Discovery("/rpcx_test", "Arith", []string{"localhost:2379"}, nil)
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
服务元数据
可以通过元数据对服务进行分组、设置状态等:
// 设置服务分组
s.RegisterName("Arith", new(Arith), "group=test")
// 设置服务状态(可用于灰度发布)
s.RegisterName("Arith", new(Arith), "state=active")
负载均衡
rpcx 提供多种负载均衡策略,可根据业务场景选择。
加权轮询
// 定义服务器的权重
servers := []*client.KVPair{
{Key: "tcp@localhost:8972", Value: "weight=7"}, // 70% 流量
{Key: "tcp@localhost:8973", Value: "weight=3"}, // 30% 流量
}
d, _ := client.NewMultipleServersDiscovery(servers)
xclient := client.NewXClient("Arith", client.Failtry, client.WeightedRoundRobin, d, client.DefaultOption)
一致性哈希
一致性哈希确保相同参数的请求始终路由到同一服务实例:
d, _ := client.NewPeer2PeerDiscovery("tcp@localhost:8972", "")
xclient := client.NewXClient("Arith", client.Failtry, client.ConsistentHash, d, client.DefaultOption)
故障处理
rpcx 提供多种故障处理模式,确保服务的高可用性。
故障转移 (Failover)
当调用失败时,自动尝试其他服务实例:
d, _ := client.NewMultipleServersDiscovery([]*client.KVPair{
{Key: "tcp@localhost:8972"},
{Key: "tcp@localhost:9981"},
})
option := client.DefaultOption
option.Retries = 10 // 最大重试次数
xclient := client.NewXClient("Arith", client.Failover, client.RandomSelect, d, option)
超时控制
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
err := xclient.Call(ctx, "Mul", args, reply)
熔断器
当服务实例连续失败达到阈值时,熔断器会暂时切断对该实例的调用,快速失败。
option := client.DefaultOption
// 连续失败 5 次后熔断,30 秒后尝试恢复
option.GenBreaker = func() client.Breaker {
return client.NewConsecCircuitBreaker(5, 30*time.Second)
}
d, _ := client.NewMultipleServersDiscovery(servers)
xclient := client.NewXClient("Arith", client.Failfast, client.RandomSelect, d, option)
熔断器与 Failfast 模式配合使用效果最佳,避免在熔断期间进行无效重试。
编码与序列化
rpcx 支持多种序列化方式,可根据需求选择。
| 编码 | 说明 | 性能 |
|---|---|---|
| 默认编码 | rpcx 自定义二进制协议 | 最高 |
| JSON | 标准 JSON 格式,易调试 | 较低 |
| Protobuf | Google Protocol Buffers | 高 |
| Gob | Go 语言原生序列化 | 中等 |
使用 JSON 序列化
option := client.DefaultOption
option.SerializeType = protocol.JSON
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, option)
数据压缩
option := client.DefaultOption
option.CompressType = protocol.Gzip // 启用 gzip 压缩
传输协议
rpcx 支持多种传输协议,适应不同场景需求。
TCP (默认)
最常用的协议,稳定可靠
s.Serve("tcp", "localhost:8972")
QUIC
基于 UDP 的高性能协议,需构建标签
// go run -tags quic
s.Serve("quic", "localhost:8972")
KCP
可靠 UDP,低延迟场景
// go run -tags kcp
s.Serve("kcp", "localhost:8972")
WebSocket
浏览器环境友好
s.Serve("ws", "localhost:8972")
Unix Domain
本机高性能通信
s.Serve("unix", "/tmp/rpcx.socket")
安全与认证
Token 认证
服务端:
s := server.NewServer()
// 设置认证函数
s.AuthFunc = func(ctx context.Context, meta string) error {
if meta != "valid-token" {
return errors.New("invalid token")
}
return nil
}
客户端:
d, _ := client.NewPeer2PeerDiscovery("tcp@localhost:8972", "")
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
// 设置认证 token
xclient.Auth("valid-token")
TLS 加密
// 服务端
s := server.NewServer(server.WithTLSConfig(tlsConfig))
// 客户端
option.TLSConfig = tlsConfig
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, option)
IP 黑名单
import "github.com/smallnest/rpcx/serverplugin"
plugin := serverplugin.NewBlacklistPlugin([]string{
"192.168.1.100",
"10.0.0.50",
})
s.Plugins.Add(plugin)
流式传输
rpcx 支持双向流式数据传输,适用于大文件传输、实时数据流等场景。
服务端
func streamHandler(conn net.Conn, args *share.StreamServiceArgs) {
fmt.Printf("Stream connected: %v\n", args.Meta)
// 处理流数据
io.Copy(conn, conn)
conn.Close()
}
func main() {
s := server.NewServer()
p := server.NewStreamService("localhost:8973", streamHandler, nil, 1000)
s.EnableStreamService(share.StreamServiceName, p)
s.Serve("tcp", "localhost:8972")
}
文件传输
rpcx 还提供了专门的文件传输服务:
p := server.NewFileTransfer("localhost:8973", uploadPath, nil, 1000)
s.EnableFileTransfer(share.FileTransferServiceName, p)
元数据传递
rpcx 支持在调用过程中传递元数据,可用于链路追踪、业务上下文传递等。
客户端发送元数据
import "github.com/smallnest/rpcx/share"
args := &Args{A: 10, B: 20}
reply := &Reply{}
// 设置请求元数据
ctx := context.WithValue(context.Background(), share.ReqMetaDataKey, map[string]string{
"trace-id": "12345",
"user-id": "67890",
})
// 准备接收响应元数据
ctx = context.WithValue(ctx, share.ResMetaDataKey, make(map[string]string))
err := xclient.Call(ctx, "Mul", args, reply)
// 读取响应元数据
resMeta := ctx.Value(share.ResMetaDataKey).(map[string]string)
服务端处理元数据
func (t *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error {
// 读取请求元数据
reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
// 写入响应元数据
resMeta := ctx.Value(share.ResMetaDataKey).(map[string]string)
resMeta["server"] = "rpcx-server"
reply.C = args.A * args.B
return nil
}
插件系统
rpcx 提供了强大的插件系统,可以在服务调用生命周期的各个阶段插入自定义逻辑。
连接插件
监控客户端连接的建立和关闭:
type ConnectionPlugin struct {}
func (p *ConnectionPlugin) HandleConnAccept(conn net.Conn) (net.Conn, bool) {
log.Printf("client %v connected", conn.RemoteAddr().String())
return conn, true
}
func (p *ConnectionPlugin) HandleConnClose(conn net.Conn) bool {
log.Printf("client %v closed", conn.RemoteAddr().String())
return true
}
// 注册插件
s.Plugins.Add(&ConnectionPlugin{})
调用包装插件
在方法调用前后执行自定义逻辑:
type WrapCall struct {}
func (p *WrapCall) PreCall(ctx context.Context, serviceName, methodName string, args interface{}) (interface{}, error) {
log.Printf("before %s.%s: args: %v", serviceName, methodName, args)
// 可以修改参数
return args, nil
}
func (p *WrapCall) PostCall(ctx context.Context, serviceName, methodName string, args, reply interface{}) (interface{}, error) {
log.Printf("after %s.%s: reply: %v", serviceName, methodName, reply)
// 可以修改返回值
return reply, nil
}
s.Plugins.Add(&WrapCall{})
异步调用
rpcx 支持异步调用方式,通过 Go 方法发起调用并立即返回,通过 channel 获取结果。
args := &Args{A: 10, B: 20}
reply := &Reply{}
// 发起异步调用
call, err := xclient.Go(context.Background(), "Mul", args, reply, nil)
if err != nil {
log.Fatal(err)
}
// 等待调用完成
replyCall := <-call.Done
if replyCall.Error != nil {
log.Fatal(replyCall.Error)
}
log.Printf("%d * %d = %d", args.A, args.B, reply.C)
异步调用适用于需要同时发起多个 RPC 调用的场景,可以显著提高并发性能。
双向通信
rpcx 支持服务端主动向客户端发送消息,实现双向通信。
服务端发送消息
var clientConn net.Conn
func (t *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error {
// 获取客户端连接
clientConn = ctx.Value(server.RemoteConnContextKey).(net.Conn)
reply.C = args.A * args.B
return nil
}
// 在 goroutine 中主动推送消息
go func() {
for clientConn != nil {
err := s.SendMessage(clientConn, "test_service", "test_method", nil, []byte("hello"))
if err != nil {
break
}
time.Sleep(time.Second)
}
}()
指标监控
rpcx 支持通过插件集成指标监控,可导出到 Graphite、Prometheus 等监控系统。
Metrics 插件
import "github.com/smallnest/rpcx/serverplugin"
// 创建 metrics 插件
plugin := serverplugin.NewMetricsPlugin("rpcx-server", "localhost:2003", time.Second)
s.Plugins.Add(plugin)
链路追踪
rpcx 支持 OpenTelemetry 分布式链路追踪。
OpenTelemetry 集成
import "github.com/smallnest/rpcx/protocol"
"github.com/rpcxio/rpcx-plugins/rotel"
// 服务端
plugin := rotel.NewOpenTelemetryPlugin("rpcx-server", tracerProvider)
s.Plugins.Add(plugin)
// 客户端
option := client.DefaultOption
option.TracerPlugin = rotel.NewClientTracer("rpcx-client", tracerProvider)
调试追踪
import "github.com/smallnest/rpcx/share"
// 启用追踪日志
share.Trace = true
更多示例
rpcx 提供了丰富的示例代码,涵盖了框架的各种特性:
| 目录 | 说明 | 源码 |
|---|---|---|
| 101basic | 基础 RPC 调用示例 | 查看 |
| 102basic | JSON 序列化示例 | 查看 |
| codec/protobuf | Protobuf 序列化 | 查看 |
| codec/gob | Go 原生序列化 | 查看 |
| codec/iterator | 迭代器模式 | 查看 |
| registry/etcdv3 | etcd 服务注册发现 | 查看 |
| registry/consul | Consul 服务注册发现 | 查看 |
| registry/cache | 注册中心缓存 | 查看 |
| registry/consul_pool | Consul 连接池 | 查看 |
| registry/dns | DNS 服务发现 | 查看 |
| registry/etcd | etcd (v2) 服务发现 | 查看 |
| registry/mdns | mDNS 服务发现 | 查看 |
| registry/nacos | Nacos 服务注册发现 | 查看 |
| registry/redis | Redis 服务注册发现 | 查看 |
| registry/zookeeper | Zookeeper 服务注册发现 | 查看 |
| selector/weighted | 加权负载均衡 | 查看 |
| selector/roundrobin | 轮询负载均衡 | 查看 |
| selector/random | 随机负载均衡 | 查看 |
| selector/hash | 一致性哈希 | 查看 |
| selector/ping | 基于网络延迟选择 | 查看 |
| selector/geo | 地理位置选择 | 查看 |
| selector/customized | 自定义选择器 | 查看 |
| failmode/failfast | 快速失败 | 查看 |
| failmode/failover | 故障转移 | 查看 |
| failmode/failbackup | 备用调用 | 查看 |
| failmode/failtry | 失败重试 | 查看 |
| breaker | 熔断器示例 | 查看 |
| timeout | 超时控制 | 查看 |
| auth | Token 认证 | 查看 |
| tls | TLS 加密通信 | 查看 |
| stream | 流式传输 | 查看 |
| filetransfer | 文件传输 | 查看 |
| metadata | 元数据传递 | 查看 |
| plugin | 插件系统 | 查看 |
| wrapcall | 调用包装 | 查看 |
| quic | QUIC 协议 | 查看 |
| kcp | KCP 协议 | 查看 |
| websocket | WebSocket 协议 | 查看 |
| bidirectional | 双向通信 | 查看 |
| async | 异步调用 | 查看 |
| opentelemetry | OpenTelemetry 追踪 | 查看 |
| metrics | 指标监控 | 查看 |
| reqratelimit | 请求限流 | 查看 |
| reflection | 服务反射 | 查看 |
| function | 函数注册 | 查看 |
| handler | 处理器模式 | 查看 |
| broadcast | 广播调用 | 查看 |
| sticky | 粘性连接 | 查看 |
| oneway | 单向调用 | 查看 |
| group | 服务分组 | 查看 |
| state | 服务状态控制 | 查看 |
| reconnect | 自动重连 | 查看 |
| heartbeat | 心跳检测 | 查看 |
| graceful_restart | 优雅重启 | 查看 |
| docker | Docker 部署 | 查看 |
| k8s | Kubernetes 集成 | 查看 |
| jsonrpc2 | JSON-RPC 2.0 | 查看 |
| http_invoke | HTTP 调用 | 查看 |
| alias | 服务别名 | 查看 |
| blacklist | IP 黑名单 | 查看 |
| compress | 数据压缩 | 查看 |
| pool | 连接池 | 查看 |
| reuseport | 端口复用 | 查看 |
| oneshot | 一次性调用 | 查看 |
| file_download | 文件下载 | 查看 |
| unixdomain | Unix Domain Socket | 查看 |
| ipv6 | IPv6 支持 | 查看 |
| memconn | 内存连接 | 查看 |
| plugin_blacklist | 插件式黑名单 | 查看 |
| disallow_bad_server | 禁用故障服务器 | 查看 |
| error | 错误处理基础 | 查看 |
| error_customized | 自定义错误 | 查看 |
| internal_error | 内部错误处理 | 查看 |
| server_timeout | 服务端超时 | 查看 |
| context_lock | 上下文锁 | 查看 |
| debug_trace | 调试追踪 | 查看 |
| oneclient | 单客户端模式 | 查看 |
| reconnect2 | 自动重连(第二种实现) | 查看 |
| restart | 重启 | 查看 |
| fork | Fork 进程 | 查看 |
| k8s_api | Kubernetes API | 查看 |
| k8s_dns | Kubernetes DNS | 查看 |
| k8s_nacos | Kubernetes + Nacos | 查看 |
| tcpcopy | TCP 拷贝 | 查看 |
| nil_panic | Nil 恐慌处理 | 查看 |
| nil_reply_client | 客户端 Nil 返回值 | 查看 |
| nil_reply_server | 服务端 Nil 返回值 | 查看 |
更多示例请参考:rpcx-examples