强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

13 - Connect 协议

第 13 章:Connect 协议

兼容 gRPC、拥抱 Web、curl 友好的新一代 RPC 协议


13.1 Connect 概述

Connect 是 Buf(Protocol Buffers 工具链公司)于 2022 年推出的 RPC 协议。它在保持与 gRPC 完全兼容的基础上,解决了 gRPC 在 Web 环境中的诸多痛点。

13.1.1 为什么需要 Connect

gRPC 的痛点:

1. 浏览器不原生支持
   - 需要 gRPC-Web 代理(Envoy/进程内代理)
   - 增加架构复杂度

2. curl 无法调用
   - 二进制协议,无法用 curl 测试
   - 需要专门的 grpcurl 工具

3. HTTP/1.1 不支持
   - 必须 HTTP/2
   - 某些代理/CDN 不完全支持 HTTP/2

4. 错误细节有限
   - 只有状态码 + 消息
   - 需要额外的 Details 机制

Connect 的解决方案:
✓ 原生支持浏览器(无需代理)
✓ curl 可以直接调用
✓ 支持 HTTP/1.1 和 HTTP/2
✓ 与 gRPC 完全互操作

13.1.2 Connect 协议模式

Connect 支持三种协议模式:

模式传输编码适用场景
Connect 协议HTTP/1.1 或 HTTP/2JSON 或 ProtobufWeb / 通用
gRPC 协议HTTP/2Protobuf与 gRPC 完全互操作
gRPC-Web 协议HTTP/1.1 或 HTTP/2ProtobufgRPC-Web 兼容

13.2 Connect 协议详解

13.2.1 请求格式(Unary)

Connect 协议的 Unary 请求:

POST /example.UserService/GetUser HTTP/1.1
Content-Type: application/json        ← JSON 编码(curl 友好!)
Connect-Protocol-Version: 1

{"id": 1}

--- 或者使用 Protobuf ---

POST /example.UserService/GetUser HTTP/1.1
Content-Type: application/proto
Connect-Protocol-Version: 1

<二进制 Protobuf 数据>

--- 响应格式 ---

HTTP/1.1 200 OK
Content-Type: application/json

{"user": {"id": 1, "name": "Alice"}}

--- 错误响应 ---

HTTP/1.1 404 Not Found
Content-Type: application/json

{"code": "not_found", "message": "用户 1 不存在"}

13.2.2 流式请求格式

Connect 协议的 Stream 请求:

POST /example.UserService/WatchUsers HTTP/1.1
Content-Type: application/connect+json   ← 注意前缀
Connect-Protocol-Version: 1
Trailer: Connect-Timeout-Ms

<流式消息序列:每个消息带 4 字节前缀长度>

消息格式(流式):
┌──────────────────┐
│ Flags (1 byte)   │  0x00 = 数据, 0x02 = 结束
│ Length (3 bytes)  │  消息长度(大端序)
│ Data (变长)       │  消息内容
└──────────────────┘

13.3 与 gRPC 的对比

特性gRPCConnect
浏览器原生支持❌ 需要代理✅ 原生支持
curl 调用❌ 二进制协议✅ JSON 可选
HTTP/1.1 支持
JSON 编码
流式 RPC
Protobuf 支持
拦截器
错误详情状态码 + Details状态码 + 结构化 JSON
代码生成protoc + 插件protoc + 插件(同一 .proto)
互操作-✅ 与 gRPC 服务器互操作
Go 实现grpc-goconnect-go
TypeScript 实现@grpc/grpc-js@connectrpc/connect

13.3.1 性能对比

测试环境:Go 1.21, 4核 CPU, 10,000 请求

┌──────────────────┬──────────┬──────────┬──────────┐
│ 指标             │ gRPC     │ Connect  │ 差异     │
├──────────────────┼──────────┼──────────┼──────────┤
│ Unary P50        │ 0.3ms    │ 0.32ms   │ +7%      │
│ Unary P99        │ 0.8ms    │ 0.85ms   │ +6%      │
│ 流式吞吐         │ 85k QPS  │ 82k QPS  │ -3.5%    │
│ Protobuf 大小    │ 320B     │ 320B     │ 相同     │
│ JSON 大小        │ N/A      │ 1024B    │ 3.2x     │
│ 内存占用         │ 基准     │ +5%      │ 略高     │
└──────────────────┴──────────┴──────────┴──────────┘

结论:Protobuf 模式下性能几乎相同,差异可忽略

13.4 Connect 实战

13.4.1 安装与项目设置

# 安装 buf 工具
# https://buf.build/docs/installation
go install github.com/bufbuild/buf/cmd/buf@latest

# 安装 connect-go
go get connectrpc.com/connect

# 安装 protoc-gen-connect-go
go install connectrpc.com/connect/cmd/protoc-gen-connect-go@latest
# buf.yaml
version: v1
breaking:
  use:
    - FILE
lint:
  use:
    - DEFAULT
