第 11 章 - 日志与监控
11.1 日志架构
OpenResty 网关
│
├── 访问日志(access_log)
│ ├── 文件日志 → Filebeat → Logstash → Elasticsearch
│ └── 直接写入 → Kafka → 消费者
│
├── 错误日志(error_log)
│ └── 文件日志 → 监控告警
│
├── 自定义业务日志
│ └── Lua → Redis/Kafka/文件
│
└── 指标(metrics)
└── Lua → Prometheus Exporter → Prometheus → Grafana
11.2 Nginx 访问日志
11.2.1 日志格式定义
http {
    # Basic access-log format
    log_format basic '$remote_addr - $remote_user [$time_local] '
                     '"$request" $status $body_bytes_sent';

    # Detailed format with upstream timing fields
    log_format detailed '$remote_addr - $remote_user [$time_local] '
                        '"$request" $status $body_bytes_sent '
                        '"$http_referer" "$http_user_agent" '
                        'rt=$request_time '
                        'urt=$upstream_response_time '
                        'uct=$upstream_connect_time '
                        'uht=$upstream_header_time';

    # JSON structured log (recommended); escape=json keeps values valid JSON
    log_format json_log escape=json
        '{'
        '"timestamp":"$time_iso8601",'
        '"remote_addr":"$remote_addr",'
        '"remote_user":"$remote_user",'
        '"request_method":"$request_method",'
        '"request_uri":"$request_uri",'
        '"uri":"$uri",'
        '"args":"$args",'
        '"status":$status,'
        '"body_bytes_sent":$body_bytes_sent,'
        '"request_time":$request_time,'
        '"upstream_response_time":"$upstream_response_time",'
        '"upstream_addr":"$upstream_addr",'
        '"upstream_status":"$upstream_status",'
        '"http_referer":"$http_referer",'
        '"http_user_agent":"$http_user_agent",'
        '"http_x_forwarded_for":"$http_x_forwarded_for",'
        # $http_authorization intentionally NOT logged: it carries
        # credentials/tokens (see the "sensitive data" note in 11.8).
        '"request_id":"$request_id",'
        '"ssl_protocol":"$ssl_protocol",'
        '"gzip_ratio":"$gzip_ratio"'
        '}';

    # Custom-field log (variables populated from Lua via set $var)
    log_format custom_log escape=json
        '{'
        '"timestamp":"$time_iso8601",'
        '"client_ip":"$remote_addr",'
        '"method":"$request_method",'
        '"uri":"$uri",'
        '"status":$status,'
        '"latency":$request_time,'
        '"user_id":"$user_id",'
        '"user_role":"$user_role",'
        '"api_version":"$api_version",'
        '"request_id":"$request_id",'
        '"upstream":"$upstream_addr",'
        '"cache_status":"$upstream_cache_status"'
        '}';

    # Conditional-logging switch (skip health checks).
    # NOTE: "map" is only valid in the http context — placing it inside
    # "server" (as before) makes nginx refuse to start.
    map $request_uri $loggable {
        ~^/health 0;
        ~^/ping   0;
        default   1;
    }

    server {
        listen 8080;

        # Single conditional access_log; an additional unconditional
        # access_log to the same file would record every request twice.
        access_log /var/log/openresty/access.log json_log if=$loggable;
        error_log /var/log/openresty/error.log warn;
    }
}
11.2.2 日志轮转
# /etc/logrotate.d/openresty
# Daily rotation of all OpenResty logs, keeping 30 compressed archives.
/var/log/openresty/*.log {
daily
rotate 30
# Do not error if a log file is missing
missingok
# Skip rotation when the file is empty
notifempty
compress
# Compress one cycle late so the just-rotated file stays readable
delaycompress
# Run postrotate once for all matched files, not per file
sharedscripts
postrotate
# USR1 tells nginx to reopen its log files after rotation
[ -f /usr/local/openresty/nginx/logs/nginx.pid ] && \
kill -USR1 $(cat /usr/local/openresty/nginx/logs/nginx.pid)
endscript
}
11.3 Lua 结构化日志
-- /usr/local/openresty/lua/logger.lua
-- Structured JSON logger for the gateway.
local _M = {}
local cjson = require "cjson"
-- Log levels: higher number = more severe
local LOG_LEVELS = {
DEBUG = 1,
INFO = 2,
WARN = 3,
ERROR = 4,
FATAL = 5,
}
-- Minimum level that gets emitted
local current_level = LOG_LEVELS.INFO
-- Log buffer (batched writes to reduce I/O).
-- NOTE(review): these three locals are declared but not referenced by any
-- function visible in this chunk — confirm whether buffering was intended.
local log_buffer = {}
local buffer_size = 100
local buffer_flush_interval = 5 -- seconds
-- Build one structured log record for the current request.
-- Optional fields from `extra` are merged over the defaults
-- (extra wins on key collision).
local function create_log_entry(level, message, extra)
  local record = {
    timestamp = ngx.now(),
    level = level,
    message = message,
    pid = ngx.worker.pid(),
    request_id = ngx.var.request_id or "",
    client_ip = ngx.var.remote_addr or "",
    uri = ngx.var.uri or "",
    method = ngx.req.get_method() or "",
  }
  if extra ~= nil then
    for key, value in pairs(extra) do
      record[key] = value
    end
  end
  return record
end
-- Serialize a log record into a single JSON line.
local function format_log(entry)
  local encoded = cjson.encode(entry)
  return encoded
end
-- Append one line to `filepath`. Best-effort by design: a failure to open
-- the file must never take down request processing, but (unlike the
-- original, which swallowed it silently) the reason is reported.
-- @param filepath string  absolute path of the log file
-- @param message string   one log line, written with a trailing newline
-- @return true on success, or nil plus an error message
local function write_to_file(filepath, message)
  local fd, err = io.open(filepath, "a")
  if not fd then
    -- ngx is only present inside OpenResty; guard so the helper also
    -- works when exercised outside the server.
    if ngx and ngx.log then
      ngx.log(ngx.ERR, "failed to open log file ", filepath, ": ", err)
    end
    return nil, err
  end
  fd:write(message, "\n")
  fd:close()
  return true
end
-- Core log function. Emits `message` at the given level ("DEBUG".."FATAL"):
-- synchronously to the nginx error log, and asynchronously (0-delay timer)
-- as a JSON line to the gateway log file.
-- Records below `current_level` are dropped.
-- @param level string   one of the LOG_LEVELS keys
-- @param message string
-- @param extra table|nil additional fields merged into the record
function _M.log(level, message, extra)
  local severity = LOG_LEVELS[level]
  if not severity then
    -- Unknown level name: warn instead of indexing nil below.
    ngx.log(ngx.WARN, "logger: unknown log level: ", tostring(level))
    return
  end
  if severity < current_level then
    return
  end
  local entry = create_log_entry(level, message, extra)
  local formatted = format_log(entry)
  -- Map our level names to ngx.log() constants. The ngx constants are
  -- uppercase and use ERR/EMERG, so the original ngx[string.lower(level)]
  -- always produced nil and made ngx.log() raise.
  local ngx_levels = {
    DEBUG = ngx.DEBUG,
    INFO = ngx.INFO,
    WARN = ngx.WARN,
    ERROR = ngx.ERR,
    FATAL = ngx.EMERG,
  }
  ngx.log(ngx_levels[level] or ngx.INFO, message)
  -- Write to file off the request path.
  local ok, err = ngx.timer.at(0, function(premature)
    if premature then return end
    write_to_file("/var/log/openresty/gateway.log", formatted)
  end)
  if not ok then
    ngx.log(ngx.ERR, "logger: failed to create log timer: ", err)
  end
end
-- Per-level convenience wrappers: _M.debug / _M.info / _M.warn / _M.error,
-- each forwarding to _M.log with the matching level name.
for _, lvl in ipairs({ "DEBUG", "INFO", "WARN", "ERROR" }) do
  _M[string.lower(lvl)] = function(msg, extra)
    _M.log(lvl, msg, extra)
  end
end
-- Request-access logger, intended for the log_by_lua phase.
-- Builds one JSON record per request and appends it to requests.log from a
-- 0-delay timer (file I/O stays off the request path).
function _M.request_logger()
  -- By the log phase the request has finished; $request_time already holds
  -- the full duration in seconds. The original captured ngx.now() here and
  -- subtracted it from itself, so request_time was always ~0 ms.
  local elapsed_ms = (tonumber(ngx.var.request_time) or 0) * 1000
  local log_entry = {
    timestamp = ngx.now(),
    request_id = ngx.var.request_id or "",
    client_ip = ngx.var.remote_addr,
    method = ngx.req.get_method(),
    uri = ngx.var.uri,
    args = ngx.var.args or "",
    status = ngx.status,
    request_time = elapsed_ms,
    -- $upstream_response_time can be a comma-separated list on retries;
    -- tonumber() then yields nil and we fall back to 0, as before.
    upstream_time = tonumber(ngx.var.upstream_response_time) or 0,
    body_bytes = tonumber(ngx.var.body_bytes_sent) or 0,
    user_agent = ngx.var.http_user_agent or "",
    referer = ngx.var.http_referer or "",
    user_id = ngx.var.user_id or "",
    cache_status = ngx.var.upstream_cache_status or "",
  }
  local ok, err = ngx.timer.at(0, function(premature)
    if premature then return end
    write_to_file("/var/log/openresty/requests.log", cjson.encode(log_entry))
  end)
  if not ok then
    ngx.log(ngx.ERR, "logger: failed to schedule request log: ", err)
  end
end

return _M
nginx 配置
server {
listen 8080;
# Declare the variables the custom_log format reads; Lua code can
# populate them per request via ngx.var.user_id / ngx.var.user_role.
set $user_id "";
set $user_role "";
location /api/ {
# Use the custom Lua request logger (runs in the log phase,
# after the response has been sent).
log_by_lua_block {
local logger = require "logger"
logger.request_logger()
}
proxy_pass http://backend;
}
}
11.4 ELK 集成
11.4.1 Filebeat 配置
# /etc/filebeat/filebeat.yml
# Ship the gateway's JSON access log to Elasticsearch.
# (Indentation restored — the flattened form is not valid YAML.)
filebeat.inputs:
  - type: log
    paths:
      - /var/log/openresty/access.log
    # Parse each line as JSON and lift fields to the event root.
    # NOTE(review): json.* options belong to the legacy "log" input; the
    # newer "filestream" input uses parsers — confirm your Filebeat version.
    json.keys_under_root: true
    json.add_error_key: true
    fields:
      service: openresty-gateway
      environment: production
    fields_under_root: true

output.elasticsearch:
  hosts: ["http://elasticsearch:9200"]
  # Daily index, e.g. gateway-logs-2024.01.31
  index: "gateway-logs-%{+yyyy.MM.dd}"

setup.template.name: "gateway-logs"
setup.template.pattern: "gateway-logs-*"
11.4.2 直接写入 Elasticsearch
-- /usr/local/openresty/lua/elasticsearch_logger.lua
-- Buffered log shipper that writes batches to Elasticsearch's Bulk API.
local _M = {}
local http = require "resty.http"
local cjson = require "cjson"
local config = {
hosts = {"http://es1:9200", "http://es2:9200"},
index_prefix = "gateway-logs",
batch_size = 100,
flush_interval = 5,
}
-- Log buffer.
-- NOTE(review): module locals are per-worker in OpenResty, so each nginx
-- worker keeps its own buffer and flush clock.
local buffer = {}
local last_flush = ngx.time()
-- Send a batch of log records to Elasticsearch via the Bulk API.
-- Must run where cosockets are available (e.g. a timer callback).
-- @param logs array of log-record tables
-- @return true on success, false on failure (reason logged)
local function flush_to_es(logs)
  if #logs == 0 then return true end
  -- Daily index, e.g. gateway-logs-2024.01.31 (server-local time).
  local index = config.index_prefix .. "-" .. os.date("%Y.%m.%d")
  -- Bulk body: one action line plus one document line per record,
  -- newline-delimited, with the trailing newline the API requires.
  local body_parts = {}
  for _, log in ipairs(logs) do
    body_parts[#body_parts + 1] = cjson.encode({index = {_index = index}})
    body_parts[#body_parts + 1] = cjson.encode(log)
  end
  local body = table.concat(body_parts, "\n") .. "\n"
  -- Naive load spreading across configured nodes.
  local host = config.hosts[math.random(#config.hosts)]
  local httpc = http.new()
  httpc:set_timeout(10000)
  local res, err = httpc:request_uri(host .. "/_bulk", {
    method = "POST",
    body = body,
    headers = {["Content-Type"] = "application/x-ndjson"},
  })
  if not res then
    ngx.log(ngx.ERR, "ES bulk request failed: ", err)
    return false
  end
  -- A transport-level success can still be an HTTP error; the original
  -- treated any response — including 4xx/5xx — as success.
  if res.status >= 300 then
    ngx.log(ngx.ERR, "ES bulk request returned status ", res.status,
            ": ", res.body or "")
    return false
  end
  -- NOTE(review): a 200 response can still report per-item failures
  -- ("errors": true in the body); parse res.body if that matters here.
  return true
end
-- Queue one log record; triggers a flush once the batch is full or the
-- flush interval has elapsed since the last flush.
function _M.add(log_entry)
  buffer[#buffer + 1] = log_entry
  local batch_full = #buffer >= config.batch_size
  local interval_due = (ngx.time() - last_flush) >= config.flush_interval
  if batch_full or interval_due then
    _M.flush()
  end
end
-- Swap out the current buffer and ship it asynchronously.
function _M.flush()
  if #buffer == 0 then return end
  local logs_to_send = buffer
  buffer = {}
  last_flush = ngx.time()
  -- Send from a 0-delay timer: cosockets are not usable in every phase.
  local ok, err = ngx.timer.at(0, function(premature)
    if premature then return end
    flush_to_es(logs_to_send)
  end)
  if not ok then
    -- The original ignored timer-creation failure, silently dropping the
    -- whole batch; at least record the loss.
    ngx.log(ngx.ERR, "es_logger: dropped ", #logs_to_send,
            " log entries, timer creation failed: ", err)
  end
end
-- Start the periodic flush timer.
-- NOTE(review): call this exactly once per worker (init_worker_by_lua);
-- calling it again would register a duplicate recurring timer.
function _M.start()
ngx.timer.every(config.flush_interval, function(premature)
if premature then return end
_M.flush()
end)
end
return _M
11.5 Kafka 日志集成
-- /usr/local/openresty/lua/kafka_logger.lua
local _M = {}
local cjson = require "cjson"
-- Send a batch of messages (tables, JSON-encoded here) to a Kafka topic.
-- Must run where cosockets are available (e.g. a timer), not in the log
-- phase itself.
-- @param topic string
-- @param messages array of tables
-- @return true only if the producer was created AND every send was
--         accepted; false otherwise (the original returned true even
--         when individual sends failed)
function _M.produce(topic, messages)
  -- Uses lua-resty-kafka. For producer_type = "async" the library is
  -- documented to cache producers per cluster name, so calling :new()
  -- per batch should not reconnect each time — confirm for your version.
  local broker_list = {
    {host = "kafka1", port = 9092},
    {host = "kafka2", port = 9092},
    {host = "kafka3", port = 9092},
  }
  local producer = require "resty.kafka.producer"
  local bp, err = producer:new(broker_list, {
    producer_type = "async",
    batch_size = 100,
    flush_time = 5000,
  })
  if not bp then
    ngx.log(ngx.ERR, "Failed to create Kafka producer: ", err)
    return false
  end
  local all_ok = true
  for _, msg in ipairs(messages) do
    local ok, send_err = bp:send(topic, nil, cjson.encode(msg))
    if not ok then
      all_ok = false
      ngx.log(ngx.ERR, "Kafka send failed: ", send_err)
    end
  end
  return all_ok
end
-- Collect the finished request's key fields and ship them to the
-- "gateway-access-logs" topic from a 0-delay timer (the log phase itself
-- cannot open sockets).
function _M.send_request_log()
  local record = {
    timestamp = ngx.now(),
    request_id = ngx.var.request_id,
    client_ip = ngx.var.remote_addr,
    method = ngx.req.get_method(),
    uri = ngx.var.uri,
    status = ngx.status,
    latency = ngx.var.request_time,
    user_id = ngx.var.user_id,
  }
  ngx.timer.at(0, function(premature)
    if not premature then
      _M.produce("gateway-access-logs", { record })
    end
  end)
end

return _M
11.6 Prometheus 监控
11.6.1 Prometheus 指标收集
-- /usr/local/openresty/lua/metrics.lua
-- Minimal Prometheus-style metrics backed by an nginx shared dict.
local _M = {}
-- Metric storage (shared memory).
-- NOTE(review): requires `lua_shared_dict metrics <size>;` in nginx.conf;
-- without it ngx.shared.metrics is nil and every call below fails.
local metrics = ngx.shared.metrics
-- Metric kinds used by this module:
-- Counter: monotonically increasing (request count, error count)
-- Gauge: can go up and down (current connections, queue length)
-- Histogram: distribution via cumulative buckets (latency)
-- Increment a counter by 1, optionally qualified by a label set.
-- Keys are stored pre-rendered in Prometheus exposition form, e.g.
--   name{method="GET",status="200"}
-- Two fixes over the original:
--  * label parts are sorted — pairs() order is unspecified in Lua, so the
--    unsorted build could split one logical series across several keys;
--  * label values are quoted, as the exposition format requires.
function _M.counter_inc(name, labels)
  local key = name
  if labels then
    local parts = {}
    for k, v in pairs(labels) do
      parts[#parts + 1] = k .. '="' .. tostring(v) .. '"'
    end
    table.sort(parts)
    key = name .. "{" .. table.concat(parts, ",") .. "}"
  end
  -- incr with init=0 creates the key atomically on first use.
  metrics:incr(key, 1, 0)
end
-- Set a gauge to an absolute value, optionally qualified by a label set.
-- Key rendering matches counter_inc: sorted, quoted label pairs so that
-- the same label set always maps to the same shared-dict key (pairs()
-- iteration order is unspecified) and the exported line is valid
-- Prometheus exposition format.
function _M.gauge_set(name, value, labels)
  local key = name
  if labels then
    local parts = {}
    for k, v in pairs(labels) do
      parts[#parts + 1] = k .. '="' .. tostring(v) .. '"'
    end
    table.sort(parts)
    key = name .. "{" .. table.concat(parts, ",") .. "}"
  end
  metrics:set(key, value)
end
-- Record one observation into a simplified cumulative-bucket histogram
-- (Prometheus-style: each "le" bucket counts observations <= boundary).
-- Fix: the original attached ONLY the "le" label to the _bucket series,
-- discarding the caller's labels, so _bucket and _count could never be
-- joined by label set; the labels are now merged into every bucket key.
-- NOTE(review): a full Prometheus histogram also needs a _sum series;
-- counter_inc only steps by 1, so _sum is still not emitted here.
function _M.histogram_observe(name, value, labels)
  local buckets = {0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10}
  -- Copy the caller's labels and add the bucket boundary.
  local function with_le(le)
    local merged = {}
    if labels then
      for k, v in pairs(labels) do merged[k] = v end
    end
    merged.le = le
    return merged
  end
  for _, bucket in ipairs(buckets) do
    if value <= bucket then
      _M.counter_inc(name .. "_bucket", with_le(tostring(bucket)))
    end
  end
  -- The +Inf bucket counts every observation.
  _M.counter_inc(name .. "_bucket", with_le("+Inf"))
  _M.counter_inc(name .. "_count", labels)
end
-- Render every stored metric in Prometheus text exposition format.
-- NOTE(review): every metric is declared "# TYPE ... counter", including
-- values written via gauge_set; per-metric type bookkeeping would be
-- needed to emit the correct TYPE line.
function _M.export()
  -- get_keys(0) returns ALL keys (no 1024 cap).
  local keys = metrics:get_keys(0)
  local output = {}
  -- Group keys by bare metric name (the part before any "{").
  local grouped = {}
  for _, key in ipairs(keys) do
    local name = key:match("^([^{]+)")
    if not grouped[name] then
      grouped[name] = {}
    end
    table.insert(grouped[name], {key = key, value = metrics:get(key)})
  end
  for name, entries in pairs(grouped) do
    table.insert(output, "# HELP " .. name .. " Gateway metric")
    table.insert(output, "# TYPE " .. name .. " counter")
    for _, entry in ipairs(entries) do
      -- A key can be evicted between get_keys() and get(); skip nil
      -- values instead of concatenating nil, which raised in the
      -- original.
      if entry.value ~= nil then
        table.insert(output, entry.key .. " " .. entry.value)
      end
    end
  end
  return table.concat(output, "\n") .. "\n"
end
-- Per-request metrics hook for the log_by_lua phase.
-- Expects ngx.ctx.start_time to have been set in an earlier phase (the
-- sample nginx config sets it in rewrite_by_lua); if it is missing the
-- fallback to ngx.now() makes latency degrade to ~0.
-- NOTE(review): the raw $uri as the "path" label is unbounded cardinality
-- (one series per distinct URI) — normalize to a route template before
-- production use.
function _M.request_metrics()
local start_time = ngx.ctx.start_time or ngx.now()
local latency = ngx.now() - start_time
-- Request counter by method/status/path
_M.counter_inc("gateway_requests_total", {
method = ngx.req.get_method(),
status = tostring(ngx.status),
path = ngx.var.uri,
})
-- Latency distribution
_M.histogram_observe("gateway_request_duration_seconds", latency, {
method = ngx.req.get_method(),
path = ngx.var.uri,
})
-- Error counter (5xx only)
if ngx.status >= 500 then
_M.counter_inc("gateway_errors_total", {
status = tostring(ngx.status),
})
end
end
return _M
11.6.2 Prometheus 端点
server {
listen 8080;
# Metrics endpoint scraped by Prometheus
location /metrics {
content_by_lua_block {
local metrics = require "metrics"
ngx.header["Content-Type"] = "text/plain; charset=utf-8"
ngx.say(metrics.export())
}
}
# Metrics-collection middleware for proxied API traffic
location /api/ {
# Record the request start time early so the log phase can
# compute latency from ngx.ctx.start_time.
rewrite_by_lua_block {
ngx.ctx.start_time = ngx.now()
}
log_by_lua_block {
local metrics = require "metrics"
metrics.request_metrics()
}
proxy_pass http://backend;
}
}
11.7 告警规则
# Prometheus alert rules
groups:
  - name: gateway_alerts
    rules:
      # High error rate.
      # The two series carry different label sets (errors: status only;
      # requests: method/status/path), so an unaggregated division never
      # matches any sample pair and the alert can never fire — aggregate
      # both sides with sum() first.
      - alert: HighErrorRate
        expr: |
          sum(rate(gateway_errors_total[5m]))
          / sum(rate(gateway_requests_total[5m])) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Gateway error rate > 5%"
          description: "Error rate is {{ $value | humanizePercentage }}"
      # High latency: aggregate bucket rates by "le" before taking the
      # quantile, as histogram_quantile requires.
      - alert: HighLatency
        expr: |
          histogram_quantile(0.95, sum(rate(gateway_request_duration_seconds_bucket[5m])) by (le)) > 1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "P95 latency > 1s"
      # Low cache hit rate (same label-matching concern as above).
      - alert: LowCacheHitRate
        expr: |
          sum(rate(gateway_cache_hits[5m]))
          / sum(rate(gateway_requests_total[5m])) < 0.3
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Cache hit rate < 30%"
11.8 注意事项
日志量控制:高 QPS 场景下,全量日志会占用大量磁盘。使用采样(如 10% 请求记录详细日志)或按状态码过滤。
异步写入:日志写入必须使用 ngx.timer.at 异步执行,避免阻塞请求处理。
敏感信息:日志中不要记录完整的 Authorization 头、密码、Token 等敏感信息。使用脱敏或完全不记录。
性能影响:body_filter 阶段的日志会影响 sendfile 优化,大响应场景避免在 body_filter 中做日志。
上一章:← 第 10 章 - 数据转换与协议处理 下一章:第 12 章 - 安全防护 →