Go · #go#grpc#protobuf#microservice

Go gRPC微服务开发完全指南

2023.09.20 Go 9 min 3.8k
// 目录 · contents

引言

gRPC是Google开源的高性能、跨语言的RPC框架,基于HTTP/2协议和Protocol Buffers序列化格式。在微服务架构中,gRPC凭借其高效的二进制协议、强类型的接口定义、以及丰富的流式传输能力,已成为服务间通信的主流选择之一。

本文将系统介绍Go语言中gRPC微服务的开发方法,从Protocol Buffers定义开始,覆盖四种RPC模式、拦截器、负载均衡、服务发现、错误处理和健康检查等核心主题。

架构概览

graph TB
    subgraph "gRPC 架构"
        CLIENT["gRPC Client"]
        SERVER["gRPC Server"]

        CLIENT -->|"HTTP/2 + Protobuf"| SERVER

        subgraph "Client Side"
            STUB["Generated Stub"] --> CC["ClientConn"]
            CC --> RESOLVER["Name Resolver"]
            CC --> BALANCER["Load Balancer"]
            CC --> INTERCEPTOR_C["Client Interceptors"]
        end

        subgraph "Server Side"
            SVC["Service Implementation"]
            INTERCEPTOR_S["Server Interceptors"]
            INTERCEPTOR_S --> SVC
        end
    end

    CLIENT --> STUB
    SERVER --> INTERCEPTOR_S

Protocol Buffers

定义服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
// proto/user/v1/user.proto
syntax = "proto3";

package user.v1;

option go_package = "github.com/example/myapp/gen/user/v1;userv1";

import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/field_mask.proto";

// 用户服务定义
service UserService {
// Unary RPC - 一元调用
rpc GetUser(GetUserRequest) returns (GetUserResponse);
rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
rpc UpdateUser(UpdateUserRequest) returns (UpdateUserResponse);
rpc DeleteUser(DeleteUserRequest) returns (google.protobuf.Empty);

// Server Streaming RPC - 服务端流
rpc ListUsers(ListUsersRequest) returns (stream User);

// Client Streaming RPC - 客户端流
rpc BatchCreateUsers(stream CreateUserRequest) returns (BatchCreateUsersResponse);

// Bidirectional Streaming RPC - 双向流
rpc SyncUsers(stream SyncRequest) returns (stream SyncResponse);
}

// 消息定义
message User {
string id = 1;
string name = 2;
string email = 3;
int32 age = 4;
UserStatus status = 5;
google.protobuf.Timestamp created_at = 6;
google.protobuf.Timestamp updated_at = 7;
}

enum UserStatus {
USER_STATUS_UNSPECIFIED = 0;
USER_STATUS_ACTIVE = 1;
USER_STATUS_INACTIVE = 2;
USER_STATUS_BANNED = 3;
}

message GetUserRequest {
string id = 1;
}

message GetUserResponse {
User user = 1;
}

message CreateUserRequest {
string name = 1;
string email = 2;
int32 age = 3;
}

message CreateUserResponse {
User user = 1;
}

message UpdateUserRequest {
User user = 1;
google.protobuf.FieldMask update_mask = 2;
}

message UpdateUserResponse {
User user = 1;
}

message DeleteUserRequest {
string id = 1;
}

message ListUsersRequest {
int32 page_size = 1;
string page_token = 2;
string filter = 3;
}

message BatchCreateUsersResponse {
int32 created_count = 1;
repeated string user_ids = 2;
}

message SyncRequest {
oneof action {
User upsert = 1;
string delete_id = 2;
}
}

message SyncResponse {
string id = 1;
string status = 2;
}

代码生成

1
2
3
4
5
6
7
8
# 安装工具
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

# 生成Go代码
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
proto/user/v1/user.proto

四种RPC模式

1. Unary RPC(一元调用)

sequenceDiagram
    participant C as Client
    participant S as Server
    C->>S: GetUserRequest
    S-->>C: GetUserResponse
    Note over C,S: 最简单的请求-响应模式

服务端实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package main

import (
"context"
"fmt"
"log"
"net"
"sync"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"

userv1 "github.com/example/myapp/gen/user/v1"
)

type userServer struct {
userv1.UnimplementedUserServiceServer
mu sync.RWMutex
users map[string]*userv1.User
}

func newUserServer() *userServer {
return &userServer{
users: make(map[string]*userv1.User),
}
}