# buf.gen.yaml
version: v1
plugins:
  - plugin: buf.build/protocolbuffers/go
    out: gen
    opt: paths=source_relative
  - plugin: buf.build/connectrpc/go
    out: gen
    opt: paths=source_relative

13.4.2 服务定义(Proto)

// user/v1/user.proto
syntax = "proto3";

package user.v1;

message User {
  int64 id = 1;
  string name = 2;
  string email = 3;
}

message GetUserRequest {
  int64 id = 1;
}

message GetUserResponse {
  User user = 1;
}

message ListUsersRequest {
  int32 page_size = 1;
  string page_token = 2;
}

message ListUsersResponse {
  repeated User users = 1;
  string next_page_token = 2;
}
// user/v1/user_service.proto
syntax = "proto3";

package user.v1;

import "user/v1/user.proto";

service UserService {
  rpc GetUser(GetUserRequest) returns (GetUserResponse);
  rpc ListUsers(ListUsersRequest) returns (stream User);
}
# 生成代码
buf generate

13.4.3 服务端实现(Go)

package main

import (
	"context"
	"fmt"
	"log"
	"net/http"

	userv1 "example/gen/user/v1"
	"example/gen/user/v1/userv1connect"

	"connectrpc.com/connect"
	"golang.org/x/net/http2"
	"golang.org/x/net/http2/h2c"
)

type userServer struct {
	userv1connect.UnimplementedUserServiceHandler
	users map[int64]*userv1.User
}

func newUserServer() *userServer {
	return &userServer{
		users: map[int64]*userv1.User{
			1: {Id: 1, Name: "Alice", Email: "[email protected]"},
			2: {Id: 2, Name: "Bob", Email: "[email protected]"},
		},
	}
}

func (s *userServer) GetUser(
	ctx context.Context,
	req *connect.Request[userv1.GetUserRequest],
) (*connect.Response[userv1.GetUserResponse], error) {
	user, ok := s.users[req.Msg.Id]
	if !ok {
		return nil, connect.NewError(connect.CodeNotFound,
			fmt.Errorf("用户 %d 不存在", req.Msg.Id))
	}

	resp := connect.NewResponse(&userv1.GetUserResponse{User: user})
	resp.Header().Set("X-Request-Id", "req-123")
	return resp, nil
}

func (s *userServer) ListUsers(
	ctx context.Context,
	req *connect.Request[userv1.ListUsersRequest],
	stream *connect.ServerStream[userv1.User],
) error {
	for _, user := range s.users {
		if err := stream.Send(user); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	server := newUserServer()
	mux := http.NewServeMux()

	// 注册 Connect 服务
	path, handler := userv1connect.NewUserServiceHandler(server)
	mux.Handle(path, handler)

	// 同时支持 HTTP/2 和 HTTP/1.1(h2c 用于开发,生产环境用 TLS)
	addr := ":8080"
	log.Printf("Connect 服务器启动于 %s", addr)
	log.Printf("  Connect 协议: POST http://%s/user.v1.UserService/GetUser", addr)
	log.Printf("  gRPC 协议:    POST http://%s/user.v1.UserService/GetUser (gRPC headers)", addr)
	
	http.ListenAndServe(addr, h2c.NewHandler(mux, &http2.Server{}))
}

13.4.4 客户端实现(Go)

package main

import (
	"context"
	"fmt"
	"log"
	"net/http"

	userv1 "example/gen/user/v1"
	"example/gen/user/v1/userv1connect"

	"connectrpc.com/connect"
)

func main() {
	// 创建 Connect 客户端
	client := userv1connect.NewUserServiceClient(
		http.DefaultClient,
		"http://localhost:8080",
	)

	// Unary 调用
	resp, err := client.GetUser(
		context.Background(),
		connect.NewRequest(&userv1.GetUserRequest{Id: 1}),
	)
	if err != nil {
		log.Printf("调用失败: %v", err)
		if connectErr, ok := err.(*connect.Error); ok {
			log.Printf("错误码: %s, 消息: %s", connectErr.Code(), connectErr.Message())
		}
		return
	}
	fmt.Printf("用户: %s (%s)\n", resp.Msg.User.Name, resp.Msg.User.Email)

	// 流式调用
	stream, err := client.ListUsers(
		context.Background(),
		connect.NewRequest(&userv1.ListUsersRequest{PageSize: 100}),
	)
	if err != nil {
		log.Fatalf("流式调用失败: %v", err)
	}
	for stream.Receive() {
		user := stream.Msg()
		fmt.Printf("  流式接收: %s\n", user.Name)
	}
	if err := stream.Err(); err != nil {
		log.Fatalf("流错误: %v", err)
	}
}

13.4.5 浏览器/TypeScript 客户端

// 使用 @connectrpc/connect-web
import { createClient } from "@connectrpc/connect";
import { createConnectTransport } from "@connectrpc/connect-web";
import { UserService } from "./gen/user/v1/user_service_pb";

// 创建传输层(Connect 协议,支持 HTTP/1.1)
const transport = createConnectTransport({
  baseUrl: "http://localhost:8080",
});

// 创建客户端
const client = createClient(UserService, transport);

// 调用
async function getUser(id: bigint) {
  try {
    const response = await client.getUser({ id });
    console.log(`用户: ${response.user?.name}`);
  } catch (err) {
    console.error("调用失败:", err);
  }
}

// 流式调用(服务端流)
async function listUsers() {
  for await (const user of client.listUsers({ pageSize: 100 })) {
    console.log(`用户: ${user.name}`);
  }
}

13.4.6 curl 直接调用

# Connect 协议 + JSON 编码(curl 友好!)
curl \
  --header "Content-Type: application/json" \
  --data '{"id": 1}' \
  http://localhost:8080/user.v1.UserService/GetUser

# 输出:
# {"user": {"id": "1", "name": "Alice", "email": "[email protected]"}}

# 对比 gRPC(需要 grpcurl)
# grpcurl -plaintext -d '{"id": 1}' localhost:50051 user.v1.UserService/GetUser

13.5 错误处理

13.5.1 Connect 错误格式

{
  "code": "not_found",
  "message": "用户 123 不存在",
  "details": [
    {
      "type": "google.rpc.ErrorInfo",
      "value": "eyJkb21haW4iOiJ1c2VyIn0=",
      "debug": {
        "reason": "USER_NOT_FOUND",
        "domain": "user-service",
        "metadata": {"user_id": "123"}
      }
    }
  ]
}

13.5.2 错误码对照

Connect CodeHTTP 状态码gRPC Code
canceled408CANCELLED
unknown500UNKNOWN
invalid_argument400INVALID_ARGUMENT
deadline_exceeded504DEADLINE_EXCEEDED
not_found404NOT_FOUND
already_exists409ALREADY_EXISTS
permission_denied403PERMISSION_DENIED
resource_exhausted429RESOURCE_EXHAUSTED
failed_precondition400FAILED_PRECONDITION
aborted409ABORTED
out_of_range400OUT_OF_RANGE
unimplemented501UNIMPLEMENTED
internal500INTERNAL
unavailable503UNAVAILABLE
data_loss500DATA_LOSS
unauthenticated401UNAUTHENTICATED

13.6 拦截器

package main

import (
	"context"
	"log"
	"time"

	"connectrpc.com/connect"
)

// 日志拦截器
func loggingInterceptor() connect.UnaryInterceptorFunc {
	return func(next connect.UnaryFunc) connect.UnaryFunc {
		return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
			start := time.Now()
			log.Printf("[Connect] %s %s", req.Spec().Procedure, "START")

			resp, err := next(ctx, req)

			log.Printf("[Connect] %s duration=%v err=%v",
				req.Spec().Procedure, time.Since(start), err)
			return resp, err
		}
	}
}

