OpenResty High-Performance Gateway Development Tutorial / Chapter 06 - Rate Limiting and Flow Control

Chapter 06 - Rate Limiting and Flow Control

6.1 Why Rate Limiting?

Scenario           | Consequence without limiting | Limiting strategy
Traffic burst      | Backend services crash       | Token bucket (allows bursts)
Malicious attack   | DDoS/CC attacks              | Per-IP limiting
API abuse          | Resource exhaustion          | Per-user limiting
Dependency failure | Cascading failures           | Circuit breaker + rate limiting

6.2 Comparing Rate Limiting Algorithms

Algorithm      | Characteristics | Burst handling               | Complexity | Accuracy
Fixed window   | Simplest        | Up to 2x at window edges     | Low        | Low
Sliding window | Smooth          | No boundary bursts           | Medium     | High
Leaky bucket   | Constant rate   | Bursts queued or dropped     | Medium     | High
Token bucket   | Allows bursts   | Bursts up to bucket capacity | Medium     | High

6.3 Fixed Window Rate Limiting

The fixed window algorithm divides time into windows of fixed size and caps the number of requests within each window.

-- /usr/local/openresty/lua/limiters/fixed_window.lua
local _M = {}

function _M.new(shared_dict_name, max_requests, window_seconds)
    local shared = ngx.shared[shared_dict_name]
    if not shared then
        return nil, "shared dict not found: " .. shared_dict_name
    end

    -- attach the module's methods so callers can use limiter:incoming()
    return setmetatable({
        shared = shared,
        max_requests = max_requests,
        window = window_seconds,
    }, { __index = _M })
end

function _M:incoming(key, commit)
    local now = ngx.time()
    local window_key = key .. ":" .. math.floor(now / self.window)

    local count, err = self.shared:get(window_key)
    if not count then
        count = 0
    end

    if count >= self.max_requests then
        return false, "rejected"
    end

    if commit then
        local new_count, err = self.shared:incr(window_key, 1, 0, self.window)
        if not new_count then
            return false, err
        end
    end

    return true, "ok"
end

return _M

Problems with fixed windows

Window boundary problem: a client sending requests right at a window boundary can get through twice the quota in a very short time.

Window 1 (0-60s)  Window 2 (60-120s)
|████████████|████████████|
         ↑ 58s: send 100 requests
              ↑ 62s: send 100 requests
         200 requests went through within 4 seconds!
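
The boundary failure is easy to reproduce outside nginx. Below is a minimal Python simulation of the same counter logic (the function and dict here are illustrative stand-ins for the shared-dict module, not part of it):

```python
import math

def make_fixed_window(max_requests, window_seconds):
    counters = {}  # (key, window index) -> count, like the shared dict
    def incoming(key, now):
        window_key = (key, math.floor(now / window_seconds))
        if counters.get(window_key, 0) >= max_requests:
            return False
        counters[window_key] = counters.get(window_key, 0) + 1
        return True
    return incoming

limiter = make_fixed_window(100, 60)
accepted = sum(limiter("ip1", 58) for _ in range(100))   # window 0
accepted += sum(limiter("ip1", 62) for _ in range(100))  # window 1
print(accepted)  # 200: double the quota slipped through around the boundary
```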

6.4 Sliding Window Rate Limiting

The sliding window fixes the fixed window's boundary problem by recording a timestamp for every request.

-- /usr/local/openresty/lua/limiters/sliding_window.lua
local _M = {}

function _M.new(shared_dict_name, max_requests, window_seconds)
    -- attach the module's methods so callers can use limiter:incoming_redis()
    return setmetatable({
        shared = ngx.shared[shared_dict_name],
        max_requests = max_requests,
        window = window_seconds,
    }, { __index = _M })
end

-- Redis-backed sliding window (for distributed deployments)
function _M:incoming_redis(key, commit)
    local red = require "resty.redis"
    local redis = red:new()
    redis:set_timeout(1000)

    local ok, err = redis:connect("127.0.0.1", 6379)
    if not ok then
        return nil, "redis connect failed: " .. err
    end

    local now = ngx.now() * 1000  -- milliseconds
    local window_start = now - (self.window * 1000)

    -- implement the sliding window with a Redis sorted set
    local redis_key = "rate_limit:" .. key

    -- a server-side Lua script keeps check-and-record atomic
    local script = [[
        local key = KEYS[1]
        local now = tonumber(ARGV[1])
        local window_start = tonumber(ARGV[2])
        local max_requests = tonumber(ARGV[3])
        local window_ms = tonumber(ARGV[4])

        -- drop entries that fell out of the window
        redis.call('ZREMRANGEBYSCORE', key, '-inf', window_start)

        -- count the requests still inside the window
        local count = redis.call('ZCARD', key)

        if count < max_requests then
            -- allow the request and record its timestamp
            redis.call('ZADD', key, now, now .. ':' .. math.random(1000000))
            redis.call('EXPIRE', key, math.ceil(window_ms / 1000))
            return {1, max_requests - count - 1}
        else
            -- reject the request
            return {0, 0}
        end
    ]]

    local res, err = redis:eval(script, 1, redis_key, now, window_start, self.max_requests, self.window * 1000)

    -- put the connection back into the pool
    redis:set_keepalive(10000, 100)

    if not res then
        return nil, "redis eval failed: " .. err
    end

    if res[1] == 1 then
        return true, "ok", res[2]
    else
        return false, "rejected", 0
    end
end

return _M
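
The same timestamp-log idea can be checked in memory without Redis. The Python sketch below is an illustrative stand-in for the sorted-set logic above, not the module's API:

```python
import bisect

class SlidingWindow:
    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests
        self.window = window_seconds
        self.log = {}  # key -> sorted list of request timestamps

    def incoming(self, key, now):
        ts = self.log.setdefault(key, [])
        # drop timestamps that fell out of the window (ZREMRANGEBYSCORE analogue)
        del ts[:bisect.bisect_right(ts, now - self.window)]
        if len(ts) >= self.max_requests:
            return False
        ts.append(now)  # ZADD analogue
        return True

sw = SlidingWindow(100, 60)
burst1 = sum(sw.incoming("ip1", 58.0) for _ in range(100))
burst2 = sum(sw.incoming("ip1", 62.0) for _ in range(100))
print(burst1, burst2)  # 100 0: the 62s burst is rejected, unlike the fixed window
```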

6.5 The Leaky Bucket Algorithm

A leaky bucket drains requests at a constant rate; excess requests are queued or dropped.

-- /usr/local/openresty/lua/limiters/leaky_bucket.lua
local _M = {}

function _M.new(shared_dict_name, rate, capacity)
    -- attach the module's methods so callers can use limiter:incoming()
    return setmetatable({
        shared = ngx.shared[shared_dict_name],
        rate = rate,           -- requests drained per second
        capacity = capacity,   -- bucket capacity
    }, { __index = _M })
end

function _M:incoming(key, commit)
    local now = ngx.now()  -- high-resolution time

    -- fetch the bucket state
    local bucket_key = key .. ":bucket"
    local state = self.shared:get(bucket_key)

    local water_level    -- current water level
    local last_leak_time -- time of the last leak

    if state then
        water_level, last_leak_time = state:match("([^,]+),([^,]+)")
        water_level = tonumber(water_level)
        last_leak_time = tonumber(last_leak_time)
    else
        water_level = 0
        last_leak_time = now
    end

    -- drain water for the elapsed time
    local elapsed = now - last_leak_time
    local leaked = elapsed * self.rate
    water_level = math.max(0, water_level - leaked)

    -- reject if the bucket would overflow
    if water_level >= self.capacity then
        return false, "rejected", 0
    end

    -- add one unit of water
    if commit then
        water_level = water_level + 1
        self.shared:set(bucket_key, water_level .. "," .. now, 60)
    end

    -- suggest a wait time once the bucket is over 80% full
    local wait_time = 0
    if water_level > self.capacity * 0.8 then
        wait_time = (water_level - self.capacity * 0.8) / self.rate
    end

    return true, "ok", wait_time
end

return _M

Leaky bucket behavior

requests in    bucket        constant-rate output
 ─→ │  ■■■■■  │ ─→ ■ → process
    │  ■■■■   │    ■ → process
    │  ■■■    │    ■ → process
    └─────────┘
    excess requests are dropped or queued
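
The drain-and-overflow arithmetic can be exercised in isolation. The Python sketch below mirrors the Lua logic (the class name and parameters are illustrative):

```python
class LeakyBucket:
    def __init__(self, rate, capacity):
        self.rate = rate          # requests drained per second
        self.capacity = capacity  # bucket capacity
        self.level = 0.0          # current water level
        self.last = None          # time of the last leak

    def incoming(self, now):
        if self.last is None:
            self.last = now
        # drain water for the elapsed time
        self.level = max(0.0, self.level - (now - self.last) * self.rate)
        self.last = now
        if self.level >= self.capacity:
            return False  # overflow: reject
        self.level += 1.0
        return True

lb = LeakyBucket(rate=10, capacity=5)
burst = sum(lb.incoming(0.0) for _ in range(10))  # only the first 5 fit
later = lb.incoming(0.5)  # 0.5s drains 5 units, so there is room again
print(burst, later)  # 5 True
```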

6.6 The Token Bucket Algorithm

Tokens are added to the bucket at a constant rate and every request consumes tokens, which permits a controlled amount of burstiness.

-- /usr/local/openresty/lua/limiters/token_bucket.lua
local _M = {}

function _M.new(shared_dict_name, rate, capacity)
    -- attach the module's methods so callers can use limiter:incoming()
    return setmetatable({
        shared = ngx.shared[shared_dict_name],
        rate = rate,           -- tokens generated per second
        capacity = capacity,   -- bucket capacity
    }, { __index = _M })
end

function _M:incoming(key, commit, tokens)
    tokens = tokens or 1
    local now = ngx.now()

    -- fetch the bucket state
    local bucket_key = key .. ":token_bucket"
    local data = self.shared:get(bucket_key)

    local available_tokens  -- tokens currently available
    local last_refill_time  -- time of the last refill

    if data then
        available_tokens, last_refill_time = data:match("([^,]+),([^,]+)")
        available_tokens = tonumber(available_tokens)
        last_refill_time = tonumber(last_refill_time)
    else
        available_tokens = self.capacity  -- start with a full bucket
        last_refill_time = now
    end

    -- refill tokens for the elapsed time
    local elapsed = now - last_refill_time
    local new_tokens = elapsed * self.rate
    available_tokens = math.min(self.capacity, available_tokens + new_tokens)

    -- check whether enough tokens are available
    if available_tokens < tokens then
        -- not enough: report how long the caller would need to wait
        local wait_time = (tokens - available_tokens) / self.rate
        return false, "rejected", wait_time
    end

    -- consume the tokens
    if commit then
        available_tokens = available_tokens - tokens
        self.shared:set(bucket_key, available_tokens .. "," .. now, 120)
    end

    return true, "ok", 0
end

return _M

Token bucket vs. leaky bucket

Token bucket:
  - Allows bursts (as long as enough tokens are in the bucket)
  - Long-term average rate = token generation rate
  - Good for: API rate limiting, per-user quotas

Leaky bucket:
  - Constant output rate; no bursts
  - Output rate is fixed
  - Good for: traffic shaping, message queues
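
The refill-and-consume arithmetic can be checked standalone. A Python sketch of the same logic (illustrative; it starts with a full bucket, as the Lua module does):

```python
class TokenBucket:
    def __init__(self, rate, capacity):
        self.rate = rate        # tokens generated per second
        self.capacity = capacity
        self.tokens = capacity  # start with a full bucket
        self.last = 0.0

    def incoming(self, now, need=1):
        # refill for the elapsed time, capped at the bucket capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens < need:
            return False
        self.tokens -= need
        return True

tb = TokenBucket(rate=10, capacity=50)
burst = sum(tb.incoming(0.0) for _ in range(60))
print(burst)  # 50: the full bucket absorbs a burst up to its capacity
```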

6.7 Distributed Rate Limiting with Redis

When the gateway runs on multiple nodes, the counters must be shared across them, so the limiter is backed by Redis.

-- /usr/local/openresty/lua/limiters/distributed.lua
local _M = {}

local cjson = require "cjson"

-- helper: fetch a pooled Redis connection
local function get_redis()
    local redis = require "resty.redis"
    local red = redis:new()
    red:set_timeout(1000)
    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        return nil, err
    end
    return red
end

-- sliding window limiter (Redis sorted set implementation)
local SLIDING_WINDOW_SCRIPT = [[
    local key = KEYS[1]
    local now = tonumber(ARGV[1])
    local window = tonumber(ARGV[2])
    local max_requests = tonumber(ARGV[3])
    local window_start = now - window

    -- drop entries that fell out of the window
    redis.call('ZREMRANGEBYSCORE', key, '-inf', window_start)

    -- count the requests still inside the window
    local count = redis.call('ZCARD', key)

    if count < max_requests then
        -- record the new request
        redis.call('ZADD', key, now, now .. ':' .. math.random(1000000))
        -- window is in milliseconds here, but EXPIRE takes seconds
        redis.call('EXPIRE', key, math.ceil(window / 1000))
        return {1, max_requests - count - 1, window_start}
    else
        -- find the oldest entry to compute when a slot frees up
        local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')
        local retry_after = 0
        if #oldest >= 2 then
            retry_after = math.ceil((tonumber(oldest[2]) + window - now) / 1000)
        end
        return {0, 0, retry_after}
    end
]]

-- token bucket limiter (Redis implementation)
local TOKEN_BUCKET_SCRIPT = [[
    local key = KEYS[1]
    local now = tonumber(ARGV[1])
    local rate = tonumber(ARGV[2])
    local capacity = tonumber(ARGV[3])
    local requested = tonumber(ARGV[4])

    local data = redis.call('HMGET', key, 'tokens', 'last_time')
    local tokens = tonumber(data[1]) or capacity
    local last_time = tonumber(data[2]) or now

    -- refill tokens for the elapsed time
    local elapsed = now - last_time
    tokens = math.min(capacity, tokens + (elapsed * rate))

    if tokens >= requested then
        tokens = tokens - requested
        redis.call('HMSET', key, 'tokens', tokens, 'last_time', now)
        redis.call('EXPIRE', key, math.ceil(capacity / rate) + 1)
        return {1, tokens}
    else
        local wait_time = (requested - tokens) / rate
        return {0, wait_time}
    end
]]

function _M.new(config)
    -- attach the module's methods so callers can use limiter:incoming()
    return setmetatable({
        max_requests = config.max_requests or 100,
        window = config.window or 60,
        algorithm = config.algorithm or "sliding_window",
        key_prefix = config.key_prefix or "rl:",
    }, { __index = _M })
end

function _M:incoming(key, commit)
    local red, err = get_redis()
    if not red then
        ngx.log(ngx.ERR, "Redis connect failed: ", err)
        -- fail open: let the request through when Redis is down
        return true, "degraded"
    end

    local redis_key = self.key_prefix .. key
    local now = ngx.now() * 1000  -- milliseconds

    local res, err
    if self.algorithm == "sliding_window" then
        res, err = red:eval(SLIDING_WINDOW_SCRIPT, 1, redis_key,
            now, self.window * 1000, self.max_requests)
    elseif self.algorithm == "token_bucket" then
        -- the token-bucket script computes refill in seconds, so pass seconds
        res, err = red:eval(TOKEN_BUCKET_SCRIPT, 1, redis_key,
            now / 1000, self.max_requests / self.window, self.max_requests, 1)
    end

    red:set_keepalive(10000, 100)

    if not res then
        ngx.log(ngx.ERR, "Redis eval failed: ", err)
        return true, "degraded"
    end

    if res[1] == 1 then
        return true, "ok", res[2]
    else
        return false, "rejected", res[2]
    end
end

return _M

6.8 Multi-Dimensional Rate Limiting

In practice you usually combine limits across several dimensions:

-- /usr/local/openresty/lua/rate_limiter.lua
local _M = {}

-- rate limiting rules
local rules = {
    -- per-IP limit
    {
        name = "ip_limit",
        key_func = function() return ngx.var.remote_addr end,
        max_requests = 1000,
        window = 60,
        response = {
            status = 429,
            body = '{"error":"Too many requests from this IP"}',
        },
    },
    -- per-API limit
    {
        name = "api_limit",
        key_func = function() return ngx.var.uri end,
        max_requests = 5000,
        window = 60,
        response = {
            status = 429,
            body = '{"error":"API rate limit exceeded"}',
        },
    },
    -- per-user limit
    {
        name = "user_limit",
        key_func = function()
            return ngx.var.http_x_user_id or ngx.var.remote_addr
        end,
        max_requests = 100,
        window = 60,
        response = {
            status = 429,
            body = '{"error":"User rate limit exceeded"}',
        },
    },
    -- per-endpoint limit (different quotas for different APIs)
    {
        name = "endpoint_limit",
        key_func = function()
            local method = ngx.req.get_method()
            local uri = ngx.var.uri
            -- writes (POST/PUT/DELETE) get a stricter quota
            if method == "POST" or method == "PUT" or method == "DELETE" then
                return method .. ":" .. uri
            end
            return nil  -- GET requests are not limited by this rule
        end,
        max_requests = 50,
        window = 60,
        response = {
            status = 429,
            body = '{"error":"Write rate limit exceeded"}',
        },
    },
}

-- limiter instances (created lazily)
local limiters = {}

local function get_limiter(rule)
    if limiters[rule.name] then
        return limiters[rule.name]
    end

    local limiter_mod = require "limiters.token_bucket"
    -- token_bucket.new(dict, rate, capacity): derive the refill rate from the rule
    local limiter, err = limiter_mod.new("rate_limit",
        rule.max_requests / rule.window, rule.max_requests)
    if limiter then
        limiters[rule.name] = limiter
    end
    return limiter
end

function _M.check()
    for _, rule in ipairs(rules) do
        local key = rule.key_func()
        if key then
            local limiter = get_limiter(rule)
            if limiter then
                local ok, err, extra = limiter:incoming(key, true)
                if not ok then
                    -- set rate-limit response headers
                    ngx.header["X-RateLimit-Limit"] = rule.max_requests
                    ngx.header["X-RateLimit-Remaining"] = 0
                    ngx.header["Retry-After"] = extra or 1

                    ngx.status = rule.response.status
                    ngx.say(rule.response.body)
                    return ngx.exit(rule.response.status)
                end
            end
        end
    end
end

return _M
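
The ordered rule evaluation above can be modeled outside nginx. In the Python sketch below, the rule values and the `consume` stub are illustrative stand-ins for the shared-dict limiter:

```python
def check(rules, request, consume):
    """Evaluate rules in order; return the first rule that rejects, else None."""
    for rule in rules:
        key = rule["key_func"](request)
        if key is None:
            continue  # this rule does not apply to the request
        if not consume(rule["name"], key, rule["max_requests"]):
            return rule
    return None

# a trivial in-memory counter standing in for the shared-dict limiter
counts = {}
def consume(name, key, limit):
    counts[(name, key)] = counts.get((name, key), 0) + 1
    return counts[(name, key)] <= limit

rules = [
    {"name": "ip_limit",   "max_requests": 3, "key_func": lambda r: r["ip"]},
    {"name": "user_limit", "max_requests": 2, "key_func": lambda r: r.get("user")},
]

req = {"ip": "1.2.3.4", "user": "u1"}
hits = [check(rules, req, consume) for _ in range(3)]
print([h["name"] if h else None for h in hits])  # [None, None, 'user_limit']
```

The strictest applicable rule (here the per-user quota of 2) is the one that fires first, even though the per-IP quota still has room.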

nginx configuration

lua_shared_dict rate_limit 50m;

server {
    listen 8080;

    location /api/ {
        access_by_lua_block {
            local rate_limiter = require "rate_limiter"
            rate_limiter.check()
        }

        proxy_pass http://backend;
    }
}

6.9 Adaptive Rate Limiting

Adjust the limit dynamically based on the backend's response time and error rate.

-- /usr/local/openresty/lua/adaptive_limiter.lua
local _M = {}

-- collect performance metrics from the shared dict
local function get_metrics()
    local shared = ngx.shared.rate_limit

    local total = shared:get("req_total") or 0
    local errors = shared:get("req_errors") or 0
    local total_latency = shared:get("latency_total") or 0

    local error_rate = total > 0 and (errors / total) or 0
    local avg_latency = total > 0 and (total_latency / total) or 0

    return {
        total = total,
        error_rate = error_rate,
        avg_latency = avg_latency,
    }
end

-- decide the current limit adaptively
function _M.get_limit(base_limit)
    local metrics = get_metrics()
    local limit = base_limit

    -- back off on errors
    if metrics.error_rate > 0.1 then       -- error rate > 10%
        limit = limit * 0.5                -- cut by 50%
    elseif metrics.error_rate > 0.05 then  -- error rate > 5%
        limit = limit * 0.75               -- cut by 25%
    end

    -- back off on latency
    if metrics.avg_latency > 2000 then      -- average latency > 2s
        limit = limit * 0.5
    elseif metrics.avg_latency > 1000 then  -- average latency > 1s
        limit = limit * 0.75
    end

    -- never drop below 10% of the base limit
    limit = math.max(limit, base_limit * 0.1)

    return math.floor(limit)
end

-- record metrics for a completed request
function _M.record(status, latency)
    local shared = ngx.shared.rate_limit

    shared:incr("req_total", 1, 0, 60)

    if status >= 500 then
        shared:incr("req_errors", 1, 0, 60)
    end

    shared:incr("latency_total", latency, 0, 60)
end

return _M
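
The adjustment in get_limit is pure arithmetic and can be verified in isolation. A Python transcription of the same thresholds (illustrative, not part of the module):

```python
def get_limit(base_limit, error_rate, avg_latency_ms):
    limit = base_limit
    # back off on errors
    if error_rate > 0.1:
        limit *= 0.5
    elif error_rate > 0.05:
        limit *= 0.75
    # back off on latency
    if avg_latency_ms > 2000:
        limit *= 0.5
    elif avg_latency_ms > 1000:
        limit *= 0.75
    # never drop below 10% of the base limit
    return int(max(limit, base_limit * 0.1))

print(get_limit(1000, 0.02, 500))   # 1000: healthy backend keeps the full limit
print(get_limit(1000, 0.12, 2500))  # 250: errors and latency each halve the limit
```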

6.10 Rate-Limit Response Best Practices

-- standard response when a request is rejected by the limiter
local cjson = require "cjson"

local function rate_limit_response(rule, retry_after)
    ngx.header["Content-Type"] = "application/json"
    ngx.header["X-RateLimit-Limit"] = rule.max_requests
    ngx.header["X-RateLimit-Remaining"] = 0
    ngx.header["X-RateLimit-Reset"] = ngx.time() + (retry_after or 60)
    ngx.header["Retry-After"] = retry_after or 60

    ngx.status = 429
    ngx.say(cjson.encode({
        error = "Too Many Requests",
        message = "Rate limit exceeded. Please retry after " .. (retry_after or 60) .. " seconds.",
        retry_after = retry_after or 60,
    }))
end

Standard response headers

Header                | Meaning                          | Example
X-RateLimit-Limit     | Maximum requests per window      | 100
X-RateLimit-Remaining | Requests remaining in the window | 42
X-RateLimit-Reset     | Timestamp when the window resets | 1704067260
Retry-After           | Suggested wait in seconds        | 30
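
How the four headers relate can be sketched in a few lines (the helper name is hypothetical; note that X-RateLimit-Reset is an absolute epoch timestamp while Retry-After is relative seconds):

```python
import time

def rate_limit_headers(limit, retry_after, now=None):
    now = int(time.time() if now is None else now)
    return {
        "X-RateLimit-Limit": str(limit),
        "X-RateLimit-Remaining": "0",                 # the request was rejected
        "X-RateLimit-Reset": str(now + retry_after),  # absolute epoch seconds
        "Retry-After": str(retry_after),              # relative seconds
    }

headers = rate_limit_headers(100, 30, now=1704067230)
print(headers["X-RateLimit-Reset"], headers["Retry-After"])  # 1704067260 30
```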

6.11 Caveats

Redis failure fallback: when Redis is unavailable, the limiter should fail open (or fall back to node-local limiting) instead of rejecting every request just because the limiting component itself is down.

Accuracy vs. performance: Redis-based limiting adds roughly 1-5 ms per request. On high-QPS paths, use a local limiter (ngx.shared.DICT) as a first-pass filter and reserve Redis for the distributed check.
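
The two-tier idea can be sketched with stand-in limiters (both stubs below are hypothetical and exist only to show the call order and its effect on Redis traffic):

```python
def two_tier_check(local_allow, distributed_allow, key):
    # tier 1: a cheap local check filters offenders without a network round-trip
    if not local_allow(key):
        return False
    # tier 2: only locally-accepted requests pay the Redis round-trip
    return distributed_allow(key)

calls = {"local": 0, "redis": 0}
def local_allow(key):
    calls["local"] += 1
    return calls["local"] <= 5   # the local layer rejects after 5 requests
def redis_allow(key):
    calls["redis"] += 1
    return True                  # the distributed layer accepts everything here

results = [two_tier_check(local_allow, redis_allow, "ip1") for _ in range(8)]
print(results.count(True), calls["redis"])  # 5 5: Redis saw only surviving requests
```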

Clock synchronization: distributed rate limiting depends on consistent time across nodes; keep clock skew under one second (use NTP).