// Unary RPC: GetUser
func (s *userServer) GetUser(ctx context.Context, req *userv1.GetUserRequest) (*userv1.GetUserResponse, error) {
if req.Id == "" {
return nil, status.Error(codes.InvalidArgument, "user id is required")
}

s.mu.RLock()
defer s.mu.RUnlock()

user, ok := s.users[req.Id]
if !ok {
return nil, status.Errorf(codes.NotFound, "user %s not found", req.Id)
}

return &userv1.GetUserResponse{User: user}, nil
}

// Unary RPC: CreateUser
func (s *userServer) CreateUser(ctx context.Context, req *userv1.CreateUserRequest) (*userv1.CreateUserResponse, error) {
if req.Name == "" || req.Email == "" {
return nil, status.Error(codes.InvalidArgument, "name and email are required")
}

user := &userv1.User{
Id: generateID(),
Name: req.Name,
Email: req.Email,
Age: req.Age,
Status: userv1.UserStatus_USER_STATUS_ACTIVE,
CreatedAt: timestamppb.Now(),
UpdatedAt: timestamppb.Now(),
}

s.mu.Lock()
s.users[user.Id] = user
s.mu.Unlock()

return &userv1.CreateUserResponse{User: user}, nil
}

// Unary RPC: DeleteUser
func (s *userServer) DeleteUser(ctx context.Context, req *userv1.DeleteUserRequest) (*emptypb.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()

if _, ok := s.users[req.Id]; !ok {
return nil, status.Errorf(codes.NotFound, "user %s not found", req.Id)
}

delete(s.users, req.Id)
return &emptypb.Empty{}, nil
}

func generateID() string {
return fmt.Sprintf("user-%d", time.Now().UnixNano())
}

2. Server Streaming RPC(服务端流)

sequenceDiagram
    participant C as Client
    participant S as Server
    C->>S: ListUsersRequest
    S-->>C: User 1
    S-->>C: User 2
    S-->>C: User 3
    S-->>C: (stream end)
    Note over C,S: 服务端持续发送数据流
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Server Streaming: ListUsers
func (s *userServer) ListUsers(req *userv1.ListUsersRequest, stream userv1.UserService_ListUsersServer) error {
s.mu.RLock()
defer s.mu.RUnlock()

count := 0
for _, user := range s.users {
// 检查context是否被取消
if err := stream.Context().Err(); err != nil {
return status.Error(codes.Canceled, "client cancelled")
}

if err := stream.Send(user); err != nil {
return status.Errorf(codes.Internal, "failed to send user: %v", err)
}
count++

// 如果设置了分页大小
if req.PageSize > 0 && int32(count) >= req.PageSize {
break
}
}

return nil
}

3. Client Streaming RPC(客户端流)

sequenceDiagram
    participant C as Client
    participant S as Server
    C->>S: CreateUserRequest 1
    C->>S: CreateUserRequest 2
    C->>S: CreateUserRequest 3
    C->>S: (stream end)
    S-->>C: BatchCreateUsersResponse
    Note over C,S: 客户端持续发送,服务端最终响应
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// Client Streaming: BatchCreateUsers
func (s *userServer) BatchCreateUsers(stream userv1.UserService_BatchCreateUsersServer) error {
var userIDs []string
count := int32(0)

for {
req, err := stream.Recv()
if err != nil {
// 客户端发送完毕
if err.Error() == "EOF" {
break
}
return status.Errorf(codes.Internal, "failed to receive: %v", err)
}

user := &userv1.User{
Id: generateID(),
Name: req.Name,
Email: req.Email,
Age: req.Age,
Status: userv1.UserStatus_USER_STATUS_ACTIVE,
CreatedAt: timestamppb.Now(),
UpdatedAt: timestamppb.Now(),
}

s.mu.Lock()
s.users[user.Id] = user
s.mu.Unlock()

userIDs = append(userIDs, user.Id)
count++
}

return stream.SendAndClose(&userv1.BatchCreateUsersResponse{
CreatedCount: count,
UserIds: userIDs,
})
}

4. Bidirectional Streaming RPC(双向流)

