强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧。
文档目录

第 08 章 - 反向代理与负载均衡

第 08 章 - 反向代理与负载均衡

8.1 反向代理基础

反向代理是 API 网关的核心能力,将客户端请求转发到后端服务。

# Basic reverse proxy.
#
# Upstream pool used by the proxy below: two weighted primaries and one
# server marked `backup`.
upstream backend_service {
    server 10.0.1.1:8080 weight=5;
    server 10.0.1.2:8080 weight=3;
    server 10.0.1.3:8080 backup;
}

server {
    listen 8080;

    location /api/ {
        proxy_pass http://backend_service;

        # Pass client information through to the backend
        proxy_set_header Host              $host;
        proxy_set_header X-Real-IP         $remote_addr;
        proxy_set_header X-Forwarded-For   $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_set_header X-Request-ID      $request_id;

        # Timeouts towards the backend
        proxy_connect_timeout 5s;
        proxy_send_timeout    30s;
        proxy_read_timeout    30s;

        # Response buffering
        proxy_buffering   on;
        proxy_buffer_size 4k;
        proxy_buffers     8 4k;
    }
}

8.2 负载均衡策略

策略对比

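| 策略 | 指令 | 特点 | 适用场景 |
|---|---|---|---|
| 轮询 | 默认 | 依次分配 | 服务器性能相近 |
| 加权轮询 | weight | 按权重分配 | 服务器性能不同 |
| IP 哈希 | ip_hash | 同一 IP 固定后端 | 会话保持 |
| 最少连接 | least_conn | 分配到连接最少的后端 | 长连接场景 |
| 一致性哈希 | hash $request_uri consistent | 相同请求固定后端 | 缓存友好 |
| 随机 | random two least_conn | 随机选取两台,再选其中连接数较少的一台 | 大规模集群 |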

8.2.1 加权轮询

# Weighted round-robin: requests are spread in proportion to `weight`
# (5:3:2, i.e. out of a total weight of 10).
upstream backend {
    server 10.0.1.1:8080 weight=5;  # ~50% of requests
    server 10.0.1.2:8080 weight=3;  # ~30% of requests
    server 10.0.1.3:8080 weight=2;  # ~20% of requests
}

8.2.2 IP 哈希(会话保持)

# IP hash (session affinity): requests from the same client IP are routed
# to the same backend server.
upstream backend {
    ip_hash;
    server 10.0.1.1:8080;
    server 10.0.1.2:8080;
    server 10.0.1.3:8080;
}

8.2.3 一致性哈希

# Hash on the request URI so identical requests hit the same backend
# (cache friendly).
upstream backend {
    hash $request_uri consistent;  # `consistent` enables consistent hashing
    server 10.0.1.1:8080;
    server 10.0.1.2:8080;
    server 10.0.1.3:8080;
}

8.2.4 Lua 动态负载均衡

当 Nginx 原生策略不够灵活时,使用 Lua 实现自定义负载均衡。

-- /usr/local/openresty/lua/balancer.lua
local _M = {}

--- Weight of an upstream entry; a missing `weight` counts as 1.
local function weight_of(up)
    return up.weight or 1
end

--- Weighted random selection.
-- Each entry is chosen with probability weight / total_weight, so an entry
-- with weight 0 is never chosen unless every weight is 0 (then the choice is
-- uniform).
-- @tparam table upstreams non-empty array of upstream entries
-- @treturn table the chosen entry
local function weighted_random(upstreams)
    local total_weight = 0
    for _, up in ipairs(upstreams) do
        total_weight = total_weight + weight_of(up)
    end

    if total_weight <= 0 then
        return upstreams[math.random(#upstreams)]
    end

    -- math.random() is in [0, 1), so rand is in [0, total_weight). The strict
    -- `<` gives each entry a half-open slot exactly `weight` wide.
    local rand = math.random() * total_weight
    local cumulative = 0

    for _, up in ipairs(upstreams) do
        cumulative = cumulative + weight_of(up)
        if rand < cumulative then
            return up
        end
    end

    -- Guard against floating-point rounding at the upper edge.
    return upstreams[#upstreams]
end

--- Least-connections selection.
-- Reads per-backend connection counts from the `gateway_config` shared dict
-- under "conn:<addr>" (a missing key counts as 0).
-- NOTE(review): nothing in this module maintains those counters; they must
-- be incremented/decremented elsewhere for this strategy to be meaningful.
-- @tparam table upstreams non-empty array of entries with an `addr` field
-- @treturn table the entry with the fewest recorded connections
local function least_conn(upstreams)
    local counters = ngx.shared.gateway_config
    local min_conn = math.huge
    local selected = nil

    for _, up in ipairs(upstreams) do
        -- Same rule as select_upstream: only an explicit `false` is unhealthy.
        if up.healthy ~= false then
            local conn = counters:get("conn:" .. up.addr) or 0
            if conn < min_conn then
                min_conn = conn
                selected = up
            end
        end
    end

    return selected or upstreams[1]
end

--- Hash-based selection: the same key maps to the same entry as long as the
-- list of upstreams does not change.
-- NOTE: despite its name this is modulo hashing (djb2 % #upstreams), not a
-- hash ring. When the healthy set changes size, most keys move to a
-- different backend. For minimal remapping, use nginx `hash $key consistent`
-- or a ring-based balancer instead.
-- @tparam table upstreams non-empty array of upstream entries
-- @param key string (or number) to hash
-- @treturn table the chosen entry
local function consistent_hash(upstreams, key)
    key = tostring(key)
    local hash = 5381
    for i = 1, #key do
        hash = ((hash * 33) + string.byte(key, i)) % 2147483647
    end
    return upstreams[(hash % #upstreams) + 1]
end

--- Pick an upstream using `strategy`.
-- Entries with `healthy == false` are skipped; a missing `healthy` counts as
-- healthy.
-- @tparam table upstreams array of `{addr = ..., weight = ..., healthy = ...}`
-- @tparam[opt="weighted_random"] string strategy "weighted_random",
--   "least_conn" or "consistent_hash"; any other value picks uniformly
-- @param[opt] key hash key for "consistent_hash" (default: ngx.var.uri)
-- @treturn table|nil the chosen entry
-- @treturn string|nil "No healthy upstreams" when nothing can be chosen
function _M.select_upstream(upstreams, strategy, key)
    strategy = strategy or "weighted_random"

    local healthy = {}
    for _, up in ipairs(upstreams or {}) do
        if up.healthy ~= false then
            healthy[#healthy + 1] = up
        end
    end

    if #healthy == 0 then
        return nil, "No healthy upstreams"
    end

    if strategy == "weighted_random" then
        return weighted_random(healthy)
    elseif strategy == "least_conn" then
        return least_conn(healthy)
    elseif strategy == "consistent_hash" then
        return consistent_hash(healthy, key or ngx.var.uri)
    end
    return healthy[math.random(#healthy)]
end

return _M

8.3 健康检查

8.3.1 被动健康检查

Nginx 原生支持被动健康检查(max_fails + fail_timeout):

# Passive health checks: once a server has max_fails failures within
# fail_timeout, it is marked unavailable for fail_timeout.
upstream backend {
    server 10.0.1.1:8080 max_fails=3 fail_timeout=30s;
    server 10.0.1.2:8080 max_fails=3 fail_timeout=30s;
    server 10.0.1.3:8080 max_fails=3 fail_timeout=30s;
}
参数说明:

| 参数 | 说明 |
|---|---|
| max_fails | 在 fail_timeout 时间窗口内允许的最大失败次数 |
| fail_timeout | 统计失败次数的时间窗口,也是节点被标记为不可用后的持续时间 |

8.3.2 主动健康检查

使用 ngx.timer 定期探测后端服务。

-- /usr/local/openresty/lua/health_check.lua
local _M = {}

local http = require "resty.http"
local cjson = require "cjson"

-- Backends to probe. `name` keys the status table, `addr` is "host:port",
-- and `path` is the HTTP health endpoint requested on each probe.
local backends = {
    {name = "user-service",  addr = "10.0.1.1:8080", path = "/health"},
    {name = "order-service", addr = "10.0.1.2:8080", path = "/health"},
    {name = "product-service", addr = "10.0.1.3:8080", path = "/health"},
}

-- Consecutive failed probes required before a backend is marked unhealthy.
local FAILURE_THRESHOLD = 3

-- Latest probe result per backend name.
-- NOTE(review): this is a module-level Lua table, i.e. one copy per nginx
-- worker. If start() runs in every worker, each worker probes and keeps its
-- own view; move it to a lua_shared_dict if all workers must agree.
local health_status = {}

--- Probe one backend with a GET to its health endpoint.
-- @tparam table backend an entry from `backends`
-- @treturn table `{healthy, status | error, latency}`; latency in milliseconds
local function probe(backend)
    local httpc = http.new()
    httpc:set_timeout(3000)

    -- ngx.now() reads nginx's cached clock; refresh it on both sides of the
    -- request so the measured latency reflects the actual round trip.
    ngx.update_time()
    local start_time = ngx.now()

    local res, err = httpc:request_uri(
        "http://" .. backend.addr .. backend.path,
        {method = "GET"}
    )

    ngx.update_time()
    local latency = (ngx.now() - start_time) * 1000  -- milliseconds

    if not res then
        return {
            healthy = false,
            error = err,
            latency = latency,
        }
    end

    return {
        -- 2xx and 3xx count as healthy
        healthy = (res.status >= 200 and res.status < 400),
        status = res.status,
        latency = latency,
    }
end

--- Timer callback: probe every backend once and update `health_status`.
-- One successful probe marks a backend healthy again; FAILURE_THRESHOLD
-- consecutive failures mark it unhealthy.
-- @tparam boolean premature true when the timer fires because nginx exits
local function check_health(premature)
    if premature then return end

    for _, backend in ipairs(backends) do
        -- A Lua error while probing one backend must not skip the others.
        local ok, result = pcall(probe, backend)
        if not ok then
            result = {healthy = false, error = result, latency = 0}
        end
        local prev = health_status[backend.name]

        if result.healthy then
            if prev and not prev.healthy then
                ngx.log(ngx.INFO, "Backend ", backend.name, " recovered")
            end
            health_status[backend.name] = {
                healthy = true,
                latency = result.latency,
                last_check = ngx.time(),
                consecutive_success = (prev and prev.consecutive_success or 0) + 1,
                consecutive_failures = 0,
            }
        else
            local fail_count = (prev and prev.consecutive_failures or 0) + 1
            local is_healthy = fail_count < FAILURE_THRESHOLD

            if prev and prev.healthy and not is_healthy then
                ngx.log(ngx.WARN, "Backend ", backend.name, " marked unhealthy after ",
                    fail_count, " consecutive failures")
            end

            health_status[backend.name] = {
                healthy = is_healthy,
                error = result.error,
                latency = result.latency,
                last_check = ngx.time(),
                consecutive_success = 0,
                consecutive_failures = fail_count,
            }
        end
    end
end

--- Start the periodic health-check timer.
-- @tparam[opt=10] number interval seconds between probe rounds
-- @treturn true|nil true on success
-- @treturn string|nil error from ngx.timer.every on failure
function _M.start(interval)
    interval = interval or 10

    local ok, err = ngx.timer.every(interval, check_health)
    if not ok then
        ngx.log(ngx.ERR, "failed to start health check timer: ", err)
        return nil, err
    end

    ngx.log(ngx.INFO, "Health check started, interval: ", interval, "s")
    return true
end

--- Return the status table (backend name -> latest result).
function _M.get_status()
    return health_status
end

--- Whether backend `name` is currently considered healthy.
-- A backend that has not been probed yet is assumed healthy.
-- @tparam string name backend name from `backends`
-- @treturn boolean
function _M.is_healthy(name)
    local status = health_status[name]
    if status == nil then
        return true
    end
    -- Explicit branch: `status and status.healthy or true` would turn a
    -- `false` into `true` and report every backend as healthy.
    return status.healthy
end

return _M

8.4 重试机制

8.4.1 Nginx 原生重试

upstream backend {
    server 10.0.1.1:8080;
    server 10.0.1.2:8080;
    server 10.0.1.3:8080;
}

server {
    listen 8080;

    location /api/ {
        proxy_pass http://backend;

        # Retry configuration: the conditions under which the request is
        # passed to the next server in the upstream group.
        proxy_next_upstream error timeout http_502 http_503 http_504;
        proxy_next_upstream_timeout 10s;   # total time budget for trying further servers
        proxy_next_upstream_tries 3;       # max number of tries, the first request included

        # Only idempotent methods (GET/HEAD/DELETE/PUT) are retried;
        # POST requests are not retried by default.
        # NOTE(review): no proxy_connect_timeout / proxy_read_timeout is set
        # here, so nginx's defaults apply; a `timeout` failure that takes
        # longer than proxy_next_upstream_timeout (10s) will not be retried.
    }
}

8.4.2 Lua 自定义重试

-- /usr/local/openresty/lua/retry.lua
local _M = {}

local http = require "resty.http"

-- Default retry policy. Any field missing from a caller-supplied config is
-- taken from here.
local defaults = {
    max_retries = 3,          -- total attempts, the first request included
    retry_delay = 0.1,        -- initial delay: 100 ms
    max_delay = 2,            -- cap on a single delay: 2 s
    backoff_multiplier = 2,   -- exponential backoff factor
    retry_on = {              -- response statuses that trigger a retry
        [502] = true,
        [503] = true,
        [504] = true,
        [408] = true,
    },
    -- Re-sending a non-idempotent request (e.g. POST) can duplicate its side
    -- effects, so such requests get a single attempt unless this is true.
    retry_non_idempotent = false,
}

-- Methods that may be sent more than once without changing the outcome.
local IDEMPOTENT_METHODS = {
    GET = true, HEAD = true, PUT = true, DELETE = true, OPTIONS = true, TRACE = true,
}

--- Merge `config` over `defaults` without mutating either table.
local function resolve_config(config)
    local merged = {}
    for k, v in pairs(defaults) do merged[k] = v end
    if config then
        for k, v in pairs(config) do merged[k] = v end
    end
    return merged
end

--- Exponential backoff: retry_delay * multiplier^(n - 1), capped at max_delay.
local function get_delay(retry_count, config)
    local delay = config.retry_delay * (config.backoff_multiplier ^ (retry_count - 1))
    return math.min(delay, config.max_delay)
end

--- HTTP request via resty.http with retries on transport errors and on the
-- statuses listed in `config.retry_on`.
-- @tparam string url
-- @tparam[opt] table opts request_uri options; `opts.timeout` in ms (default 5000)
-- @tparam[opt] table config partial overrides for `defaults`
-- @return res, nil, attempt on any status not in `retry_on`
-- @return nil, "All retries exhausted: <reason>", attempts otherwise
function _M.request(url, opts, config)
    opts = opts or {}
    config = resolve_config(config)

    local max_attempts = math.max(config.max_retries, 1)
    local method = string.upper(opts.method or "GET")
    if not IDEMPOTENT_METHODS[method] and not config.retry_non_idempotent then
        max_attempts = 1
    end

    local last_err
    for attempt = 1, max_attempts do
        local httpc = http.new()
        httpc:set_timeout(opts.timeout or 5000)

        local res, err = httpc:request_uri(url, opts)

        if res then
            if not config.retry_on[res.status] then
                -- Success, or a status the caller should see as-is.
                return res, nil, attempt
            end
            last_err = "HTTP " .. res.status
            if attempt < max_attempts then
                local delay = get_delay(attempt, config)
                ngx.log(ngx.WARN, "Retry ", attempt, " after ", delay, "s, status: ", res.status)
                ngx.sleep(delay)
            end
        else
            last_err = err
            if attempt < max_attempts then
                local delay = get_delay(attempt, config)
                ngx.log(ngx.WARN, "Retry ", attempt, " after ", delay, "s, error: ", err)
                ngx.sleep(delay)
            end
        end
    end

    return nil, "All retries exhausted: " .. tostring(last_err), max_attempts
end

return _M

8.5 超时控制

server {
    listen 8080;

    location /api/ {
        # Connect timeout: establishing the connection to the backend
        proxy_connect_timeout 5s;

        # Send timeout: between two successive writes to the backend
        proxy_send_timeout 30s;

        # Read timeout: between two successive reads from the backend
        proxy_read_timeout 30s;

        # Retry budget: total time during which the request may still be
        # passed to the next server. It must exceed connect + read timeouts
        # (5s + 30s here). With the previous 10s, a request that hit the 30s
        # read timeout had already exhausted the budget and was never retried.
        proxy_next_upstream_timeout 60s;

        proxy_pass http://backend;
    }
}

Lua 超时控制

-- /usr/local/openresty/lua/timeout_handler.lua

local http = require "resty.http"

--- GET `url` over a cosocket with a timeout, then return the connection to
-- the keepalive pool.
-- The same timeout applies to each of the connect, send and read steps.
-- @tparam string url absolute URL, e.g. "http://10.0.1.1:8080/health?x=1"
-- @tparam[opt=5000] number timeout per-step timeout in milliseconds
-- @treturn table|nil `{status = ..., body = ...}` on success
-- @treturn string|nil error message on failure
local function proxy_with_timeout(url, timeout)
    local httpc = http.new()
    httpc:set_timeout(timeout or 5000)

    -- connect() expects a host and a port, not a URL, so split the URL first.
    local parsed, parse_err = httpc:parse_uri(url)
    if not parsed then
        return nil, "invalid url: " .. tostring(parse_err)
    end
    local scheme, host, port, path = parsed[1], parsed[2], parsed[3], parsed[4]

    local ok, conn_err = httpc:connect(host, port)
    if not ok then
        return nil, "connect failed: " .. tostring(conn_err)
    end

    if scheme == "https" then
        local done, tls_err = httpc:ssl_handshake(false, host, true)
        if not done then
            httpc:close()
            return nil, "tls handshake failed: " .. tostring(tls_err)
        end
    end

    -- Host must carry the port unless it is the scheme's default.
    local default_port = (scheme == "https") and 443 or 80
    local host_header = (port == default_port) and host or (host .. ":" .. port)

    local res, req_err = httpc:request({
        method = "GET",
        path = path,
        headers = { ["Host"] = host_header },
    })
    if not res then
        httpc:close()
        return nil, "request failed: " .. tostring(req_err)
    end

    local body, read_err = res:read_body()
    if not body then
        httpc:close()
        return nil, "read failed: " .. tostring(read_err)
    end

    -- Pool the connection (10s idle timeout, up to 100 per pool); close it
    -- if it cannot be pooled.
    local pooled = httpc:set_keepalive(10000, 100)
    if not pooled then
        httpc:close()
    end

    return {status = res.status, body = body}
end

8.6 熔断器(Circuit Breaker)

熔断器是微服务架构中的关键保护机制,当下游服务出现故障时快速失败,避免级联故障。

熔断器状态机

    CLOSED(正常)   ──连续失败次数或错误率超过阈值──→  OPEN(熔断)
    OPEN(熔断)     ──熔断超时(timeout)到期后的下一个请求──→  HALF-OPEN(半开)
    HALF-OPEN(半开) ──连续探测成功次数达到 success_threshold──→  CLOSED(正常)
    HALF-OPEN(半开) ──任一探测失败──→  OPEN(熔断)
-- /usr/local/openresty/lua/circuit_breaker.lua
local _M = {}

-- Instances created by new() resolve their methods on _M through this.
local mt = { __index = _M }

-- Breaker states, stored under "cb:<name>:state"
local STATE_CLOSED = "closed"       -- normal: requests pass, results counted
local STATE_OPEN = "open"           -- tripped: requests rejected
local STATE_HALF_OPEN = "half_open" -- probing: requests pass, a failure reopens

-- Default configuration (overridable per breaker)
local default_config = {
    failure_threshold = 5,       -- consecutive failures that open the breaker
    success_threshold = 3,       -- consecutive half-open successes that close it
    timeout = 30,                -- seconds to stay open before probing
    window = 60,                 -- TTL (s) of success/failure/total counters, set when first created
    error_rate_threshold = 0.5,  -- failure ratio that opens the breaker
    min_requests = 10,           -- error rate is ignored below this many requests
    shared_dict = "gateway_config", -- lua_shared_dict that holds breaker state
}

-- Per-breaker keys cleared on close/reset (state is handled separately).
local COUNTER_SUFFIXES = {
    ":consecutive_failures", ":open_time", ":half_open_success",
    ":success", ":failure", ":total",
}

--- Create a breaker handle for `name`.
-- All state lives in the shared dict, so handles with the same name share
-- one breaker.
-- @tparam string name breaker/service name
-- @tparam[opt] table config overrides for default_config (not modified)
-- @treturn table breaker object exposing the methods below
function _M.new(name, config)
    local merged = {}
    for k, v in pairs(default_config) do merged[k] = v end
    if config then
        for k, v in pairs(config) do merged[k] = v end
    end

    local shared = ngx.shared[merged.shared_dict]
    if not shared then
        error("lua_shared_dict '" .. tostring(merged.shared_dict) .. "' is not defined", 2)
    end

    return setmetatable({
        name = name,
        config = merged,
        shared = shared,
    }, mt)
end

--- Current state; a breaker with no stored state is CLOSED.
function _M:get_state()
    return self.shared:get("cb:" .. self.name .. ":state") or STATE_CLOSED
end

--- Record the outcome of one request and apply any state transition.
-- @tparam boolean success
function _M:record(success)
    local prefix = "cb:" .. self.name
    local shared, window = self.shared, self.config.window

    if success then
        shared:incr(prefix .. ":success", 1, 0, window)
        shared:set(prefix .. ":consecutive_failures", 0)
    else
        shared:incr(prefix .. ":failure", 1, 0, window)
        shared:incr(prefix .. ":consecutive_failures", 1, 0)
    end
    shared:incr(prefix .. ":total", 1, 0, window)

    local state = self:get_state()
    if state == STATE_CLOSED then
        self:_check_open()
    elseif state == STATE_HALF_OPEN then
        self:_check_half_open(success)
    end
end

--- Open the breaker when either trip condition holds (CLOSED state only).
function _M:_check_open()
    local prefix = "cb:" .. self.name

    -- Condition 1: consecutive failures
    local consecutive = self.shared:get(prefix .. ":consecutive_failures") or 0
    if consecutive >= self.config.failure_threshold then
        self:_open()
        return
    end

    -- Condition 2: error rate within the counter window
    local total = self.shared:get(prefix .. ":total") or 0
    if total >= self.config.min_requests then
        local failures = self.shared:get(prefix .. ":failure") or 0
        if failures / total >= self.config.error_rate_threshold then
            self:_open()
        end
    end
end

--- Move to OPEN and remember when it happened.
function _M:_open()
    local prefix = "cb:" .. self.name
    -- No TTL on the state: allow_request() moves OPEN -> HALF_OPEN based on
    -- open_time. An expiring key would fall straight back to CLOSED and skip
    -- the half-open probe phase.
    self.shared:set(prefix .. ":state", STATE_OPEN)
    self.shared:set(prefix .. ":open_time", ngx.time())
    self.shared:delete(prefix .. ":half_open_success")
    ngx.log(ngx.WARN, "Circuit breaker OPENED for: ", self.name)
end

--- Move to CLOSED and clear every counter, so failures recorded before the
-- outage cannot immediately reopen the breaker.
function _M:_close()
    local prefix = "cb:" .. self.name
    self.shared:set(prefix .. ":state", STATE_CLOSED)
    for _, suffix in ipairs(COUNTER_SUFFIXES) do
        self.shared:delete(prefix .. suffix)
    end
    ngx.log(ngx.INFO, "Circuit breaker CLOSED for: ", self.name)
end

--- HALF_OPEN transitions: any failure reopens, enough successes close.
function _M:_check_half_open(success)
    if not success then
        self:_open()
        return
    end

    local prefix = "cb:" .. self.name
    local successes = self.shared:incr(prefix .. ":half_open_success", 1, 0)
    if successes and successes >= self.config.success_threshold then
        self:_close()
    end
end

--- Whether a request may be sent now.
-- CLOSED and HALF_OPEN let requests through. HALF_OPEN does not cap the
-- number of concurrent probes, since success_threshold successes are needed
-- to close. OPEN rejects until `timeout` seconds have passed, then switches
-- to HALF_OPEN and lets the request through.
-- @treturn boolean
function _M:allow_request()
    if self:get_state() ~= STATE_OPEN then
        return true
    end

    local prefix = "cb:" .. self.name
    local open_time = self.shared:get(prefix .. ":open_time") or 0
    if ngx.time() - open_time < self.config.timeout then
        return false  -- still tripped
    end

    self.shared:set(prefix .. ":state", STATE_HALF_OPEN)
    ngx.log(ngx.INFO, "Circuit breaker HALF-OPEN for: ", self.name)
    return true
end

--- Forget all state for this breaker (it reads as CLOSED afterwards).
function _M:reset()
    local prefix = "cb:" .. self.name
    self.shared:delete(prefix .. ":state")
    for _, suffix in ipairs(COUNTER_SUFFIXES) do
        self.shared:delete(prefix .. suffix)
    end
end

return _M

使用示例

# The circuit_breaker module keeps its state in ngx.shared.gateway_config,
# so that is the shared dict to declare (http context; skip if it is
# already declared elsewhere).
lua_shared_dict gateway_config 10m;

server {
    listen 8080;

    location /api/users {
        access_by_lua_block {
            local cb = require "circuit_breaker"
            local breaker = cb.new("user-service", {
                failure_threshold = 5,
                timeout = 30,
            })

            if not breaker:allow_request() then
                ngx.status = 503
                ngx.header["Content-Type"] = "application/json"
                -- clients may retry once the breaker can go half-open
                ngx.header["Retry-After"] = tostring(breaker.config.timeout)
                ngx.say('{"error":"Service temporarily unavailable","service":"user-service"}')
                return ngx.exit(503)
            end

            -- keep the breaker for the header_filter phase
            ngx.ctx.breaker = breaker
        }

        # NOTE: "user-service" must be defined as an upstream (or be resolvable).
        proxy_pass http://user-service;

        header_filter_by_lua_block {
            local breaker = ngx.ctx.breaker
            if breaker then
                -- any status below 500 counts as a success
                local success = ngx.status < 500
                breaker:record(success)
            end
        }
    }
}

8.7 服务降级

当后端服务不可用时,返回降级响应。

-- /usr/local/openresty/lua/degrade.lua
local _M = {}

local cjson = require "cjson"

-- Degradation rules keyed by URI prefix (the longest matching prefix wins).
-- `fallback()` returns `{status = <number>, body = <JSON string>}`.
local degrade_rules = {
    ["/api/users"] = {
        strategy = "cache",            -- serve a cached body when one exists
        cache_key = "degrade:users",   -- key in the gateway_config shared dict
        fallback = function()          -- used when nothing is cached
            return {
                status = 200,
                body = cjson.encode({
                    -- empty_array: a plain {} would encode as a JSON object
                    data = cjson.empty_array,
                    message = "Service degraded, no cached data available",
                }),
            }
        end,
    },
    ["/api/recommendations"] = {
        strategy = "default",  -- return canned default data
        fallback = function()
            return {
                status = 200,
                body = cjson.encode({
                    data = {"default_product_1", "default_product_2"},
                    message = "Showing default recommendations",
                }),
            }
        end,
    },
    ["/api/search"] = {
        strategy = "reject",  -- reject outright
        fallback = function()
            return {
                status = 503,
                body = cjson.encode({
                    error = "Service Unavailable",
                    message = "Search service is temporarily unavailable",
                    retry_after = 60,
                }),
            }
        end,
    },
}

--- Rule whose prefix is the longest match for `uri`, or nil.
-- Plain string comparison: prefixes are not interpreted as Lua patterns, and
-- the result does not depend on `pairs` order.
local function find_rule(uri)
    local best, best_len = nil, 0
    for prefix, rule in pairs(degrade_rules) do
        if #prefix > best_len and uri:sub(1, #prefix) == prefix then
            best, best_len = rule, #prefix
        end
    end
    return best
end

--- Cached response body for a "cache" rule, or nil.
-- NOTE(review): nothing in this module writes `cache_key`; the cache has to
-- be populated elsewhere (e.g. from successful upstream responses).
local function cached_body(rule)
    if rule.strategy ~= "cache" or not rule.cache_key then
        return nil
    end
    local dict = ngx.shared.gateway_config
    return dict and dict:get(rule.cache_key)
end

--- Send the degraded response for `uri`, if a rule applies.
-- Must run before response headers have been sent.
-- @tparam string uri request path (e.g. ngx.var.uri)
-- @treturn boolean true when a degraded response was written
function _M.execute(uri)
    if type(uri) ~= "string" then
        return false
    end

    local rule = find_rule(uri)
    if not rule then
        return false
    end

    local result
    local body = cached_body(rule)
    if body then
        result = {status = 200, body = body}
    else
        result = rule.fallback()
    end

    ngx.status = result.status
    ngx.header["X-Degraded"] = "true"
    ngx.header["Content-Type"] = "application/json"
    ngx.say(result.body)
    return true
end

return _M

8.8 Lua 反向代理完整示例

-- /usr/local/openresty/lua/proxy_handler.lua
local cjson = require "cjson"
local http = require "resty.http"
local cb = require "circuit_breaker"
local degrade = require "degrade"

-- Backend pools per service; `weight` is relative (missing weight = 1).
local services = {
    ["user-service"] = {
        upstreams = {
            {addr = "10.0.1.1:8080", weight = 5},
            {addr = "10.0.1.2:8080", weight = 3},
        },
    },
    ["order-service"] = {
        upstreams = {
            {addr = "10.0.2.1:8080", weight = 5},
            {addr = "10.0.2.2:8080", weight = 5},
        },
    },
}

-- Path prefix -> service name; the longest matching prefix wins.
local route_map = {
    ["/api/users"] = "user-service",
    ["/api/orders"] = "order-service",
}

-- Hop-by-hop response headers describe the upstream connection, not the
-- payload, and must not be copied to the client (compared lower-cased).
local HOP_BY_HOP = {
    ["connection"] = true,
    ["keep-alive"] = true,
    ["proxy-authenticate"] = true,
    ["proxy-authorization"] = true,
    ["te"] = true,
    ["trailer"] = true,
    ["transfer-encoding"] = true,
    ["upgrade"] = true,
}

--- Write a JSON response with the given status.
local function respond_json(status, payload)
    ngx.status = status
    ngx.header["Content-Type"] = "application/json"
    ngx.say(cjson.encode(payload))
end

--- Service whose route prefix is the longest match for `uri`, or nil.
-- Plain comparison, so prefixes are not interpreted as Lua patterns.
local function match_service(uri)
    local best_name, best_len = nil, 0
    for prefix, name in pairs(route_map) do
        if #prefix > best_len and uri:sub(1, #prefix) == prefix then
            best_name, best_len = name, #prefix
        end
    end
    return best_name
end

--- Weighted random choice of a backend address for `service_name`.
-- NOTE(review): math.random is never seeded in this file; seed it per worker
-- (e.g. in init_worker_by_lua) so workers do not share one sequence.
local function select_backend(service_name)
    local service = services[service_name]
    if not service or #service.upstreams == 0 then return nil end

    local total = 0
    for _, up in ipairs(service.upstreams) do
        total = total + (up.weight or 1)
    end
    if total <= 0 then
        return service.upstreams[math.random(#service.upstreams)].addr
    end

    local rand = math.random() * total  -- in [0, total)
    local cumulative = 0
    for _, up in ipairs(service.upstreams) do
        cumulative = cumulative + (up.weight or 1)
        if rand < cumulative then
            return up.addr
        end
    end
    return service.upstreams[1].addr
end

--- Read the client request body, including a body nginx buffered to a temp
-- file (get_body_data() returns nil in that case).
-- @treturn string|nil the body, or nil when the request has none
local function read_request_body()
    ngx.req.read_body()
    local data = ngx.req.get_body_data()
    if data then
        return data
    end

    local path = ngx.req.get_body_file()
    if not path then
        return nil
    end

    -- NOTE: blocking file read; raise client_body_buffer_size to keep large
    -- bodies in memory if that matters.
    local f, err = io.open(path, "rb")
    if not f then
        error("failed to open request body file: " .. tostring(err))
    end
    local content = f:read("*a")
    f:close()
    return content
end

--- Route, guard with the circuit breaker, forward, and relay the response.
local function handle_request()
    local uri = ngx.var.uri
    local method = ngx.req.get_method()

    local service_name = match_service(uri)
    if not service_name then
        respond_json(404, {error = "Service not found"})
        return
    end

    -- Circuit breaker check
    local breaker = cb.new(service_name)
    if not breaker:allow_request() then
        if degrade.execute(uri) then
            return
        end
        respond_json(503, {error = "Service circuit breaker open"})
        return
    end

    local backend = select_backend(service_name)
    if not backend then
        respond_json(503, {error = "No available backend"})
        return
    end

    local body = read_request_body()

    local httpc = http.new()
    httpc:set_timeout(5000)

    -- request_uri is the raw (still percent-encoded) path plus query string;
    -- ngx.var.uri is decoded and has no query, so it is used for routing only.
    local upstream_url = "http://" .. backend .. ngx.var.request_uri

    local res, err = httpc:request_uri(upstream_url, {
        method = method,
        body = body,
        headers = {
            ["Content-Type"] = ngx.var.content_type,
            ["X-Request-ID"] = ngx.var.request_id or "",
            ["X-User-ID"] = ngx.var.user_id or "",
            ["X-Real-IP"] = ngx.var.remote_addr,
        },
    })

    -- Transport errors and 5xx count as failures for the breaker.
    breaker:record(res ~= nil and res.status < 500)

    if not res then
        if degrade.execute(uri) then
            return
        end
        respond_json(502, {error = "Bad Gateway", message = err})
        return
    end

    ngx.status = res.status
    for k, v in pairs(res.headers) do
        if not HOP_BY_HOP[string.lower(k)] then
            ngx.header[k] = v
        end
    end
    ngx.header["X-Upstream"] = backend
    -- ngx.print, not ngx.say: say appends "\n", which would make the body one
    -- byte longer than the upstream Content-Length copied above.
    if res.body then
        ngx.print(res.body)
    end
end

-- Entry point
local ok, err = pcall(handle_request)
if not ok then
    ngx.log(ngx.ERR, "Proxy handler error: ", err)
    -- Once headers are out, the status can no longer be changed.
    if not ngx.headers_sent then
        respond_json(500, {error = "Internal Server Error"})
    end
end

8.9 注意事项

重试幂等性:只有幂等方法(GET、HEAD、PUT、DELETE)才应该重试。POST 请求重试可能导致重复操作。

连接池管理:使用 set_keepalive 复用连接,避免频繁建立 TCP 连接。但注意连接池大小限制。

超时层级:proxy_connect_timeout < proxy_read_timeout < proxy_next_upstream_timeout。proxy_next_upstream_timeout 应大于连接超时与读取超时之和,否则因超时失败的请求在重试前就已耗尽时间预算,不会被转发到下一个后端。

熔断恢复:熔断器的 timeout 不宜过短(避免频繁探测),也不宜过长(避免服务长时间不可用)。建议 30-60 秒。


上一章← 第 07 章 - 认证与鉴权 下一章第 09 章 - 缓存策略 →