rpcx 框架文档

一个高性能、轻量级的 Go 语言 RPC 框架

什么是 rpcx

rpcx 是一个分布式的 Go 语言 RPC 框架,支持多种协议、多种注册中心、多种负载均衡策略。它简单易用、性能优越,是构建微服务的理想选择。

为什么选择 rpcx?

相较于 gRPC,rpcx 更轻量、更灵活,学习曲线更平缓。相较于标准库的 net/rpc,rpcx 提供了更丰富的服务治理功能。

核心优势

🚀

高性能

基于 TCP 的高性能通信,支持连接池复用,比标准库 net/rpc 性能提升显著

🔌

插件化

丰富的插件系统,支持服务发现、熔断、限流、监控等扩展

🌐

多协议

支持 TCP、QUIC、KCP、WebSocket、HTTP 等多种传输协议

🔍

服务治理

内置服务发现、负载均衡、故障转移、熔断降级等企业级特性

快速开始

安装

go get -u github.com/smallnest/rpcx/...

定义服务

首先定义服务的参数和返回值结构,以及服务方法:

package service

type Args struct {
    A int
    B int
}

type Reply struct {
    C int
}

type Arith int

func (t *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error {
    reply.C = args.A * args.B
    return nil
}

服务端

package main

import (
    "context"
    "github.com/smallnest/rpcx/server"
)

func main() {
    s := server.NewServer()
    s.RegisterName("Arith", new(Arith), "")
    s.Serve("tcp", "localhost:8972")
}

客户端

package main

import (
    "context"
    "github.com/smallnest/rpcx/client"
)

func main() {
    d, _ := client.NewPeer2PeerDiscovery("tcp@localhost:8972", "")
    xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
    defer xclient.Close()

    args := &Args{A: 10, B: 20}
    reply := &Reply{}
    err := xclient.Call(context.Background(), "Mul", args, reply)

    // reply.C = 200
}

核心特性总览

rpcx 提供了丰富的企业级 RPC 特性,以下按功能分类:

服务发现与注册

注册中心 插件 说明
etcd v3 serverplugin.EtcdV3RegisterPlugin 推荐使用,支持服务健康检查
Consul ConsulRegisterPlugin 支持服务网格
ZooKeeper ZooKeeperRegisterPlugin 经典注册中心
Redis RedisRegisterPlugin 使用 Redis 作为注册中心
Nacos NacosRegisterPlugin 阿里开源的注册中心
mDNS NewMDNSRegisterPlugin 本地网络服务发现

负载均衡策略

策略 选择器 说明
随机选择 RandomSelect 随机选择一个服务实例
轮询 RoundRobinSelect 依次轮询所有实例
加权轮询 WeightedRoundRobin 按权重分配请求
一致性哈希 ConsistentHash 相同参数请求到同一实例
地理位置 GeoSelector 按地理位置就近选择
网络延迟 PingSelector 基于 ICMP 延迟选择

故障处理模式

模式 说明
Failfast 快速失败,立即返回错误
Failover 故障转移,自动重试其他实例
Failtry 失败重试,重试当前实例
Failbackup 备用调用,同时调用多个实例取最快响应

服务发现

服务发现是微服务架构的核心组件。rpcx 支持多种主流注册中心。

使用 etcd 作为注册中心

服务端:

import "github.com/rpcxio/rpcx-etcd/serverplugin"

s := server.NewServer()

r := &serverplugin.EtcdV3RegisterPlugin{
    ServiceAddress: "tcp@localhost:8972",
    EtcdServers:    []string{"localhost:2379"},
    BasePath:       "/rpcx_test",
    UpdateInterval: time.Minute,
}
err := r.Start()
if err != nil {
    log.Fatal(err)
}
s.Plugins.Add(r)

客户端:

d := client.NewEtcdV3Discovery("/rpcx_test", "Arith", []string{"localhost:2379"}, nil)
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)

服务元数据

可以通过元数据对服务进行分组、设置状态等:

// 设置服务分组
s.RegisterName("Arith", new(Arith), "group=test")

// 设置服务状态(可用于灰度发布)
s.RegisterName("Arith", new(Arith), "state=active")

负载均衡