sequenceDiagram
    participant C as Client
    participant S as Server
    C->>S: SyncRequest (upsert user1)
    S-->>C: SyncResponse (ok)
    C->>S: SyncRequest (delete user2)
    C->>S: SyncRequest (upsert user3)
    S-->>C: SyncResponse (ok)
    S-->>C: SyncResponse (ok)
    Note over C,S: 双方独立发送和接收
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// Bidirectional Streaming: SyncUsers
func (s *userServer) SyncUsers(stream userv1.UserService_SyncUsersServer) error {
for {
req, err := stream.Recv()
if err != nil {
if err.Error() == "EOF" {
return nil
}
return status.Errorf(codes.Internal, "failed to receive: %v", err)
}

var resp *userv1.SyncResponse

switch action := req.Action.(type) {
case *userv1.SyncRequest_Upsert:
s.mu.Lock()
action.Upsert.UpdatedAt = timestamppb.Now()
s.users[action.Upsert.Id] = action.Upsert
s.mu.Unlock()
resp = &userv1.SyncResponse{
Id: action.Upsert.Id,
Status: "upserted",
}

case *userv1.SyncRequest_DeleteId:
s.mu.Lock()
delete(s.users, action.DeleteId)
s.mu.Unlock()
resp = &userv1.SyncResponse{
Id: action.DeleteId,
Status: "deleted",
}
}

if err := stream.Send(resp); err != nil {
return status.Errorf(codes.Internal, "failed to send response: %v", err)
}
}
}

拦截器(Interceptor)

拦截器是gRPC版的中间件,分为Unary和Stream两种类型。

flowchart LR
    subgraph "Client Interceptors"
        CI1["日志"] --> CI2["认证"] --> CI3["重试"]
    end

    CI3 -->|gRPC Call| SI1

    subgraph "Server Interceptors"
        SI1["恢复"] --> SI2["日志"] --> SI3["认证"]
    end

    SI3 --> HANDLER["Handler"]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package main

import (
"context"
"log"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

// 服务端日志拦截器
func loggingUnaryInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
start := time.Now()

// 调用实际的handler
resp, err := handler(ctx, req)

// 记录日志
duration := time.Since(start)
statusCode := codes.OK
if err != nil {
statusCode = status.Code(err)
}
log.Printf("method=%s duration=%v status=%s error=%v",
info.FullMethod, duration, statusCode, err)

return resp, err
}

// 服务端认证拦截器
func authUnaryInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
// 跳过不需要认证的方法
if info.FullMethod == "/grpc.health.v1.Health/Check" {
return handler(ctx, req)
}

// 从metadata中获取token
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "missing metadata")
}

tokens := md.Get("authorization")
if len(tokens) == 0 {
return nil, status.Error(codes.Unauthenticated, "missing authorization token")
}

// 验证token
userID, err := validateToken(tokens[0])
if err != nil {
return nil, status.Error(codes.Unauthenticated, "invalid token")
}

// 将用户信息添加到context
newCtx := context.WithValue(ctx, "userID", userID)
return handler(newCtx, req)
}

// 服务端恢复拦截器
func recoveryUnaryInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (resp interface{}, err error) {
defer func() {
if r := recover(); r != nil {
log.Printf("panic recovered in %s: %v", info.FullMethod, r)
err = status.Errorf(codes.Internal, "internal server error")
}
}()
return handler(ctx, req)
}

// 客户端超时拦截器
func timeoutUnaryInterceptor(timeout time.Duration) grpc.UnaryClientInterceptor {
return func(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
return invoker(ctx, method, req, reply, cc, opts...)
}
}

// 客户端重试拦截器
func retryUnaryInterceptor(maxRetries int) grpc.UnaryClientInterceptor {
return func(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
var lastErr error
for i := 0; i <= maxRetries; i++ {
lastErr = invoker(ctx, method, req, reply, cc, opts...)
if lastErr == nil {
return nil
}

// 只重试特定错误码
code := status.Code(lastErr)
if code != codes.Unavailable && code != codes.DeadlineExceeded {
return lastErr
}

// 指数退避
backoff := time.Duration(1<<uint(i)) * 100 * time.Millisecond
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(backoff):
}
}
return lastErr
}
}

func validateToken(token string) (string, error) {
if token == "valid-token" {
return "user-123", nil
}
return "", status.Error(codes.Unauthenticated, "invalid token")
}

服务端与客户端启动

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package main

import (
"log"
"net"

"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"

userv1 "github.com/example/myapp/gen/user/v1"
)

func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}

// 创建gRPC服务器,链式拦截器
server := grpc.NewServer(
grpc.ChainUnaryInterceptor(
recoveryUnaryInterceptor,
loggingUnaryInterceptor,
authUnaryInterceptor,
),
)

// 注册用户服务
userSvc := newUserServer()
userv1.RegisterUserServiceServer(server, userSvc)

// 注册健康检查服务
healthServer := health.NewServer()
healthpb.RegisterHealthServer(server, healthServer)
healthServer.SetServingStatus("user.v1.UserService", healthpb.HealthCheckResponse_SERVING)

// 注册反射服务(开发环境用于grpcurl调试)
reflection.Register(server)

log.Printf("gRPC server listening on :50051")
if err := server.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package main

