Go - gRPC
2025/12/12大约 4 分钟
Go - gRPC
gRPC 是 Google 开发的高性能 RPC 框架,使用 Protocol Buffers 作为序列化协议。
安装
# 安装 protoc 编译器
# macOS
brew install protobuf
# 安装 Go 插件
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
# 安装 gRPC
go get google.golang.org/grpc定义服务
Proto 文件
// api/user.proto
syntax = "proto3";
package api;
option go_package = "myproject/api";
// 用户服务
service UserService {
// 获取用户
rpc GetUser(GetUserRequest) returns (User);
// 创建用户
rpc CreateUser(CreateUserRequest) returns (User);
// 用户列表
rpc ListUsers(ListUsersRequest) returns (ListUsersResponse);
// 服务端流
rpc WatchUsers(WatchUsersRequest) returns (stream User);
// 客户端流
rpc UploadUsers(stream User) returns (UploadUsersResponse);
// 双向流
rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}
message User {
int64 id = 1;
string name = 2;
string email = 3;
int32 age = 4;
}
message GetUserRequest {
int64 id = 1;
}
message CreateUserRequest {
string name = 1;
string email = 2;
int32 age = 3;
}
message ListUsersRequest {
int32 page = 1;
int32 page_size = 2;
}
message ListUsersResponse {
repeated User users = 1;
int64 total = 2;
}
message WatchUsersRequest {}
message UploadUsersResponse {
int32 count = 1;
}
message ChatMessage {
string user = 1;
string content = 2;
}生成代码
protoc --go_out=. --go-grpc_out=. api/user.proto服务端实现
package main
import (
"context"
"io"
"log"
"net"
"google.golang.org/grpc"
pb "myproject/api"
)
type userService struct {
pb.UnimplementedUserServiceServer
}
// 一元 RPC
func (s *userService) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
// 模拟查询数据库
user := &pb.User{
Id: req.Id,
Name: "张三",
Email: "zhangsan@example.com",
Age: 25,
}
return user, nil
}
func (s *userService) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.User, error) {
user := &pb.User{
Id: 1,
Name: req.Name,
Email: req.Email,
Age: req.Age,
}
return user, nil
}
func (s *userService) ListUsers(ctx context.Context, req *pb.ListUsersRequest) (*pb.ListUsersResponse, error) {
users := []*pb.User{
{Id: 1, Name: "张三", Email: "zhangsan@example.com"},
{Id: 2, Name: "李四", Email: "lisi@example.com"},
}
return &pb.ListUsersResponse{
Users: users,
Total: 2,
}, nil
}
// 服务端流
func (s *userService) WatchUsers(req *pb.WatchUsersRequest, stream pb.UserService_WatchUsersServer) error {
users := []*pb.User{
{Id: 1, Name: "张三"},
{Id: 2, Name: "李四"},
{Id: 3, Name: "王五"},
}
for _, user := range users {
if err := stream.Send(user); err != nil {
return err
}
}
return nil
}
// 客户端流
func (s *userService) UploadUsers(stream pb.UserService_UploadUsersServer) error {
count := 0
for {
user, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&pb.UploadUsersResponse{
Count: int32(count),
})
}
if err != nil {
return err
}
log.Printf("Received user: %s\n", user.Name)
count++
}
}
// 双向流
func (s *userService) Chat(stream pb.UserService_ChatServer) error {
for {
msg, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
log.Printf("Received: %s: %s\n", msg.User, msg.Content)
// 回复消息
reply := &pb.ChatMessage{
User: "Server",
Content: "收到: " + msg.Content,
}
if err := stream.Send(reply); err != nil {
return err
}
}
}
func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatal(err)
}
server := grpc.NewServer()
pb.RegisterUserServiceServer(server, &userService{})
log.Println("gRPC server listening on :50051")
if err := server.Serve(lis); err != nil {
log.Fatal(err)
}
}客户端实现
package main
import (
"context"
"io"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "myproject/api"
)
func main() {
// 连接服务端
conn, err := grpc.Dial("localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := pb.NewUserServiceClient(conn)
ctx := context.Background()
// 一元 RPC
user, err := client.GetUser(ctx, &pb.GetUserRequest{Id: 1})
if err != nil {
log.Fatal(err)
}
log.Printf("GetUser: %v\n", user)
// 服务端流
stream, err := client.WatchUsers(ctx, &pb.WatchUsersRequest{})
if err != nil {
log.Fatal(err)
}
for {
user, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatal(err)
}
log.Printf("WatchUsers: %v\n", user)
}
// 客户端流
uploadStream, err := client.UploadUsers(ctx)
if err != nil {
log.Fatal(err)
}
users := []*pb.User{
{Name: "张三"},
{Name: "李四"},
{Name: "王五"},
}
for _, user := range users {
if err := uploadStream.Send(user); err != nil {
log.Fatal(err)
}
}
resp, err := uploadStream.CloseAndRecv()
if err != nil {
log.Fatal(err)
}
log.Printf("UploadUsers: %d\n", resp.Count)
// 双向流
chatStream, err := client.Chat(ctx)
if err != nil {
log.Fatal(err)
}
go func() {
for {
msg, err := chatStream.Recv()
if err == io.EOF {
return
}
if err != nil {
log.Fatal(err)
}
log.Printf("Chat received: %s: %s\n", msg.User, msg.Content)
}
}()
messages := []string{"Hello", "World", "Bye"}
for _, content := range messages {
if err := chatStream.Send(&pb.ChatMessage{
User: "Client",
Content: content,
}); err != nil {
log.Fatal(err)
}
time.Sleep(time.Second)
}
chatStream.CloseSend()
}拦截器
服务端拦截器
// 一元拦截器
func unaryInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
start := time.Now()
// 调用实际处理函数
resp, err := handler(ctx, req)
// 记录日志
log.Printf("Method: %s, Duration: %v, Error: %v\n",
info.FullMethod, time.Since(start), err)
return resp, err
}
// 流拦截器
func streamInterceptor(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
start := time.Now()
err := handler(srv, ss)
log.Printf("Stream: %s, Duration: %v, Error: %v\n",
info.FullMethod, time.Since(start), err)
return err
}
// 使用
server := grpc.NewServer(
grpc.UnaryInterceptor(unaryInterceptor),
grpc.StreamInterceptor(streamInterceptor),
)客户端拦截器
// 一元拦截器
func clientUnaryInterceptor(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
start := time.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
log.Printf("Method: %s, Duration: %v\n", method, time.Since(start))
return err
}
// 使用
conn, err := grpc.Dial("localhost:50051",
grpc.WithUnaryInterceptor(clientUnaryInterceptor),
)元数据
import "google.golang.org/grpc/metadata"
// 客户端发送元数据
md := metadata.Pairs(
"authorization", "Bearer token",
"request-id", "12345",
)
ctx := metadata.NewOutgoingContext(context.Background(), md)
resp, err := client.GetUser(ctx, req)
// 服务端获取元数据
func (s *userService) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
md, ok := metadata.FromIncomingContext(ctx)
if ok {
if tokens := md.Get("authorization"); len(tokens) > 0 {
log.Printf("Token: %s\n", tokens[0])
}
}
// ...
}
// 服务端发送元数据
func (s *userService) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
header := metadata.Pairs("response-id", "12345")
grpc.SendHeader(ctx, header)
// ...
}错误处理
import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// 服务端返回错误
func (s *userService) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
if req.Id <= 0 {
return nil, status.Error(codes.InvalidArgument, "无效的用户 ID")
}
user := findUser(req.Id)
if user == nil {
return nil, status.Error(codes.NotFound, "用户不存在")
}
return user, nil
}
// 客户端处理错误
resp, err := client.GetUser(ctx, req)
if err != nil {
st, ok := status.FromError(err)
if ok {
switch st.Code() {
case codes.NotFound:
log.Println("用户不存在")
case codes.InvalidArgument:
log.Println("参数错误:", st.Message())
default:
log.Println("未知错误:", st.Message())
}
}
}健康检查
import (
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
func main() {
server := grpc.NewServer()
// 注册健康检查服务
healthServer := health.NewServer()
healthpb.RegisterHealthServer(server, healthServer)
// 设置服务状态
healthServer.SetServingStatus("myservice", healthpb.HealthCheckResponse_SERVING)
// ...
}