09 - gRPC 流式通信
第 09 章:gRPC 流式通信
当单次请求/响应不够用时,流式 RPC 就是答案
9.1 流式模式概述
gRPC 的流式模式(Streaming)允许客户端和服务器在一次 RPC 调用中发送或接收多个消息,充分利用 HTTP/2 的流式传输能力。
9.1.1 四种模式对比
模式 1:一元 RPC (Unary)
客户端: ──Request──→
服务器: ←──Response──
模式 2:服务端流 (Server Streaming)
客户端: ──Request──→
服务器: ←──Response 1──
←──Response 2──
←──Response 3──
←──(END)──
模式 3:客户端流 (Client Streaming)
客户端: ──Request 1──→
──Request 2──→
──Request 3──→
──(END)─────→
服务器: ←──Response──
模式 4:双向流 (Bidirectional Streaming)
客户端: ──Request 1──→
──Request 2──→ ←──Response 1──
──→ ←──Response 2──
──Request 3──→
──(END)─────→ ←──Response 3──
←──(END)──
9.1.2 选择指南
| 场景 | 推荐模式 | 理由 |
|---|---|---|
| 简单查询/更新 | 一元 RPC | 最简单直接 |
| 大数据集返回 | 服务端流 | 流式返回避免内存爆炸 |
| 批量上传 | 客户端流 | 流式发送避免单次请求过大 |
| 实时双向通信 | 双向流 | 低延迟双向数据交换 |
| 聊天/协作文档 | 双向流 | 双方都需要持续发送 |
| 日志/事件推送 | 服务端流 | 服务器持续推送 |
| 文件上传 | 客户端流 | 分块上传 |
9.2 服务端流(Server Streaming)
9.2.1 Proto 定义
// streaming.proto
syntax = "proto3";
package example;
service NotificationService {
// 服务端流:服务器持续推送通知
rpc Subscribe(SubscribeRequest) returns (stream Notification);
// 服务端流:流式返回列表数据
rpc ListUsers(ListUsersRequest) returns (stream UserRecord);
}
message SubscribeRequest {
string topic = 1;
int64 since_timestamp = 2;
}
message Notification {
string id = 1;
string title = 2;
string body = 3;
int64 timestamp = 4;
}
message ListUsersRequest {
string filter = 1;
int32 page_size = 2;
}
message UserRecord {
int64 id = 1;
string name = 2;
string email = 3;
}
9.2.2 服务端实现(Go)
package main
import (
"context"
"fmt"
"log"
"math/rand"
"net"
"time"
pb "example/pb"
"google.golang.org/grpc"
)
type notificationService struct {
pb.UnimplementedNotificationServiceServer
}
// Subscribe 实现服务端流式 RPC
func (s *notificationService) Subscribe(
req *pb.SubscribeRequest,
stream pb.NotificationService_SubscribeServer,
) error {
log.Printf("客户端订阅主题: %s", req.Topic)
// 持续推送通知
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
count := 0
for {
select {
case <-stream.Context().Done():
log.Println("客户端断开连接")
return stream.Context().Err()
case <-ticker.C:
count++
notification := &pb.Notification{
Id: fmt.Sprintf("notif-%d", count),
Title: fmt.Sprintf("通知 #%d", count),
Body: fmt.Sprintf("这是第 %d 条通知", count),
Timestamp: time.Now().Unix(),
}
// 发送通知到客户端
if err := stream.Send(notification); err != nil {
log.Printf("发送失败: %v", err)
return err
}
log.Printf("已发送通知 #%d", count)
// 限制发送数量(演示用)
if count >= 10 {
return nil
}
}
}
}
// ListUsers 流式返回用户列表
func (s *notificationService) ListUsers(
req *pb.ListUsersRequest,
stream pb.NotificationService_ListUsersServer,
) error {
users := generateUsers(100) // 生成 100 个用户
for _, user := range users {
// 模拟逐条发送
if err := stream.Send(user); err != nil {
return err
}
time.Sleep(50 * time.Millisecond) // 模拟处理延迟
}
return nil
}
func generateUsers(count int) []*pb.UserRecord {
users := make([]*pb.UserRecord, count)
for i := 0; i < count; i++ {
users[i] = &pb.UserRecord{
Id: int64(i + 1),
Name: fmt.Sprintf("User_%d", i+1),
Email: fmt.Sprintf("user%[email protected]", i+1),
}
}
return users
}
func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("监听失败: %v", err)
}
server := grpc.NewServer()
pb.RegisterNotificationServiceServer(server, ¬ificationService{})
log.Println("gRPC 服务器启动于 :50051")
if err := server.Serve(lis); err != nil {
log.Fatalf("服务失败: %v", err)
}
}
9.2.3 客户端实现(Go)
package main
import (
"context"
"io"
"log"
"time"
pb "example/pb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
func main() {
conn, err := grpc.Dial("localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
log.Fatalf("连接失败: %v", err)
}
defer conn.Close()
client := pb.NewNotificationServiceClient(conn)
// 示例 1:订阅通知流
subscribeNotifications(client)
// 示例 2:流式获取用户列表
listUsers(client)
}
func subscribeNotifications(client pb.NotificationServiceClient) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
stream, err := client.Subscribe(ctx, &pb.SubscribeRequest{
Topic: "orders",
})
if err != nil {
log.Fatalf("订阅失败: %v", err)
}
for {
notification, err := stream.Recv()
if err == io.EOF {
log.Println("通知流结束")
return
}
if err != nil {
log.Fatalf("接收失败: %v", err)
}
log.Printf("[通知] %s: %s", notification.Title, notification.Body)
}
}
func listUsers(client pb.NotificationServiceClient) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.ListUsers(ctx, &pb.ListUsersRequest{
Filter: "active",
PageSize: 100,
})
if err != nil {
log.Fatalf("请求失败: %v", err)
}
var users []*pb.UserRecord
for {
user, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("接收失败: %v", err)
}
users = append(users, user)
}
log.Printf("共接收 %d 个用户", len(users))
}
9.3 客户端流(Client Streaming)
9.3.1 Proto 定义
// upload.proto
syntax = "proto3";
package example;
service FileService {
// 客户端流:文件上传
rpc Upload(stream UploadRequest) returns (UploadResponse);
// 客户端流:批量导入
rpc ImportUsers(stream ImportUserRequest) returns (ImportResponse);
}
message UploadRequest {
string filename = 1;
bytes chunk = 2;
int32 chunk_number = 3;
}
message UploadResponse {
string file_id = 1;
int64 total_bytes = 2;
string checksum = 3;
}
message ImportUserRequest {
string name = 1;
string email = 2;
string department = 3;
}
message ImportResponse {
int32 imported_count = 1;
int32 failed_count = 2;
repeated string errors = 3;
}
9.3.2 服务端实现(Go)
package main
import (
"crypto/md5"
"fmt"
"io"
"log"
pb "example/pb"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type fileService struct {
pb.UnimplementedFileServiceServer
}
// Upload 实现客户端流式文件上传
func (s *fileService) Upload(stream pb.FileService_UploadServer) error {
var totalBytes int64
var filename string
var data []byte
hasher := md5.New()
for {
req, err := stream.Recv()
if err == io.EOF {
// 客户端发送完毕,返回响应
checksum := fmt.Sprintf("%x", hasher.Sum(nil))
log.Printf("文件上传完成: %s, %d bytes, MD5: %s",
filename, totalBytes, checksum)
return stream.SendAndClose(&pb.UploadResponse{
FileId: fmt.Sprintf("file-%d", time.Now().Unix()),
TotalBytes: totalBytes,
Checksum: checksum,
})
}
if err != nil {
return status.Errorf(codes.Internal, "接收失败: %v", err)
}
// 首个 chunk 获取文件名
if filename == "" {
filename = req.Filename
log.Printf("开始接收文件: %s", filename)
}
// 累积数据
totalBytes += int64(len(req.Chunk))
data = append(data, req.Chunk...)
hasher.Write(req.Chunk)
log.Printf("接收 chunk #%d, 累计 %d bytes", req.ChunkNumber, totalBytes)
}
}
// ImportUsers 批量导入用户
func (s *fileService) ImportUsers(stream pb.FileService_ImportUsersServer) error {
var imported, failed int32
var errors []string
for {
req, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&pb.ImportResponse{
ImportedCount: imported,
FailedCount: failed,
Errors: errors,
})
}
if err != nil {
return err
}
// 处理每个用户
if err := importUser(req); err != nil {
failed++
errors = append(errors, err.Error())
} else {
imported++
}
}
}
func importUser(req *pb.ImportUserRequest) error {
if req.Name == "" {
return fmt.Errorf("用户名称为空")
}
// 模拟导入逻辑
return nil
}
func main() {
lis, _ := net.Listen("tcp", ":50051")
server := grpc.NewServer()
pb.RegisterFileServiceServer(server, &fileService{})
log.Println("服务器启动于 :50051")
server.Serve(lis)
}
9.3.3 客户端实现(Go)
package main
import (
"context"
"fmt"
"io"
"log"
"os"
pb "example/pb"
"google.golang.org/grpc"
)
func uploadFile(client pb.FileServiceClient, filePath string) error {
stream, err := client.Upload(context.Background())
if err != nil {
return fmt.Errorf("创建流失败: %w", err)
}
file, err := os.Open(filePath)
if err != nil {
return fmt.Errorf("打开文件失败: %w", err)
}
defer file.Close()
// 分块发送
buf := make([]byte, 32*1024) // 32KB chunks
chunkNum := 0
for {
n, err := file.Read(buf)
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("读取文件失败: %w", err)
}
chunkNum++
sendErr := stream.Send(&pb.UploadRequest{
Filename: filePath,
Chunk: buf[:n],
ChunkNumber: int32(chunkNum),
})
if sendErr != nil {
return fmt.Errorf("发送 chunk #%d 失败: %w", chunkNum, sendErr)
}
log.Printf("发送 chunk #%d, %d bytes", chunkNum, n)
}
// 关闭发送并接收响应
resp, err := stream.CloseAndRecv()
if err != nil {
return fmt.Errorf("接收响应失败: %w", err)
}
log.Printf("上传完成: file_id=%s, total=%d bytes, md5=%s",
resp.FileId, resp.TotalBytes, resp.Checksum)
return nil
}
func main() {
conn, _ := grpc.Dial("localhost:50051", grpc.WithInsecure())
defer conn.Close()
client := pb.NewFileServiceClient(conn)
if err := uploadFile(client, "/path/to/large-file.bin"); err != nil {
log.Fatalf("上传失败: %v", err)
}
}
9.4 双向流(Bidirectional Streaming)
9.4.1 Proto 定义
// chat.proto
syntax = "proto3";
package example;
service ChatService {
// 双向流:实时聊天
rpc Chat(stream ChatMessage) returns (stream ChatMessage);
// 双向流:协同编辑
rpc Collaborate(stream EditOperation) returns (stream EditOperation);
}
message ChatMessage {
string user_id = 1;
string room_id = 2;
string content = 3;
int64 timestamp = 4;
MessageType type = 5;
enum MessageType {
TEXT = 0;
IMAGE = 1;
SYSTEM = 2;
TYPING = 3;
}
}
message EditOperation {
string document_id = 1;
string user_id = 2;
int32 position = 3;
string operation = 4; // "insert", "delete", "replace"
string content = 5;
int64 version = 6;
}
9.4.2 服务端实现(Go)
package main
import (
"io"
"log"
"sync"
"time"
pb "example/pb"
"google.golang.org/grpc"
)
type chatService struct {
pb.UnimplementedChatServiceServer
mu sync.RWMutex
rooms map[string][]chan *pb.ChatMessage
}
func newChatService() *chatService {
return &chatService{
rooms: make(map[string][]chan *pb.ChatMessage),
}
}
func (s *chatService) Chat(stream pb.ChatService_ChatServer) error {
// 用于广播消息的通道
msgChan := make(chan *pb.ChatMessage, 100)
var roomID string
// 启动接收 goroutine
go func() {
for {
msg, err := stream.Recv()
if err == io.EOF {
log.Println("客户端关闭发送")
close(msgChan)
return
}
if err != nil {
log.Printf("接收错误: %v", err)
close(msgChan)
return
}
roomID = msg.RoomId
msg.Timestamp = time.Now().UnixMilli()
// 注册到房间
s.joinRoom(roomID, msgChan)
// 广播消息给房间内其他成员
s.broadcast(roomID, msg, msgChan)
}
}()
// 发送消息给当前客户端
for msg := range msgChan {
if err := stream.Send(msg); err != nil {
log.Printf("发送错误: %v", err)
s.leaveRoom(roomID, msgChan)
return err
}
}
return nil
}
func (s *chatService) joinRoom(roomID string, ch chan *pb.ChatMessage) {
s.mu.Lock()
defer s.mu.Unlock()
s.rooms[roomID] = append(s.rooms[roomID], ch)
}
func (s *chatService) leaveRoom(roomID string, ch chan *pb.ChatMessage) {
s.mu.Lock()
defer s.mu.Unlock()
members := s.rooms[roomID]
for i, member := range members {
if member == ch {
s.rooms[roomID] = append(members[:i], members[i+1:]...)
return
}
}
}
func (s *chatService) broadcast(roomID string, msg *pb.ChatMessage, sender chan *pb.ChatMessage) {
s.mu.RLock()
defer s.mu.RUnlock()
for _, ch := range s.rooms[roomID] {
if ch != sender { // 不发给自己
select {
case ch <- msg:
default:
log.Println("消息队列满,丢弃消息")
}
}
}
}
func main() {
lis, _ := net.Listen("tcp", ":50051")
server := grpc.NewServer()
pb.RegisterChatServiceServer(server, newChatService())
log.Println("聊天服务器启动于 :50051")
server.Serve(lis)
}
9.4.3 客户端实现(Go)
package main
import (
"bufio"
"context"
"fmt"
"io"
"log"
"os"
"time"
pb "example/pb"
"google.golang.org/grpc"
)
func main() {
conn, _ := grpc.Dial("localhost:50051", grpc.WithInsecure())
defer conn.Close()
client := pb.NewChatServiceClient(conn)
stream, err := client.Chat(context.Background())
if err != nil {
log.Fatalf("创建聊天流失败: %v", err)
}
userID := fmt.Sprintf("user-%d", time.Now().UnixMilli())
roomID := "general"
// 启动接收 goroutine
go func() {
for {
msg, err := stream.Recv()
if err == io.EOF {
log.Println("聊天结束")
return
}
if err != nil {
log.Printf("接收错误: %v", err)
return
}
fmt.Printf("[%s] %s: %s\n",
time.UnixMilli(msg.Timestamp).Format("15:04:05"),
msg.UserId, msg.Content)
}
}()
// 从标准输入读取消息并发送
scanner := bufio.NewScanner(os.Stdin)
fmt.Println("输入消息(输入 'quit' 退出):")
for scanner.Scan() {
text := scanner.Text()
if text == "quit" {
stream.CloseSend()
break
}
err := stream.Send(&pb.ChatMessage{
UserId: userID,
RoomId: roomID,
Content: text,
Type: pb.ChatMessage_TEXT,
})
if err != nil {
log.Printf("发送失败: %v", err)
}
}
}
9.5 错误处理
9.5.1 gRPC 状态码
| 状态码 | 名称 | 说明 |
|---|---|---|
| 0 | OK | 成功 |
| 1 | CANCELLED | 操作被取消 |
| 2 | UNKNOWN | 未知错误 |
| 3 | INVALID_ARGUMENT | 参数无效 |
| 4 | DEADLINE_EXCEEDED | 超时 |
| 5 | NOT_FOUND | 资源不存在 |
| 6 | ALREADY_EXISTS | 资源已存在 |
| 7 | PERMISSION_DENIED | 权限不足 |
| 8 | RESOURCE_EXHAUSTED | 资源耗尽 |
| 9 | FAILED_PRECONDITION | 前置条件不满足 |
| 10 | ABORTED | 操作中止 |
| 11 | OUT_OF_RANGE | 超出范围 |
| 12 | UNIMPLEMENTED | 未实现 |
| 13 | INTERNAL | 内部错误 |
| 14 | UNAVAILABLE | 服务不可用 |
| 15 | DATA_LOSS | 数据丢失 |
| 16 | UNAUTHENTICATED | 未认证 |
9.5.2 流式 RPC 的错误处理
// 服务端:返回结构化错误
func (s *chatService) Chat(stream pb.ChatService_ChatServer) error {
msg, err := stream.Recv()
if err != nil {
return err
}
// 参数校验
if msg.RoomId == "" {
return status.Error(codes.InvalidArgument, "room_id 不能为空")
}
// 权限检查
if !isAuthorized(msg.UserId, msg.RoomId) {
return status.Error(codes.PermissionDenied,
fmt.Sprintf("用户 %s 无权访问房间 %s", msg.UserId, msg.RoomId))
}
// 资源限制
if isRoomFull(msg.RoomId) {
st := status.New(codes.ResourceExhausted, "房间已满")
// 添加详细信息
ds, _ := st.WithDetails(&errdetails.QuotaFailure{
Violations: []*errdetails.QuotaFailure_Violation{
{Subject: msg.RoomId, Description: "最大成员数: 100"},
},
})
return ds.Err()
}
// 正常处理...
return nil
}
// 客户端:处理错误
func receiveMessages(stream pb.ChatService_ChatClient) {
for {
msg, err := stream.Recv()
if err != nil {
st, ok := status.FromError(err)
if ok {
switch st.Code() {
case codes.Canceled:
log.Println("聊天被取消")
case codes.Unavailable:
log.Println("服务不可用,尝试重连...")
time.Sleep(5 * time.Second)
// 重连逻辑...
case codes.PermissionDenied:
log.Printf("权限错误: %s", st.Message())
default:
log.Printf("错误 [%s]: %s", st.Code(), st.Message())
}
} else {
log.Printf("非 gRPC 错误: %v", err)
}
return
}
// 处理消息...
_ = msg
}
}
9.5.3 流超时与取消
// 设置流超时
func listUsersWithTimeout(client pb.NotificationServiceClient) error {
// 设置 10 秒超时
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.ListUsers(ctx, &pb.ListUsersRequest{
Filter: "active",
})
if err != nil {
return err
}
for {
user, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
// 检查是否是超时
if ctx.Err() == context.DeadlineExceeded {
log.Println("列表获取超时")
return ctx.Err()
}
return err
}
processUser(user)
}
}
// 客户端主动取消
func cancelableStream(client pb.NotificationServiceClient) {
ctx, cancel := context.WithCancel(context.Background())
// 启动流
stream, _ := client.Subscribe(ctx, &pb.SubscribeRequest{Topic: "events"})
// 某些条件满足时取消
go func() {
time.Sleep(30 * time.Second)
log.Println("超时,取消订阅")
cancel()
}()
for {
notif, err := stream.Recv()
if err != nil {
log.Printf("流结束: %v", err)
return
}
processNotification(notif)
}
}
9.6 业务场景:实时日志收集系统
架构:
┌──────────┐ 客户端流 ┌──────────┐ 服务端流 ┌──────────┐
│ 应用服务 │ ──────────→ │ 日志网关 │ ──────────→ │ 分析引擎 │
│ (多实例) │ 批量发送 │ (聚合) │ 流式转发 │ (处理) │
└──────────┘ └──────────┘ └──────────┘
Proto 定义:
service LogCollector {
// 应用 → 网关:客户端流式上传日志
rpc UploadLogs(stream LogEntry) returns (UploadStatus);
// 网关 → 分析引擎:服务端流式推送
rpc StreamLogs(StreamRequest) returns (stream LogBatch);
}
9.7 注意事项
⚠️ 流的生命周期管理:
- 始终检查流的错误(
Recv()返回的io.EOF和其他错误) - 及时关闭流,避免资源泄漏
- 处理上下文取消(
stream.Context().Done())
⚠️ 并发安全:
Send()和Recv()可以并发调用- 但同方向的多个
Send()不能并发 - 需要自行实现互斥锁
⚠️ 消息大小限制:
- gRPC 默认最大消息大小为 4MB
- 流式传输时每个消息仍受此限制
- 使用
grpc.MaxRecvMsgSize()调整
💡 性能优化:
- 流式 RPC 比多次一元 RPC 更高效(复用连接、减少握手)
- 合理设置缓冲区大小
- 使用批量消息减少 RPC 调用次数