rpcx 提供多种负载均衡策略,可根据业务场景选择。

加权轮询

// 定义服务器的权重
servers := []*client.KVPair{
    {Key: "tcp@localhost:8972", Value: "weight=7"},  // 70% 流量
    {Key: "tcp@localhost:8973", Value: "weight=3"},  // 30% 流量
}

d, _ := client.NewMultipleServersDiscovery(servers)
xclient := client.NewXClient("Arith", client.Failtry, client.WeightedRoundRobin, d, client.DefaultOption)

一致性哈希

一致性哈希确保相同参数的请求始终路由到同一服务实例:

d, _ := client.NewPeer2PeerDiscovery("tcp@localhost:8972", "")
xclient := client.NewXClient("Arith", client.Failtry, client.ConsistentHash, d, client.DefaultOption)

故障处理

rpcx 提供多种故障处理模式,确保服务的高可用性。

故障转移 (Failover)

当调用失败时,自动尝试其他服务实例:

d, _ := client.NewMultipleServersDiscovery([]*client.KVPair{
    {Key: "tcp@localhost:8972"},
    {Key: "tcp@localhost:9981"},
})

option := client.DefaultOption
option.Retries = 10  // 最大重试次数

xclient := client.NewXClient("Arith", client.Failover, client.RandomSelect, d, option)

超时控制

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

err := xclient.Call(ctx, "Mul", args, reply)

熔断器

当服务实例连续失败达到阈值时,熔断器会暂时切断对该实例的调用,快速失败。

option := client.DefaultOption

// 连续失败 5 次后熔断,30 秒后尝试恢复
option.GenBreaker = func() client.Breaker {
    return client.NewConsecCircuitBreaker(5, 30*time.Second)
}

d, _ := client.NewMultipleServersDiscovery(servers)
xclient := client.NewXClient("Arith", client.Failfast, client.RandomSelect, d, option)
注意

熔断器与 Failfast 模式配合使用效果最佳,避免在熔断期间进行无效重试。

编码与序列化

rpcx 支持多种序列化方式,可根据需求选择。

编码 说明 性能
默认编码 rpcx 自定义二进制协议 最高
JSON 标准 JSON 格式,易调试 较低
Protobuf Google Protocol Buffers
Gob Go 语言原生序列化 中等

使用 JSON 序列化

option := client.DefaultOption
option.SerializeType = protocol.JSON

xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, option)

数据压缩

option := client.DefaultOption
option.CompressType = protocol.Gzip  // 启用 gzip 压缩

传输协议

rpcx 支持多种传输协议,适应不同场景需求。

TCP (默认)

最常用的协议,稳定可靠

s.Serve("tcp", "localhost:8972")

QUIC

基于 UDP 的高性能协议,需构建标签

// go run -tags quic
s.Serve("quic", "localhost:8972")

KCP

可靠 UDP,低延迟场景

// go run -tags kcp
s.Serve("kcp", "localhost:8972")

WebSocket

浏览器环境友好

s.Serve("ws", "localhost:8972")

Unix Domain

本机高性能通信

s.Serve("unix", "/tmp/rpcx.socket")

安全与认证

Token 认证

服务端:

s := server.NewServer()

// 设置认证函数
s.AuthFunc = func(ctx context.Context, meta string) error {
    if meta != "valid-token" {
        return errors.New("invalid token")
    }
    return nil
}

客户端:

d, _ := client.NewPeer2PeerDiscovery("tcp@localhost:8972", "")
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)

// 设置认证 token
xclient.Auth("valid-token")

TLS 加密

// 服务端
s := server.NewServer(server.WithTLSConfig(tlsConfig))

// 客户端
option.TLSConfig = tlsConfig
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, option)

IP 黑名单

import "github.com/smallnest/rpcx/serverplugin"

plugin := serverplugin.NewBlacklistPlugin([]string{
    "192.168.1.100",
    "10.0.0.50",
})
s.Plugins.Add(plugin)

流式传输

rpcx 支持双向流式数据传输,适用于大文件传输、实时数据流等场景。

服务端

func streamHandler(conn net.Conn, args *share.StreamServiceArgs) {
    fmt.Printf("Stream connected: %v\n", args.Meta)

    // 处理流数据
    io.Copy(conn, conn)
    conn.Close()
}

