第 13 章 - 微服务网关架构
13.1 微服务网关全景
┌──────────────────────────────────┐
│ API Gateway │
│ ┌────────┬────────┬──────────┐ │
│ │ 路由 │ 认证 │ 限流 │ │
│ │ 服务发现│ 配置中心│ 链路追踪 │ │
│ └────────┴────────┴──────────┘ │
└──────────────┬───────────────────┘
│
┌─────────────────────┼─────────────────────┐
│ │ │
┌───────▼───────┐ ┌────────▼────────┐ ┌───────▼───────┐
│ 用户服务 │ │ 订单服务 │ │ 商品服务 │
│ (user-svc) │ │ (order-svc) │ │ (product-svc) │
└───────────────┘ └─────────────────┘ └───────────────┘
│ │ │
┌───────▼───────┐ ┌────────▼────────┐ ┌───────▼───────┐
│ Consul/Nacos │ │ Jaeger │ │ Prometheus │
│ 服务发现 │ │ 链路追踪 │ │ 监控告警 │
└───────────────┘ └─────────────────┘ └───────────────┘
13.2 服务发现
13.2.1 Consul 集成
Consul 是 HashiCorp 开发的服务发现和配置管理工具。
-- /usr/local/openresty/lua/discovery/consul.lua
-- Consul-based service discovery for the gateway.
--
-- Passing instances are read from Consul's health API and cached in the
-- `service_cache` shared dict so every worker sees the same list. A longer-lived
-- "stale" copy is kept as a fallback for when Consul cannot be reached.
local _M = {}
local http = require "resty.http"
local cjson = require "cjson"

local config = {
    consul_addr = os.getenv("CONSUL_ADDR") or "http://127.0.0.1:8500",
    cache_ttl = 10,       -- fresh-cache TTL (seconds)
    watch_interval = 5,   -- background refresh interval (seconds)
    http_timeout = 3000,  -- Consul HTTP timeout (milliseconds)
}
-- The fallback copy outlives the fresh one by a factor of 10.
config.stale_ttl = config.cache_ttl * 10

-- Shared by all workers; requires `lua_shared_dict service_cache ...;`.
local service_cache = ngx.shared.service_cache

-- Query Consul directly for the passing instances of a service.
-- @return array of instance tables, or nil plus an error string
local function fetch_instances(service_name, tag)
    local url = config.consul_addr .. "/v1/health/service/"
        .. ngx.escape_uri(service_name)
        .. "?passing=true" -- only instances whose checks all pass
    if tag then
        url = url .. "&tag=" .. ngx.escape_uri(tag)
    end

    local httpc = http.new()
    httpc:set_timeout(config.http_timeout)
    local res, err = httpc:request_uri(url, {method = "GET"})
    if not res then
        return nil, err
    end
    -- A non-200 reply (ACL denied, agent error, ...) is not an instance list.
    -- Decoding it would raise, so treat it like a failed request instead.
    if res.status ~= 200 then
        return nil, "Consul returned HTTP " .. res.status
    end
    local ok, entries = pcall(cjson.decode, res.body)
    if not ok or type(entries) ~= "table" then
        return nil, "invalid Consul response: " .. tostring(entries)
    end

    local instances = {}
    for _, entry in ipairs(entries) do
        local svc = entry.Service
        -- Consul reports Service.Address as "" when the service was registered
        -- without one. "" is truthy in Lua, so test it explicitly before
        -- falling back to the node's address.
        local address = svc.Address
        if address == nil or address == "" then
            address = entry.Node.Address
        end
        instances[#instances + 1] = {
            id = svc.ID,
            address = address,
            port = svc.Port,
            tags = svc.Tags or {},
            meta = svc.Meta or {},
            checks = entry.Checks, -- raw health-check results
        }
    end
    return instances
end