// 认证拦截器
func authInterceptor() connect.UnaryInterceptorFunc {
	return func(next connect.UnaryFunc) connect.UnaryFunc {
		return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
			// 从 HTTP Header 获取 token
			token := req.Header().Get("Authorization")
			if token == "" {
				return nil, connect.NewError(connect.CodeUnauthenticated,
					fmt.Errorf("缺少认证信息"))
			}

			// 验证 token
			if err := validateToken(token); err != nil {
				return nil, connect.NewError(connect.CodeUnauthenticated, err)
			}

			return next(ctx, req)
		}
	}
}

// 使用拦截器
func main() {
	mux := http.NewServeMux()

	// 服务端拦截器
	path, handler := userv1connect.NewUserServiceHandler(
		newUserServer(),
		connect.WithInterceptors(loggingInterceptor(), authInterceptor()),
	)
	mux.Handle(path, handler)

	// 客户端拦截器
	client := userv1connect.NewUserServiceClient(
		http.DefaultClient,
		"http://localhost:8080",
		connect.WithInterceptors(loggingInterceptor()),
	)
	_ = client
}

13.7 何时选择 Connect

场景推荐方案理由
新项目,需要 Web 支持Connect原生浏览器支持
新项目,纯后端微服务gRPC 或 Connect性能相近,gRPC 生态更成熟
已有 gRPC 服务,需加 WebConnect兼容现有服务
需要 curl 调试ConnectJSON + HTTP/1.1
追求最大生态兼容gRPC社区最大

13.8 注意事项

⚠️ Connect 的局限

  • 相对较新(2022 年),生态不如 gRPC 成熟
  • 语言支持目前主要是 Go 和 TypeScript
  • 某些 gRPC 高级功能可能不完全支持

⚠️ 协议协商

  • 服务器需同时支持多种协议(Connect / gRPC / gRPC-Web)
  • 通过 Content-Type 和 Connect-Protocol-Version 头区分协议

💡 最佳实践

  • 新项目优先考虑 Connect
  • 使用 Buf 工具链管理 Protobuf
  • 前后端分离项目使用 Connect 的 JSON 模式
  • 微服务间使用 Connect 的 Protobuf 模式

13.9 扩展阅读


第 12 章 - Apache Thrift | 第 14 章 - 容器化部署