func main() {
    s := server.NewServer()

    p := server.NewStreamService("localhost:8973", streamHandler, nil, 1000)
    s.EnableStreamService(share.StreamServiceName, p)

    s.Serve("tcp", "localhost:8972")
}

文件传输

rpcx 还提供了专门的文件传输服务:

p := server.NewFileTransfer("localhost:8973", uploadPath, nil, 1000)
s.EnableFileTransfer(share.FileTransferServiceName, p)

元数据传递

rpcx 支持在调用过程中传递元数据,可用于链路追踪、业务上下文传递等。

客户端发送元数据

import "github.com/smallnest/rpcx/share"

args := &Args{A: 10, B: 20}
reply := &Reply{}

// 设置请求元数据
ctx := context.WithValue(context.Background(), share.ReqMetaDataKey, map[string]string{
    "trace-id": "12345",
    "user-id":  "67890",
})

// 准备接收响应元数据
ctx = context.WithValue(ctx, share.ResMetaDataKey, make(map[string]string))

err := xclient.Call(ctx, "Mul", args, reply)

// 读取响应元数据
resMeta := ctx.Value(share.ResMetaDataKey).(map[string]string)

服务端处理元数据

func (t *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error {
    // 读取请求元数据
    reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)

    // 写入响应元数据
    resMeta := ctx.Value(share.ResMetaDataKey).(map[string]string)
    resMeta["server"] = "rpcx-server"

    reply.C = args.A * args.B
    return nil
}

插件系统

rpcx 提供了强大的插件系统,可以在服务调用生命周期的各个阶段插入自定义逻辑。

连接插件

监控客户端连接的建立和关闭:

type ConnectionPlugin struct {}

func (p *ConnectionPlugin) HandleConnAccept(conn net.Conn) (net.Conn, bool) {
    log.Printf("client %v connected", conn.RemoteAddr().String())
    return conn, true
}

func (p *ConnectionPlugin) HandleConnClose(conn net.Conn) bool {
    log.Printf("client %v closed", conn.RemoteAddr().String())
    return true
}

// 注册插件
s.Plugins.Add(&ConnectionPlugin{})

调用包装插件

在方法调用前后执行自定义逻辑:

type WrapCall struct {}

func (p *WrapCall) PreCall(ctx context.Context, serviceName, methodName string, args interface{}) (interface{}, error) {
    log.Printf("before %s.%s: args: %v", serviceName, methodName, args)
    // 可以修改参数
    return args, nil
}

func (p *WrapCall) PostCall(ctx context.Context, serviceName, methodName string, args, reply interface{}) (interface{}, error) {
    log.Printf("after %s.%s: reply: %v", serviceName, methodName, reply)
    // 可以修改返回值
    return reply, nil
}

s.Plugins.Add(&WrapCall{})

异步调用

rpcx 支持异步调用方式,通过 Go 方法发起调用并立即返回,通过 channel 获取结果。

args := &Args{A: 10, B: 20}
reply := &Reply{}

// 发起异步调用
call, err := xclient.Go(context.Background(), "Mul", args, reply, nil)
if err != nil {
    log.Fatal(err)
}

// 等待调用完成
replyCall := <-call.Done
if replyCall.Error != nil {
    log.Fatal(replyCall.Error)
}

log.Printf("%d * %d = %d", args.A, args.B, reply.C)

异步调用适用于需要同时发起多个 RPC 调用的场景,可以显著提高并发性能。

双向通信

rpcx 支持服务端主动向客户端发送消息,实现双向通信。

服务端发送消息

var clientConn net.Conn