--- Return the passing instances of a service, served from cache when fresh.
-- @param service_name Consul service name
-- @param tag optional Consul tag filter
-- @param force optional; when true, ignore the fresh cache and query Consul
-- @return array of instances, or nil plus an error string
function _M.get_service(service_name, tag, force)
    local cache_key = "svc:" .. service_name .. ":" .. (tag or "")
    if not force then
        local cached = service_cache:get(cache_key)
        if cached then
            return cjson.decode(cached)
        end
    end

    local instances, err = fetch_instances(service_name, tag)
    if not instances then
        ngx.log(ngx.ERR, "Consul request failed: ", err)
        -- Degrade: fall back to the last known list, even if it has expired.
        local stale = service_cache:get("stale:" .. cache_key)
        if stale then
            return cjson.decode(stale)
        end
        return nil, err
    end

    local encoded = cjson.encode(instances)
    service_cache:set(cache_key, encoded, config.cache_ttl)
    service_cache:set("stale:" .. cache_key, encoded, config.stale_ttl)
    return instances
end

--- Pick one instance of a service (load balancing).
-- @param service_name Consul service name
-- @param strategy "round_robin" (default), "random" or "consistent_hash";
--   any other value returns the first instance
-- @return instance table, or nil plus an error string
function _M.select_instance(service_name, strategy)
    local instances = _M.get_service(service_name)
    if not instances or #instances == 0 then
        return nil, "No available instances for " .. service_name
    end
    local count = #instances

    strategy = strategy or "round_robin"
    if strategy == "round_robin" then
        -- The counter lives in the shared dict, so rotation is global across
        -- workers. incr returns nil when the dict is full; use slot 0 rather
        -- than failing on nil arithmetic.
        local counter = service_cache:incr("counter:" .. service_name, 1, 0) or 0
        return instances[(counter % count) + 1]
    elseif strategy == "random" then
        return instances[math.random(count)]
    elseif strategy == "consistent_hash" then
        -- djb2 hash of the user id header (or client IP), taken modulo the
        -- instance count. NOTE: this is modulo hashing, not a hash ring.
        -- Adding or removing an instance remaps most keys. Stickiness also
        -- assumes Consul returns instances in a stable order.
        local key = ngx.var.http_x_user_id or ngx.var.remote_addr or ""
        local hash = 5381
        for i = 1, #key do
            hash = ((hash * 33) + key:byte(i)) % 2147483647
        end
        return instances[(hash % count) + 1]
    end
    return instances[1]
end

--- Register a service instance with the local Consul agent.
-- @param health_check optional Consul check definition; defaults to an HTTP
--   check of http://address:port/health every 10s
-- @return true on success; otherwise a falsy value plus an error string
function _M.register(name, id, address, port, health_check)
    local body = cjson.encode({
        ID = id,
        Name = name,
        Address = address,
        Port = port,
        Check = health_check or {
            HTTP = "http://" .. address .. ":" .. port .. "/health",
            Interval = "10s",
            Timeout = "3s",
        },
    })

    local httpc = http.new()
    httpc:set_timeout(config.http_timeout)
    local res, err = httpc:request_uri(
        config.consul_addr .. "/v1/agent/service/register",
        {
            method = "PUT",
            body = body,
            headers = {["Content-Type"] = "application/json"},
        }
    )
    if not res then
        return nil, err
    end
    if res.status ~= 200 then
        return false, "Consul returned HTTP " .. res.status .. ": " .. (res.body or "")
    end
    return true
end

--- Periodically re-read the given services from Consul into the cache.
-- One caller is enough, because the cache is shared by all workers (for
-- example, call it only from worker 0).
-- @param services array of service names
-- @return the result of ngx.timer.every (ok, err)
function _M.start_watcher(services)
    local function refresh(premature)
        if premature then
            return
        end
        for _, svc_name in ipairs(services) do
            -- force=true: bypass the fresh cache. Otherwise the cache (TTL
            -- 10s) would answer most ticks and nothing would be refreshed.
            local ok, err = pcall(_M.get_service, svc_name, nil, true)
            if not ok then
                ngx.log(ngx.ERR, "refresh of ", svc_name, " failed: ", err)
            end
        end
    end

    local ok, err = ngx.timer.every(config.watch_interval, refresh)
    if not ok then
        ngx.log(ngx.ERR, "failed to start Consul watcher: ", err)
    end
    return ok, err
end

return _M
nginx 配置
lua_shared_dict service_cache 20m;

init_worker_by_lua_block {
    local discovery = require "discovery.consul"
    -- The service cache is a shared dict visible to every worker, so one
    -- refresher is enough. Starting it in every worker would multiply the load
    -- on Consul by the worker count.
    if ngx.worker.id() == 0 then
        discovery.start_watcher({
            "user-service",
            "order-service",
            "product-service",
        })
    end
}