import (
"context"
"fmt"
"io"
"log"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"

userv1 "github.com/example/myapp/gen/user/v1"
)

func main() {
// 建立连接
conn, err := grpc.Dial("localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainUnaryInterceptor(
timeoutUnaryInterceptor(5*time.Second),
retryUnaryInterceptor(3),
),
)
if err != nil {
log.Fatalf("failed to dial: %v", err)
}
defer conn.Close()

client := userv1.NewUserServiceClient(conn)

// 添加认证metadata
ctx := metadata.AppendToOutgoingContext(context.Background(),
"authorization", "valid-token",
)

// Unary调用
createResp, err := client.CreateUser(ctx, &userv1.CreateUserRequest{
Name: "Alice",
Email: "[email protected]",
Age: 30,
})
if err != nil {
log.Fatalf("CreateUser failed: %v", err)
}
fmt.Printf("Created user: %s\n", createResp.User.Id)

// Server Streaming调用
stream, err := client.ListUsers(ctx, &userv1.ListUsersRequest{
PageSize: 10,
})
if err != nil {
log.Fatalf("ListUsers failed: %v", err)
}
for {
user, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("stream recv failed: %v", err)
}
fmt.Printf("User: %s (%s)\n", user.Name, user.Email)
}
}

错误处理

gRPC使用标准的status/code体系进行错误处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package main

import (
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// 返回带详细信息的错误
func createDetailedError() error {
st := status.New(codes.InvalidArgument, "invalid user data")

// 添加字段级别的错误详情
details, err := st.WithDetails(
&errdetails.BadRequest{
FieldViolations: []*errdetails.BadRequest_FieldViolation{
{
Field: "email",
Description: "invalid email format",
},
{
Field: "age",
Description: "age must be between 0 and 150",
},
},
},
)
if err != nil {
return st.Err()
}
return details.Err()
}

// 客户端解析错误详情
func handleError(err error) {
st := status.Convert(err)
for _, detail := range st.Details() {
switch d := detail.(type) {
case *errdetails.BadRequest:
for _, violation := range d.FieldViolations {
log.Printf("Field %s: %s", violation.Field, violation.Description)
}
}
}
}
graph LR
    subgraph "gRPC 错误码映射"
        OK["OK (0)"] --> H200["HTTP 200"]
        INVALID["InvalidArgument (3)"] --> H400["HTTP 400"]
        NOT_FOUND["NotFound (5)"] --> H404["HTTP 404"]
        ALREADY_EXISTS["AlreadyExists (6)"] --> H409["HTTP 409"]
        PERMISSION["PermissionDenied (7)"] --> H403["HTTP 403"]
        UNAUTH["Unauthenticated (16)"] --> H401["HTTP 401"]
        INTERNAL["Internal (13)"] --> H500["HTTP 500"]
        UNAVAILABLE["Unavailable (14)"] --> H503["HTTP 503"]
    end

性能优化

连接管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 使用连接池(gRPC内置HTTP/2多路复用)
conn, err := grpc.Dial("localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
// 设置连接参数
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second, // 发送ping的间隔
Timeout: 3 * time.Second, // 等待ping响应的超时
PermitWithoutStream: true, // 无活跃stream时也发送ping
}),
// 设置最大消息大小
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(10*1024*1024), // 10MB
grpc.MaxCallSendMsgSize(10*1024*1024),
),
)

消息大小优化

1
2
3
4
5
6
7
// 使用更紧凑的字段类型
message CompactUser {
fixed32 id = 1; // 固定4字节,比int32高效(对大数值)
string name = 2;
bytes avatar = 3; // 二进制数据用bytes
sint32 score_delta = 4; // 有符号整数用sint32(ZigZag编码)
}

总结

Go gRPC微服务开发的核心要点:

  1. Protocol Buffers提供强类型的接口定义,是服务间的契约
  2. 四种RPC模式覆盖了不同的通信场景:一元调用、服务端流、客户端流、双向流
  3. 拦截器提供了横切关注点的统一处理(日志、认证、重试、限流)
  4. 错误处理使用标准的status/code体系,支持携带详细的错误信息
  5. 健康检查是服务治理的基础,配合负载均衡和服务发现使用
  6. 性能优化重点关注连接管理、消息大小和序列化效率

gRPC + Go的组合非常适合构建高性能的微服务系统。建议在生产环境中配合OpenTelemetry进行链路追踪,使用envoy或istio作为服务网格层,实现更完善的服务治理。

作者 · authorzt
发布 · date2023-09-20
篇幅 · length3.8k 字 · 9 min
许可 · licenseCC BY-SA 4.0
$ echo "comments" · 评论