强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧。
文档目录

第 11 章 - 日志与监控

第 11 章 - 日志与监控

11.1 日志架构

OpenResty 网关
    │
    ├── 访问日志(access_log)
    │   ├── 文件日志 → Filebeat → Logstash → Elasticsearch
    │   └── 直接写入 → Kafka → 消费者
    │
    ├── 错误日志(error_log)
    │   └── 文件日志 → 监控告警
    │
    ├── 自定义业务日志
    │   └── Lua → Redis/Kafka/文件
    │
    └── 指标(metrics)
        └── Lua → Prometheus Exporter → Prometheus → Grafana

11.2 Nginx 访问日志

11.2.1 日志格式定义

http {
    # Basic log format
    log_format basic '$remote_addr - $remote_user [$time_local] '
                     '"$request" $status $body_bytes_sent';

    # Detailed format with request/upstream timing breakdown
    log_format detailed '$remote_addr - $remote_user [$time_local] '
                        '"$request" $status $body_bytes_sent '
                        '"$http_referer" "$http_user_agent" '
                        'rt=$request_time '
                        'urt=$upstream_response_time '
                        'uct=$upstream_connect_time '
                        'uht=$upstream_header_time';

    # JSON structured log (recommended).
    # NOTE: $http_authorization is deliberately NOT logged -- recording
    # credential headers leaks tokens into log storage (see section 11.8).
    log_format json_log escape=json
        '{'
            '"timestamp":"$time_iso8601",'
            '"remote_addr":"$remote_addr",'
            '"remote_user":"$remote_user",'
            '"request_method":"$request_method",'
            '"request_uri":"$request_uri",'
            '"uri":"$uri",'
            '"args":"$args",'
            '"status":$status,'
            '"body_bytes_sent":$body_bytes_sent,'
            '"request_time":$request_time,'
            '"upstream_response_time":"$upstream_response_time",'
            '"upstream_addr":"$upstream_addr",'
            '"upstream_status":"$upstream_status",'
            '"http_referer":"$http_referer",'
            '"http_user_agent":"$http_user_agent",'
            '"http_x_forwarded_for":"$http_x_forwarded_for",'
            '"request_id":"$request_id",'
            '"ssl_protocol":"$ssl_protocol",'
            '"gzip_ratio":"$gzip_ratio"'
        '}';

    # Custom-field log (variables populated by Lua code)
    log_format custom_log escape=json
        '{'
            '"timestamp":"$time_iso8601",'
            '"client_ip":"$remote_addr",'
            '"method":"$request_method",'
            '"uri":"$uri",'
            '"status":$status,'
            '"latency":$request_time,'
            '"user_id":"$user_id",'
            '"user_role":"$user_role",'
            '"api_version":"$api_version",'
            '"request_id":"$request_id",'
            '"upstream":"$upstream_addr",'
            '"cache_status":"$upstream_cache_status"'
        '}';

    # Conditional logging (filter out health checks).
    # map is only valid at http{} level -- placing it inside server{}
    # makes nginx fail to start ("map directive is not allowed here").
    map $request_uri $loggable {
        ~^/health 0;
        ~^/ping   0;
        default   1;
    }

    server {
        listen 8080;

        # A single access_log with if= -- declaring a second, unconditional
        # access_log for the same file would write every request twice.
        access_log /var/log/openresty/access.log json_log if=$loggable;
        error_log  /var/log/openresty/error.log warn;
    }
}

11.2.2 日志轮转