server {
    listen 8080;

    location /api/users {
        # ngx.var can only assign variables nginx already knows about.
        # Without this declaration, the assignment below fails at runtime.
        set $backend_addr '';

        access_by_lua_block {
            local discovery = require "discovery.consul"
            local instance, err = discovery.select_instance("user-service")
            if not instance then
                ngx.log(ngx.ERR, "service discovery failed: ", err)
                ngx.status = 503
                ngx.header["Content-Type"] = "application/json"
                ngx.say('{"error":"Service unavailable"}')
                -- The response has already been sent with status 503.
                -- Per the ngx.exit docs, HTTP_OK then ends the whole request.
                return ngx.exit(ngx.HTTP_OK)
            end
            ngx.var.backend_addr = instance.address .. ":" .. instance.port
        }

        proxy_pass http://$backend_addr;
    }
}
13.3 Nacos 集成
Nacos 是阿里巴巴开源的服务发现和配置管理平台,在国内广泛使用。
-- /usr/local/openresty/lua/discovery/nacos.lua
-- Nacos client for service discovery and configuration (Nacos v1 Open API).
local _M = {}
local http = require "resty.http"
local cjson = require "cjson"

local config = {
    nacos_addr = os.getenv("NACOS_ADDR") or "http://127.0.0.1:8848",
    namespace = os.getenv("NACOS_NAMESPACE") or "public",
    group = "DEFAULT_GROUP",
    http_timeout = 3000, -- milliseconds
    poll_interval = 5,   -- default watch_config interval (seconds)
}

-- GET a Nacos URL.
-- @return the body on HTTP 200; otherwise nil, err, status (status is nil when
--   the request itself failed)
local function http_get(url)
    local httpc = http.new()
    httpc:set_timeout(config.http_timeout)
    local res, err = httpc:request_uri(url, {method = "GET"})
    if not res then
        return nil, err
    end
    if res.status ~= 200 then
        return nil, "Nacos returned HTTP " .. res.status, res.status
    end
    return res.body
end

--- List the healthy instances of a service.
-- @return array of {address, port, healthy, weight, metadata}, or nil plus err
function _M.get_instances(service_name)
    local body, err = http_get(config.nacos_addr
        .. "/nacos/v1/ns/instance/list"
        .. "?serviceName=" .. ngx.escape_uri(service_name)
        .. "&namespaceId=" .. ngx.escape_uri(config.namespace)
        .. "&groupName=" .. ngx.escape_uri(config.group)
        .. "&healthyOnly=true")
    if not body then
        return nil, err
    end

    -- Error bodies are not guaranteed to be JSON, so decode defensively.
    local ok, data = pcall(cjson.decode, body)
    if not ok or type(data) ~= "table" then
        return nil, "invalid Nacos response: " .. tostring(data)
    end

    local hosts = type(data.hosts) == "table" and data.hosts or {}
    local instances = {}
    for _, host in ipairs(hosts) do
        instances[#instances + 1] = {
            address = host.ip,
            port = host.port,
            healthy = host.healthy,
            weight = host.weight or 100,
            metadata = host.metadata or {},
        }
    end
    return instances
end

--- Fetch the raw content of a configuration item.
-- @param group optional; defaults to config.group
-- @return the config text, or nil plus err ("Config not found" on HTTP 404)
function _M.get_config(data_id, group)
    local body, err, status = http_get(config.nacos_addr
        .. "/nacos/v1/cs/configs"
        .. "?dataId=" .. ngx.escape_uri(data_id)
        .. "&group=" .. ngx.escape_uri(group or config.group)
        .. "&tenant=" .. ngx.escape_uri(config.namespace))
    if not body then
        if status == 404 then
            return nil, "Config not found"
        end
        return nil, err
    end
    return body
end