func (t *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error {
    // 获取客户端连接
    clientConn = ctx.Value(server.RemoteConnContextKey).(net.Conn)
    reply.C = args.A * args.B
    return nil
}

// 在 goroutine 中主动推送消息
go func() {
    for clientConn != nil {
        err := s.SendMessage(clientConn, "test_service", "test_method", nil, []byte("hello"))
        if err != nil {
            break
        }
        time.Sleep(time.Second)
    }
}()

指标监控

rpcx 支持通过插件集成指标监控,可导出到 Graphite、Prometheus 等监控系统。

Metrics 插件

import "github.com/smallnest/rpcx/serverplugin"

// 创建 metrics 插件
plugin := serverplugin.NewMetricsPlugin("rpcx-server", "localhost:2003", time.Second)
s.Plugins.Add(plugin)

链路追踪

rpcx 支持 OpenTelemetry 分布式链路追踪。

OpenTelemetry 集成

import "github.com/smallnest/rpcx/protocol"
    "github.com/rpcxio/rpcx-plugins/rotel"

// 服务端
plugin := rotel.NewOpenTelemetryPlugin("rpcx-server", tracerProvider)
s.Plugins.Add(plugin)

// 客户端
option := client.DefaultOption
option.TracerPlugin = rotel.NewClientTracer("rpcx-client", tracerProvider)

调试追踪

import "github.com/smallnest/rpcx/share"

// 启用追踪日志
share.Trace = true

更多示例

rpcx 提供了丰富的示例代码,涵盖了框架的各种特性:

目录 说明 源码
101basic基础 RPC 调用示例查看
102basicJSON 序列化示例查看
codec/protobufProtobuf 序列化查看
codec/gobGo 原生序列化查看
codec/iterator迭代器模式查看
registry/etcdv3etcd 服务注册发现查看
registry/consulConsul 服务注册发现查看
registry/cache注册中心缓存查看
registry/consul_poolConsul 连接池查看
registry/dnsDNS 服务发现查看
registry/etcdetcd (v2) 服务发现查看
registry/mdnsmDNS 服务发现查看
registry/nacosNacos 服务注册发现查看
registry/redisRedis 服务注册发现查看
registry/zookeeperZookeeper 服务注册发现查看
selector/weighted加权负载均衡查看
selector/roundrobin轮询负载均衡查看
selector/random随机负载均衡查看
selector/hash一致性哈希查看
selector/ping基于网络延迟选择查看
selector/geo地理位置选择查看
selector/customized自定义选择器查看
failmode/failfast快速失败查看
failmode/failover故障转移查看
failmode/failbackup备用调用查看
failmode/failtry失败重试查看
breaker熔断器示例查看
timeout超时控制查看
authToken 认证查看
tlsTLS 加密通信查看
stream流式传输查看
filetransfer文件传输查看
metadata元数据传递查看
plugin插件系统查看
wrapcall调用包装查看
quicQUIC 协议查看
kcpKCP 协议查看
websocketWebSocket 协议查看
bidirectional双向通信查看
async异步调用查看
opentelemetryOpenTelemetry 追踪查看
metrics指标监控查看
reqratelimit请求限流查看
reflection服务反射查看
function函数注册查看
handler处理器模式查看
broadcast广播调用查看
sticky粘性连接查看
oneway单向调用查看
group服务分组查看
state服务状态控制查看
reconnect自动重连查看
heartbeat心跳检测查看
graceful_restart优雅重启查看
dockerDocker 部署查看
k8sKubernetes 集成查看
jsonrpc2JSON-RPC 2.0查看
http_invokeHTTP 调用查看
alias服务别名查看
blacklistIP 黑名单查看
compress数据压缩查看
pool连接池查看
reuseport端口复用查看
oneshot一次性调用查看
file_download文件下载查看
unixdomainUnix Domain Socket查看
ipv6IPv6 支持查看
memconn内存连接查看
plugin_blacklist插件式黑名单查看
disallow_bad_server禁用故障服务器查看
error错误处理基础查看
error_customized自定义错误查看
internal_error内部错误处理查看
server_timeout服务端超时查看
context_lock上下文锁查看
debug_trace调试追踪查看
oneclient单客户端模式查看
reconnect2自动重连(第二种实现)查看
restart重启查看
forkFork 进程查看
k8s_apiKubernetes API查看
k8s_dnsKubernetes DNS查看
k8s_nacosKubernetes + Nacos查看
tcpcopyTCP 拷贝查看
nil_panicNil 恐慌处理查看
nil_reply_client客户端 Nil 返回值查看
nil_reply_server服务端 Nil 返回值查看

更多示例请参考:rpcx-examples

参考资料