# /etc/logrotate.d/openresty
# Rotate all OpenResty logs daily, keeping 30 compressed archives.
/var/log/openresty/*.log {
    daily
    rotate 30
    # don't error if a log file is missing
    missingok
    # skip rotation for empty files
    notifempty
    compress
    # compress one cycle late so the previous file can still be flushed
    delaycompress
    # run postrotate once for the whole glob, not per file
    sharedscripts
    postrotate
        # USR1 makes the nginx master reopen its log files on the new paths
        [ -f /usr/local/openresty/nginx/logs/nginx.pid ] && \
        kill -USR1 $(cat /usr/local/openresty/nginx/logs/nginx.pid)
    endscript
}

11.3 Lua 结构化日志

-- /usr/local/openresty/lua/logger.lua
-- Structured JSON logger for the gateway: mirrors entries to the nginx
-- error log and asynchronously appends them to a dedicated log file.
local _M = {}

local cjson = require "cjson"

-- Log levels (higher number = more severe)
local LOG_LEVELS = {
    DEBUG = 1,
    INFO  = 2,
    WARN  = 3,
    ERROR = 4,
    FATAL = 5,
}

-- Entries below this level are dropped
local current_level = LOG_LEVELS.INFO

-- Log buffer (batch writes to reduce I/O).
-- NOTE(review): these three locals are declared but never read by the
-- functions below -- the buffered-write path appears unimplemented.
local log_buffer = {}
local buffer_size = 100
local buffer_flush_interval = 5  -- seconds

-- Build a structured log record for the current request: the given
-- level/message plus standard request context. Fields in `extra`
-- extend (and may override) the defaults.
local function create_log_entry(level, message, extra)
    local record = {
        timestamp  = ngx.now(),
        level      = level,
        message    = message,
        pid        = ngx.worker.pid(),
        request_id = ngx.var.request_id or "",
        client_ip  = ngx.var.remote_addr or "",
        uri        = ngx.var.uri or "",
        method     = ngx.req.get_method() or "",
    }

    -- merge caller-supplied fields on top of the defaults
    if extra ~= nil then
        for field, value in pairs(extra) do
            record[field] = value
        end
    end

    return record
end

-- Serialize a log record into a single JSON line.
local function format_log(entry)
    local encoded = cjson.encode(entry)
    return encoded
end

-- Append one line (message + "\n") to `filepath`.
-- Returns true on success, or nil and the error message when the file
-- cannot be opened (the original silently swallowed open failures).
-- NOTE: io.open is a blocking call; only invoke from timer context.
local function write_to_file(filepath, message)
    local fd, err = io.open(filepath, "a")
    if not fd then
        return nil, err
    end
    fd:write(message .. "\n")
    fd:close()
    return true
end

-- Map module level names to nginx error-log constants.
-- ngx exposes only uppercase constants, and the error level is ngx.ERR
-- (ngx.ERROR is the -1 return-code constant), so the original
-- ngx[string.lower(level)] always looked up nil and made ngx.log() fail.
local NGX_LOG_CONST = {
    DEBUG = ngx.DEBUG,
    INFO  = ngx.INFO,
    WARN  = ngx.WARN,
    ERROR = ngx.ERR,
    FATAL = ngx.EMERG,
}

-- Core log function: write to the nginx error log immediately and to
-- the gateway log file asynchronously.
-- level: one of "DEBUG"/"INFO"/"WARN"/"ERROR"/"FATAL"
-- message: human-readable text; extra: optional table of extra fields.
function _M.log(level, message, extra)
    local severity = LOG_LEVELS[level]
    -- unknown level names are dropped instead of crashing on nil < number
    if not severity or severity < current_level then
        return
    end

    local entry = create_log_entry(level, message, extra)
    local formatted = format_log(entry)

    -- mirror to the nginx error log at the mapped severity
    ngx.log(NGX_LOG_CONST[level], message)

    -- async file write; timers run detached from the request, so the
    -- blocking io.open in write_to_file does not delay the response
    local ok, err = ngx.timer.at(0, function(premature)
        if premature then return end
        write_to_file("/var/log/openresty/gateway.log", formatted)
    end)
    if not ok then
        -- timer quota can be exhausted under load; report the dropped entry
        ngx.log(ngx.ERR, "failed to schedule log write: ", err)
    end
end

-- Convenience wrappers: _M.debug/info/warn/error(msg, extra), each
-- delegating to _M.log with the matching level name.
for method_name, level_name in pairs({
    debug = "DEBUG",
    info  = "INFO",
    warn  = "WARN",
    error = "ERROR",
}) do
    _M[method_name] = function(msg, extra)
        _M.log(level_name, msg, extra)
    end
end

-- Request access logger; call from log_by_lua so status/upstream
-- variables are final.
function _M.request_logger()
    -- The original computed ngx.now() - start_time with start_time taken
    -- in this same call, which is always ~0. $request_time is measured by
    -- nginx from the start of the request (in seconds, ms resolution).
    local request_ms = (tonumber(ngx.var.request_time) or 0) * 1000

    local log_entry = {
        timestamp     = ngx.now(),
        request_id    = ngx.var.request_id or "",
        client_ip     = ngx.var.remote_addr,
        method        = ngx.req.get_method(),
        uri           = ngx.var.uri,
        args          = ngx.var.args or "",
        status        = ngx.status,
        request_time  = request_ms,
        upstream_time = tonumber(ngx.var.upstream_response_time) or 0,
        body_bytes    = tonumber(ngx.var.body_bytes_sent) or 0,
        user_agent    = ngx.var.http_user_agent or "",
        referer       = ngx.var.http_referer or "",
        user_id       = ngx.var.user_id or "",
        cache_status  = ngx.var.upstream_cache_status or "",
    }

    -- write asynchronously; report (rather than silently drop) entries
    -- when the timer cannot be created
    local ok, err = ngx.timer.at(0, function(premature)
        if premature then return end
        write_to_file("/var/log/openresty/requests.log", cjson.encode(log_entry))
    end)
    if not ok then
        ngx.log(ngx.ERR, "failed to schedule request log write: ", err)
    end
end

return _M

nginx 配置

server {
    listen 8080;

    # Declared empty so the logger can read them even when no auth/routing
    # Lua code populated them for this request
    set $user_id "";
    set $user_role "";

    location /api/ {
        # Custom request logging; log_by_lua runs after the response has
        # been sent, so it never adds latency to the client
        log_by_lua_block {
            local logger = require "logger"
            logger.request_logger()
        }

        proxy_pass http://backend;
    }
}

11.4 ELK 集成

11.4.1 Filebeat 配置

# /etc/filebeat/filebeat.yml
# Ships the gateway's JSON access log into Elasticsearch, one daily index.
filebeat.inputs:
  - type: log
    paths:
      - /var/log/openresty/access.log
    # parse each line as JSON and lift the parsed fields to the event root
    json.keys_under_root: true
    # attach a json.error field instead of dropping unparseable lines
    json.add_error_key: true
    fields:
      service: openresty-gateway
      environment: production
    fields_under_root: true

output.elasticsearch:
  hosts: ["http://elasticsearch:9200"]
  index: "gateway-logs-%{+yyyy.MM.dd}"

setup.template.name: "gateway-logs"
setup.template.pattern: "gateway-logs-*"

11.4.2 直接写入 Elasticsearch

-- /usr/local/openresty/lua/elasticsearch_logger.lua
-- Buffered log shipper: collects entries in memory and writes batches to
-- Elasticsearch through the Bulk API.
local _M = {}

local http = require "resty.http"
local cjson = require "cjson"

local config = {
    hosts = {"http://es1:9200", "http://es2:9200"},
    index_prefix = "gateway-logs",
    batch_size = 100,
    flush_interval = 5,
}

-- Log buffer.
-- NOTE(review): module-level state is per nginx worker, so each worker
-- keeps its own buffer and flush timestamp.
local buffer = {}
local last_flush = ngx.time()

-- Bulk-index a batch of log tables into today's Elasticsearch index.
-- Returns true on success, false (after logging) on failure.
local function flush_to_es(logs)
    if #logs == 0 then return end

    -- daily index, e.g. gateway-logs-2024.01.31
    local index = config.index_prefix .. "-" .. os.date("%Y.%m.%d")

    -- Bulk API body: action line followed by the document, NDJSON-framed
    local body_parts = {}
    for _, log in ipairs(logs) do
        table.insert(body_parts, cjson.encode({index = {_index = index}}))
        table.insert(body_parts, cjson.encode(log))
    end
    local body = table.concat(body_parts, "\n") .. "\n"

    -- naive load spreading across the configured ES nodes
    local host = config.hosts[math.random(#config.hosts)]

    local httpc = http.new()
    httpc:set_timeout(10000)

    local res, err = httpc:request_uri(host .. "/_bulk", {
        method = "POST",
        body = body,
        headers = {["Content-Type"] = "application/x-ndjson"},
    })

    if not res then
        ngx.log(ngx.ERR, "ES bulk request failed: ", err)
        return false
    end

    -- The original treated ANY HTTP response as success; rejected batches
    -- (400 mapping errors, 429 back-pressure, 5xx) vanished silently.
    if res.status >= 300 then
        ngx.log(ngx.ERR, "ES bulk request returned HTTP ", res.status,
                ": ", res.body or "")
        return false
    end

    -- NOTE(review): a 200 bulk response can still carry per-item failures
    -- ("errors": true in the body); parse res.body if item-level
    -- reliability is required.
    return true
end

-- Queue one log entry; triggers a flush once the batch is full or the
-- flush interval has elapsed since the previous flush.
function _M.add(log_entry)
    buffer[#buffer + 1] = log_entry

    local batch_full       = #buffer >= config.batch_size
    local interval_elapsed = (ngx.time() - last_flush) >= config.flush_interval
    if batch_full or interval_elapsed then
        _M.flush()
    end
end

-- Flush the current buffer to Elasticsearch asynchronously.
function _M.flush()
    if #buffer == 0 then return end

    -- swap the buffer out first so new entries accumulate independently
    local pending = buffer
    buffer = {}
    last_flush = ngx.time()

    local ok, err = ngx.timer.at(0, function(premature)
        if premature then return end
        flush_to_es(pending)
    end)
    if not ok then
        -- timer quota exhausted: the original dropped the whole batch
        -- silently here; re-queue it so the next flush retries it
        ngx.log(ngx.ERR, "failed to schedule ES flush: ", err)
        for i = 1, #pending do
            buffer[#buffer + 1] = pending[i]
        end
    end
end

-- Start the periodic flush timer; intended to be called once per worker
-- (e.g. from init_worker_by_lua). Guarded so repeated calls do not
-- register duplicate timers that would each fire every interval.
function _M.start()
    if _M._timer_started then
        return
    end
    _M._timer_started = true

    ngx.timer.every(config.flush_interval, function(premature)
        if premature then return end
        _M.flush()
    end)
end

return _M

11.5 Kafka 日志集成

-- /usr/local/openresty/lua/kafka_logger.lua
-- Ships gateway request logs to Kafka via lua-resty-kafka's async producer.
local _M = {}

local cjson = require "cjson"

-- Publish a batch of messages (Lua tables, JSON-encoded) to `topic`.
-- Returns true only when every message was accepted by the producer;
-- the original returned true even if every send failed.
function _M.produce(topic, messages)
    -- uses lua-resty-kafka
    local broker_list = {
        {host = "kafka1", port = 9092},
        {host = "kafka2", port = 9092},
        {host = "kafka3", port = 9092},
    }

    local producer = require "resty.kafka.producer"
    -- NOTE(review): lua-resty-kafka caches async producers internally,
    -- so repeated :new() calls are expected to be cheap -- confirm for
    -- the library version in use.
    local bp, err = producer:new(broker_list, {
        producer_type = "async",
        batch_size = 100,
        flush_time = 5000,
    })

    if not bp then
        ngx.log(ngx.ERR, "Failed to create Kafka producer: ", err)
        return false
    end

    local all_sent = true
    for _, msg in ipairs(messages) do
        local ok, send_err = bp:send(topic, nil, cjson.encode(msg))
        if not ok then
            -- keep going: one bad message should not drop the rest
            all_sent = false
            ngx.log(ngx.ERR, "Kafka send failed: ", send_err)
        end
    end

    return all_sent
end

-- Build an access-log entry from nginx variables and ship it to Kafka
-- asynchronously; call from log_by_lua so status/latency are final.
function _M.send_request_log()
    local log_entry = {
        timestamp = ngx.now(),
        request_id = ngx.var.request_id,
        client_ip = ngx.var.remote_addr,
        method = ngx.req.get_method(),
        uri = ngx.var.uri,
        status = ngx.status,
        latency = ngx.var.request_time,
        user_id = ngx.var.user_id,
    }

    -- ngx.timer.at fails when the worker's timer quota is exhausted;
    -- report the dropped entry instead of failing silently
    local ok, err = ngx.timer.at(0, function(premature)
        if premature then return end
        _M.produce("gateway-access-logs", {log_entry})
    end)
    if not ok then
        ngx.log(ngx.ERR, "failed to schedule Kafka log send: ", err)
    end
end

return _M

11.6 Prometheus 监控

11.6.1 Prometheus 指标收集

-- /usr/local/openresty/lua/metrics.lua
-- Minimal Prometheus-style metrics backed by an nginx shared dict.
local _M = {}

-- Metric storage (shared memory, visible to all workers).
-- Requires `lua_shared_dict metrics <size>;` in the http{} block --
-- without it ngx.shared.metrics is nil and every call below errors.
local metrics = ngx.shared.metrics

-- Metric types:
-- Counter: monotonically increasing (request count, error count)
-- Gauge: can go up or down (current connections, queue length)
-- Histogram: distribution statistics (latency buckets)

-- Increment a counter series by 1, creating it at 0 if absent.
-- The series key is name{k1="v1",k2="v2"} with label names sorted:
-- pairs() iteration order is unspecified in Lua, so the original could
-- store the SAME label set under several differently-ordered keys.
-- Label values are quoted to match the Prometheus text exposition format.
function _M.counter_inc(name, labels)
    local key = name
    if labels then
        local names = {}
        for label in pairs(labels) do
            names[#names + 1] = label
        end
        table.sort(names)

        local parts = {}
        for i = 1, #names do
            parts[i] = names[i] .. '="' .. tostring(labels[names[i]]) .. '"'
        end
        key = name .. "{" .. table.concat(parts, ",") .. "}"
    end
    metrics:incr(key, 1, 0)
end

-- Set a gauge series to an absolute value.
-- Key construction mirrors counter_inc: label names are sorted (pairs()
-- order is unspecified, and an unsorted build splits one label set into
-- several keys) and values are quoted per the Prometheus text format.
function _M.gauge_set(name, value, labels)
    local key = name
    if labels then
        local names = {}
        for label in pairs(labels) do
            names[#names + 1] = label
        end
        table.sort(names)

        local parts = {}
        for i = 1, #names do
            parts[i] = names[i] .. '="' .. tostring(labels[names[i]]) .. '"'
        end
        key = name .. "{" .. table.concat(parts, ",") .. "}"
    end
    metrics:set(key, value)
end

-- Record one observation in a simplified histogram (cumulative buckets).
function _M.histogram_observe(name, value, labels)
    -- bucket upper bounds (seconds)
    local buckets = {0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10}

    -- Merge the caller's labels with the `le` bucket label. The original
    -- passed {le=...} alone, silently dropping labels like method/path so
    -- all label sets collapsed into one bucket series.
    local function labels_with_le(le_value)
        local merged = {}
        if labels then
            for k, v in pairs(labels) do
                merged[k] = v
            end
        end
        merged.le = le_value
        return merged
    end

    for _, upper in ipairs(buckets) do
        if value <= upper then
            _M.counter_inc(name .. "_bucket", labels_with_le(tostring(upper)))
        end
    end
    _M.counter_inc(name .. "_bucket", labels_with_le("+Inf"))
    _M.counter_inc(name .. "_count", labels)
    -- NOTE(review): a full Prometheus histogram also exposes <name>_sum;
    -- add it if averages (sum/count) are needed in queries.
end

-- Render all stored series in Prometheus text exposition format.
-- Returns the full scrape payload as one string.
function _M.export()
    -- 0 = no key limit; can be slow on very large dicts
    local keys = metrics:get_keys(0)

    -- Group series by metric family name (the part before any "{")
    local grouped = {}
    for _, key in ipairs(keys) do
        local name = key:match("^([^{]+)")
        local value = metrics:get(key)
        -- a key can be evicted between get_keys() and get(); the original
        -- concatenated the nil value and crashed the whole scrape
        if name and value ~= nil then
            if not grouped[name] then
                grouped[name] = {}
            end
            table.insert(grouped[name], {key = key, value = value})
        end
    end

    -- sort family names so scrape output is deterministic and diffable
    local names = {}
    for name in pairs(grouped) do
        table.insert(names, name)
    end
    table.sort(names)

    local output = {}
    for _, name in ipairs(names) do
        table.insert(output, "# HELP " .. name .. " Gateway metric")
        -- NOTE(review): every family is declared "counter", including
        -- values written via gauge_set; track per-metric types if
        -- Prometheus tooling starts relying on TYPE.
        table.insert(output, "# TYPE " .. name .. " counter")
        for _, entry in ipairs(grouped[name]) do
            table.insert(output, entry.key .. " " .. entry.value)
        end
    end

    return table.concat(output, "\n") .. "\n"
end

-- Request metrics middleware; call from log_by_lua so ngx.status is final.
function _M.request_metrics()
    -- start_time is set by rewrite_by_lua; if that phase did not run for
    -- this request, latency degrades gracefully to ~0
    local start_time = ngx.ctx.start_time or ngx.now()
    local latency = ngx.now() - start_time

    -- Request count.
    -- NOTE(review): labelling by raw $uri can explode series cardinality
    -- for dynamic paths (/users/123); consider a normalized route label.
    _M.counter_inc("gateway_requests_total", {
        method = ngx.req.get_method(),
        status = tostring(ngx.status),
        path = ngx.var.uri,
    })

    -- Latency distribution (seconds)
    _M.histogram_observe("gateway_request_duration_seconds", latency, {
        method = ngx.req.get_method(),
        path = ngx.var.uri,
    })

    -- Error count (5xx only; 4xx are treated as client errors here)
    if ngx.status >= 500 then
        _M.counter_inc("gateway_errors_total", {
            status = tostring(ngx.status),
        })
    end
end

return _M

11.6.2 Prometheus 端点

server {
    listen 8080;

    # Metrics endpoint for Prometheus scrapes.
    # NOTE(review): metrics.lua reads ngx.shared.metrics, so the http{}
    # block must declare `lua_shared_dict metrics <size>;` -- confirm it
    # exists in the full configuration. Consider restricting access
    # (allow/deny) so internal metrics are not publicly reachable.
    location /metrics {
        content_by_lua_block {
            local metrics = require "metrics"
            ngx.header["Content-Type"] = "text/plain; charset=utf-8"
            ngx.say(metrics.export())
        }
    }

    # Metrics collection middleware
    location /api/ {
        # record the request start early so the log phase can compute latency
        rewrite_by_lua_block {
            ngx.ctx.start_time = ngx.now()
        }

        log_by_lua_block {
            local metrics = require "metrics"
            metrics.request_metrics()
        }

        proxy_pass http://backend;
    }
}

11.7 告警规则

# Prometheus alerting rules for the gateway
groups:
  - name: gateway_alerts
    rules:
      # High error rate: 5xx counter vs total requests over 5 minutes
      - alert: HighErrorRate
        expr: |
          rate(gateway_errors_total[5m])
          / rate(gateway_requests_total[5m]) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Gateway error rate > 5%"
          description: "Error rate is {{ $value | humanizePercentage }}"

      # High latency: P95 derived from the request-duration histogram buckets
      - alert: HighLatency
        expr: |
          histogram_quantile(0.95, rate(gateway_request_duration_seconds_bucket[5m])) > 1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "P95 latency > 1s"

      # Low cache hit rate.
      # NOTE(review): gateway_cache_hits is not emitted by the metrics
      # module shown in this chapter -- confirm which component exports it.
      - alert: LowCacheHitRate
        expr: |
          rate(gateway_cache_hits[5m])
          / rate(gateway_requests_total[5m]) < 0.3
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Cache hit rate < 30%"

11.8 注意事项

日志量控制:高 QPS 场景下,全量日志会占用大量磁盘。使用采样(如 10% 请求记录详细日志)或按状态码过滤。

异步写入:日志写入必须使用 ngx.timer.at 异步执行,避免阻塞请求处理。

敏感信息:日志中不要记录完整的 Authorization 头、密码、Token 等敏感信息。使用脱敏或完全不记录。

性能影响:body_filter 阶段的日志会影响 sendfile 优化,大响应场景避免在 body_filter 中做日志。


上一章← 第 10 章 - 数据转换与协议处理 下一章第 12 章 - 安全防护 →