--- Watch a configuration item by periodic polling (this is not Nacos long polling).
-- `callback(content)` runs once with the first value fetched, then whenever the
-- content changes. A failing callback is logged and retried on the next poll.
-- @param interval optional poll interval in seconds (default 5)
-- @return the result of scheduling the first poll (ok, err)
function _M.watch_config(data_id, group, callback, interval)
    interval = interval or config.poll_interval
    local last_content

    local function poll(premature)
        if premature then
            return
        end

        local content, err = _M.get_config(data_id, group)
        if content == nil then
            ngx.log(ngx.WARN, "Nacos config ", data_id, " unavailable: ", err)
        elseif content ~= last_content then
            -- pcall keeps a faulty callback from killing the loop, since the
            -- reschedule below would otherwise never run.
            local ok, cb_err = pcall(callback, content)
            if ok then
                last_content = content
            else
                ngx.log(ngx.ERR, "config callback for ", data_id, " failed: ", cb_err)
            end
        end

        local ok, timer_err = ngx.timer.at(interval, poll)
        if not ok then
            ngx.log(ngx.ERR, "failed to reschedule config watch for ", data_id, ": ", timer_err)
        end
    end

    return ngx.timer.at(0, poll)
end

return _M
13.4 配置中心集成
-- /usr/local/openresty/lua/config_center.lua
-- Layered configuration lookup. Sources are consulted in a fixed priority
-- order: environment variables > config center (shared-dict cache, then Consul
-- KV) > local JSON file.
local _M = {}
local cjson = require "cjson"

local CONSUL_ADDR = os.getenv("CONSUL_ADDR") or "http://127.0.0.1:8500"
local CONFIG_FILE = "/etc/openresty/config.json"

-- Environment variable GW_<KEY>, with dots mapped to underscores.
-- Example: "rate.limit" -> GW_RATE_LIMIT.
local function from_env(key)
    local name = key:upper():gsub("%.", "_")
    return os.getenv("GW_" .. name)
end

-- Values previously written by _M.set / _M.reload.
local function from_shared(key)
    return ngx.shared.gateway_config:get("config:" .. key)
end

-- Raw value of the Consul KV entry gateway/<key>.
local function from_consul(key)
    local http = require "resty.http"
    local httpc = http.new()
    httpc:set_timeout(3000)
    local res = httpc:request_uri(CONSUL_ADDR .. "/v1/kv/gateway/" .. key .. "?raw")
    if res and res.status == 200 then
        return res.body
    end
    return nil
end

-- Dotted-path lookup in the local JSON file. Tables are re-encoded as JSON;
-- scalars are stringified. A missing key (or JSON null) yields nil, not "nil".
local function from_file(key)
    local f = io.open(CONFIG_FILE, "r")
    if not f then
        return nil
    end
    local content = f:read("*a")
    f:close()

    local value = cjson.decode(content)
    for part in key:gmatch("[^%.]+") do
        if type(value) ~= "table" then
            return nil
        end
        value = value[part]
    end
    if value == nil or value == cjson.null then
        return nil
    end
    if type(value) == "table" then
        return cjson.encode(value)
    end
    return tostring(value)
end

-- Sources in priority order. This must be an array: `pairs` over a hash
-- table has no defined order, which would make the priority arbitrary.
local config_sources = {
    {name = "env", fetch = from_env},
    {name = "shared", fetch = from_shared},
    {name = "consul", fetch = from_consul},
    {name = "file", fetch = from_file},
}

-- First non-nil value for `key`, skipping the source named `skip`.
-- A source that raises is logged and treated as "not found".
local function lookup(key, skip)
    for _, source in ipairs(config_sources) do
        if source.name ~= skip then
            local ok, value = pcall(source.fetch, key)
            if not ok then
                ngx.log(ngx.DEBUG, "config source ", source.name,
                        " failed for ", key, ": ", value)
            elseif value ~= nil then
                return value
            end
        end
    end
    return nil
end

--- Get a configuration value as a string.
-- @return the highest-priority value found, else `default`
function _M.get(key, default)
    local value = lookup(key)
    if value == nil then
        return default
    end
    return value
end

--- Get a configuration value decoded from JSON.
-- @return the decoded value, or `default` if missing or not valid JSON
function _M.get_json(key, default)
    local value = _M.get(key)
    if value then
        local ok, decoded = pcall(cjson.decode, value)
        if ok then
            return decoded
        end
    end
    return default
end

--- Store a value in the shared-dict layer (stringified).
function _M.set(key, value)
    ngx.shared.gateway_config:set("config:" .. key, tostring(value))
end

--- Re-read the known keys and refresh the shared-dict layer.
function _M.reload()
    local keys = {
        "rate_limit",
        "jwt_secret",
        "upstream_services",
    }
    for _, key in ipairs(keys) do
        -- Skip the shared-dict layer: it holds the previous value and would
        -- otherwise shadow fresh values from Consul or the file.
        local value = lookup(key, "shared")
        if value then
            _M.set(key, value)
            ngx.log(ngx.INFO, "Config reloaded: ", key)
        end
    end
end

return _M
13.5 链路追踪
13.5.1 分布式追踪原理
客户端请求
│
▼
┌─────────┐ Trace ID: abc123
│ 网关 │ Span ID: 001
│ │ Duration: 150ms
└────┬─────┘
│
├──────────────┐
▼ ▼
┌─────────┐ ┌─────────┐
│ 用户服务 │ │ 订单服务 │ Span ID: 002 Span ID: 003
│ │ │ │ Parent: 001 Parent: 001
└────┬─────┘ └────┬─────┘ Duration: 50ms Duration: 80ms
│ │
▼ ▼
┌─────────┐ ┌─────────┐
│ 数据库 │ │ 数据库 │ Span ID: 004 Span ID: 005
│ │ │ │ Parent: 002 Parent: 003
└─────────┘ └─────────┘ Duration: 20ms Duration: 30ms
13.5.2 OpenResty 链路追踪实现
-- /usr/local/openresty/lua/tracing/tracer.lua
-- Minimal request tracer. It continues an incoming W3C or Jaeger trace (or
-- starts a new one), propagates it to the upstream through request headers,
-- and reports one span per request to a Jaeger agent.
local _M = {}
local cjson = require "cjson"

local config = {
    -- A hostname here requires an nginx `resolver` directive for setpeername.
    agent_host = os.getenv("JAEGER_AGENT_HOST") or "jaeger-agent",
    agent_port = tonumber(os.getenv("JAEGER_AGENT_PORT")) or 6831,
}

-- Random lowercase hex ID of `n_bytes` bytes (2 * n_bytes hex characters).
-- W3C Trace Context requires 16-byte trace IDs and 8-byte span (parent) IDs.
-- NOTE(review): math.random must be seeded differently per worker (e.g. in
-- init_worker_by_lua), or workers may emit identical IDs. Confirm the
-- deployment does this.
local function generate_id(n_bytes)
    local parts = {}
    for i = 1, n_bytes do
        parts[i] = string.format("%02x", math.random(0, 255))
    end
    return table.concat(parts)
end

-- Left-pad a hex string with zeros to `width` characters.
local function pad_hex(hex, width)
    if #hex >= width then
        return hex
    end
    return string.rep("0", width - #hex) .. hex
end

-- Read the caller's trace context from the request headers.
-- @return {trace_id (32 hex), parent_span_id (16 hex), flags (2 hex)} or nil
local function extract_context()
    local headers = ngx.req.get_headers()

    -- W3C Trace Context: version-traceid(32 hex)-parentid(16 hex)-flags(2 hex).
    -- A repeated header arrives as a Lua table, hence the type check.
    local traceparent = headers["traceparent"]
    if type(traceparent) == "string" then
        local trace_id, span_id, flags =
            traceparent:match("^%x%x%-(%x+)%-(%x+)%-(%x%x)$")
        if trace_id and #trace_id == 32 and #span_id == 16 then
            return {
                trace_id = trace_id:lower(),
                parent_span_id = span_id:lower(),
                flags = flags:lower(),
            }
        end
    end

    -- Jaeger: trace-id:span-id:parent-span-id:flags. Jaeger may drop leading
    -- zeros, so pad to the W3C widths to allow re-emitting a valid traceparent.
    local uber_trace = headers["uber-trace-id"]
    if type(uber_trace) == "string" then
        local trace_id, span_id, _, flags =
            uber_trace:match("^(%x+):(%x+):(%x+):(%x+)$")
        if trace_id and #trace_id <= 32 and #span_id <= 16 and #flags <= 2 then
            return {
                trace_id = pad_hex(trace_id:lower(), 32),
                parent_span_id = pad_hex(span_id:lower(), 16),
                flags = pad_hex(flags:lower(), 2),
            }
        end
    end

    return nil
end

--- Start the request's span, store it in ngx.ctx.trace_span, and set the
-- traceparent / X-Trace-ID / X-Span-ID request headers for the upstream.
-- @return the span table
function _M.start_span(operation_name)
    local ctx = extract_context()
    local span = {
        trace_id = ctx and ctx.trace_id or generate_id(16),
        span_id = generate_id(8),
        parent_span_id = ctx and ctx.parent_span_id or nil,
        -- Keep the caller's sampling decision; new traces are marked sampled.
        flags = ctx and ctx.flags or "01",
        operation_name = operation_name,
        start_time = ngx.now() * 1000000, -- microseconds
        tags = {},
        logs = {},
    }

    ngx.ctx.trace_span = span

    -- This span becomes the parent of whatever the upstream records.
    ngx.req.set_header("traceparent",
        string.format("00-%s-%s-%s", span.trace_id, span.span_id, span.flags))
    ngx.req.set_header("X-Trace-ID", span.trace_id)
    ngx.req.set_header("X-Span-ID", span.span_id)

    return span
end

--- Close the current span, add HTTP tags, and hand it to report_span.
function _M.finish_span()
    local span = ngx.ctx.trace_span
    if not span then
        return
    end

    span.end_time = ngx.now() * 1000000
    span.duration = span.end_time - span.start_time

    span.tags["http.method"] = ngx.req.get_method()
    span.tags["http.url"] = ngx.var.uri
    span.tags["http.status_code"] = ngx.status
    span.tags["http.client_ip"] = ngx.var.remote_addr

    _M.report_span(span)
end

--- Set a tag on the current span (no-op when there is no span).
function _M.set_tag(key, value)
    local span = ngx.ctx.trace_span
    if span then
        span.tags[key] = value
    end
end

--- Append a timestamped log record to the current span (no-op without a span).
function _M.log(key, value)
    local span = ngx.ctx.trace_span
    if span then
        table.insert(span.logs, {
            timestamp = ngx.now() * 1000000,
            key = key,
            value = value,
        })
    end
end

-- Timer callback: send one encoded span over UDP. It runs in a timer because
-- cosocket APIs such as ngx.socket.udp are disabled in log_by_lua*, which is
-- where finish_span is called in the accompanying nginx config.
local function send_payload(premature, payload)
    if premature then
        return
    end
    local sock = ngx.socket.udp()
    local ok, err = sock:setpeername(config.agent_host, config.agent_port)
    if not ok then
        ngx.log(ngx.ERR, "Failed to connect to Jaeger agent: ", err)
        return
    end
    ok, err = sock:send(payload)
    if not ok then
        ngx.log(ngx.ERR, "Failed to send span to Jaeger agent: ", err)
    end
    sock:close()
end

--- Report a finished span to the Jaeger agent without blocking the request.
-- NOTE(review): the agent's UDP port 6831 expects Thrift compact-encoded
-- batches. This JSON payload is a simplified illustration; a stock agent will
-- not accept it as-is.
function _M.report_span(span)
    local payload = cjson.encode({
        traceId = span.trace_id,
        spanId = span.span_id,
        parentSpanId = span.parent_span_id,
        operationName = span.operation_name,
        startTime = span.start_time,
        duration = span.duration,
        tags = span.tags,
        logs = span.logs,
    })
    local ok, err = ngx.timer.at(0, send_payload, payload)
    if not ok then
        ngx.log(ngx.ERR, "Failed to schedule span report: ", err)
    end
end

return _M
nginx 配置
# Tracing wiring: start a span in rewrite, echo the trace ID to the client in
# header_filter, and finish/report the span in log.
server {
listen 8080;
# Tracing middleware. Server-level *_by_lua handlers are inherited only by
# locations that do not define their own handler for the same phase.
rewrite_by_lua_block {
local tracer = require "tracing.tracer"
-- Starts the span and sets traceparent / X-Trace-ID / X-Span-ID on the
-- request, so proxy_pass forwards them upstream.
tracer.start_span("gateway_request")
}
header_filter_by_lua_block {
-- NOTE(review): `tracer` is loaded but unused here; the span is read
-- directly from ngx.ctx.
local tracer = require "tracing.tracer"
-- Return the Trace ID to the client
local span = ngx.ctx.trace_span
if span then
ngx.header["X-Trace-ID"] = span.trace_id
end
}
log_by_lua_block {
-- NOTE(review): cosocket APIs (e.g. ngx.socket.udp) are disabled in
-- log_by_lua*, so span reporting must be deferred via ngx.timer.at.
-- Confirm that tracer.report_span does so.
local tracer = require "tracing.tracer"
tracer.finish_span()
}
location /api/ {
# `backend` is presumably an upstream{} defined elsewhere; it is not shown here.
proxy_pass http://backend;
# Tracing headers were already set in the rewrite phase
}
}
13.6 可观测性三支柱
13.6.1 指标(Metrics)
-- Prometheus metrics (the `metrics` module was introduced in chapter 11),
-- extended with a per-microservice dimension.

-- Total upstream time in seconds for this request.
-- nginx reports $upstream_response_time as a comma/colon-separated list when
-- several upstream servers were tried (e.g. "0.012, 0.034"). tonumber() on
-- such a list returns nil, so every value is summed instead.
-- @return seconds, or nil when no upstream was contacted
local function upstream_response_seconds()
    local raw = ngx.var.upstream_response_time
    if not raw then
        return nil
    end
    local total, seen = 0, false
    for num in raw:gmatch("[%d%.]+") do
        local value = tonumber(num)
        if value then
            total = total + value
            seen = true
        end
    end
    if seen then
        return total
    end
    return nil
end

-- Record per-service request count and upstream latency.
-- NOTE(review): assumes $upstream_service is set (e.g. via `set`) by the
-- routing location; when it is missing the label is "unknown".
local function service_metrics()
    local metrics = require "metrics"
    local service = ngx.var.upstream_service or "unknown"

    -- Request count by service, method and status.
    metrics.counter_inc("gateway_service_requests_total", {
        service = service,
        method = ngx.req.get_method(),
        status = tostring(ngx.status),
    })

    -- Latency by service. Requests that never reached an upstream (e.g. a 503
    -- from discovery) are skipped instead of being recorded as 0s samples,
    -- which would drag the latency percentiles down.
    local seconds = upstream_response_seconds()
    if seconds then
        metrics.histogram_observe("gateway_service_duration_seconds", seconds, {
            service = service,
        })
    end
end
13.6.2 日志(Logging)
-- Structured log entry carrying the trace context, so gateway logs can be
-- correlated with downstream service logs by trace_id.
-- @return the entry encoded as a JSON string, ready to ship to ELK or Loki
local function service_log()
    local cjson = require "cjson"
    local span = ngx.ctx.trace_span
    local log_entry = {
        timestamp = ngx.now(),
        trace_id = span and span.trace_id or "",
        span_id = span and span.span_id or "",
        service = ngx.var.upstream_service or "",
        method = ngx.req.get_method(),
        uri = ngx.var.uri,
        status = ngx.status,
        -- $request_time is a string such as "0.005"; store it as a number so
        -- log backends can aggregate it.
        latency = tonumber(ngx.var.request_time),
    }
    -- Shipping to ELK / Loki is deployment-specific; the caller sends the
    -- returned JSON line to its log pipeline.
    return cjson.encode(log_entry)
end
13.6.3 追踪(Tracing)
已在 13.5 节详细介绍。
13.7 注意事项
服务缓存一致性:服务发现的缓存 TTL 不宜过长(建议 10-30 秒),避免路由到已下线的实例。
Consul/Nacos 故障:当注册中心或配置中心不可用时,网关应能使用缓存的服务列表(必要时使用已过期的缓存)降级运行,而不是直接返回错误。
链路采样:高 QPS 场景下全量追踪会产生大量数据,建议使用采样策略(如按 10% 比例采样),并通过 traceparent 中的 flags 字段把采样决策传递给下游,保证同一条链路要么整体被采样、要么整体被丢弃。
上下文传播:确保追踪上下文(Trace ID、Span ID)在所有异步调用中正确传播。例如 ngx.timer 回调无法访问发起请求的 ngx.ctx,需要把所需的追踪字段作为参数显式传入。