强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

OpenResty 高性能网关开发教程 / 第 10 章 - 数据转换与协议处理

第 10 章 - 数据转换与协议处理

10.1 概述

API 网关经常需要在客户端和后端服务之间进行数据格式和协议的转换。

转换类型说明场景
请求改写修改请求头、URI、Body统一 API 格式
响应改写修改响应头、Body、状态码统一响应格式
协议转换HTTP ↔ gRPC ↔ WebSocket新旧系统对接
数据格式JSON ↔ XML ↔ Form第三方 API 对接

10.2 请求改写

10.2.1 URI 改写

-- /usr/local/openresty/lua/transform/uri_rewrite.lua
local _M = {}

--- Strip a leading path prefix from the current request URI.
-- The prefix is compared as a literal string (not a Lua pattern) and is only
-- removed when it ends on a path-segment boundary: "/api" matches "/api" and
-- "/api/users" but leaves "/apix" untouched. The rewritten URI always starts
-- with "/", because ngx.req.set_uri refuses a zero-length URI.
-- The rewrite is issued with jump=true (location re-matching), as before;
-- per the lua-nginx-module docs that mode is only valid in rewrite_by_lua*.
-- @param prefix string literal prefix to remove; an empty prefix is a no-op
function _M.strip_prefix(prefix)
    local uri = ngx.var.uri
    local prefix_len = #prefix

    -- An empty prefix would "rewrite" the URI to itself and jump again.
    if prefix_len == 0 or uri:sub(1, prefix_len) ~= prefix then
        return
    end

    local rest = uri:sub(prefix_len + 1)
    if rest == "" then
        -- URI was exactly the prefix: map it to the root path.
        rest = "/"
    elseif rest:sub(1, 1) ~= "/" then
        if prefix:sub(-1) ~= "/" then
            -- e.g. prefix "/api" against "/apix/foo": not a segment boundary.
            return
        end
        -- Prefix ended with "/" and swallowed the separator; restore it.
        rest = "/" .. rest
    end

    ngx.req.set_uri(rest, true)
end

--- Prepend a path prefix to the current request URI.
-- The new URI is the plain concatenation of `prefix` and the current URI;
-- it is applied without an internal jump (second argument false).
-- @param prefix string text placed in front of the current URI
function _M.add_prefix(prefix)
    ngx.req.set_uri(prefix .. ngx.var.uri, false)
end

--- Rewrite the request URI using the first matching version rule.
-- Accepts either of two shapes:
--   * a map  { [lua_pattern] = replacement, ... }  (original form), or
--   * a list { {lua_pattern, replacement}, ... }   (explicit priority order).
-- Map keys are tried in sorted order so that the outcome no longer depends on
-- the unspecified iteration order of pairs(); use the list form when a
-- specific priority is needed.
-- Patterns are Lua patterns passed to string.gsub and are not anchored
-- automatically. Only the first rule that changes the URI is applied, and the
-- rewrite is done without an internal jump.
-- @param version_map table rules in map or list form (see above)
function _M.version_map(version_map)
    local uri = ngx.var.uri

    local rules = {}
    if version_map[1] ~= nil then
        -- List form: keep the caller's order.
        for i, rule in ipairs(version_map) do
            rules[i] = rule
        end
    else
        -- Map form: impose a stable order on the keys.
        for pattern, replacement in pairs(version_map) do
            rules[#rules + 1] = { pattern, replacement }
        end
        table.sort(rules, function(a, b)
            return tostring(a[1]) < tostring(b[1])
        end)
    end

    for _, rule in ipairs(rules) do
        -- gsub also returns a match count; only the rewritten string is kept.
        local new_uri = uri:gsub(rule[1], rule[2])
        if new_uri ~= uri then
            ngx.req.set_uri(new_uri, false)
            return
        end
    end
end

--- Add query parameters that the request does not already carry.
-- Reads the current query arguments, fills in every entry of `params` whose
-- key has no truthy value yet, and writes the merged set back to the request.
-- Existing arguments are never overwritten.
-- @param params table argument name -> value to add when absent
function _M.add_query_params(params)
    local current = ngx.req.get_uri_args()
    for name, value in pairs(params) do
        -- Keeps the existing value when there is one, otherwise takes ours.
        current[name] = current[name] or value
    end
    ngx.req.set_uri_args(current)
end

return _M

10.2.2 请求头改写

-- /usr/local/openresty/lua/transform/header_transform.lua
local _M = {}

--- Attach the gateway's standard forwarding headers to the upstream request.
-- Sets X-Forwarded-For / -Proto / -Host, X-Request-ID and X-Gateway-Time.
--
-- X-Forwarded-For: the client address is appended to any chain already
-- present on the request instead of replacing it, so addresses recorded by
-- earlier proxies are preserved.
-- NOTE(review): if this gateway is the trust boundary and inbound
-- X-Forwarded-For must not be honoured, clear that header before calling.
--
-- X-Request-ID: uses $request_id when nginx provides it; otherwise an ID is
-- derived from worker pid, connection serial and per-connection request
-- counter. The previous fallback ($request_time) is an elapsed-time value
-- and does not identify a request.
function _M.add_gateway_headers()
    local var = ngx.var

    local forwarded_for = var.remote_addr
    local upstream_chain = var.http_x_forwarded_for
    if upstream_chain and upstream_chain ~= "" then
        forwarded_for = upstream_chain .. ", " .. forwarded_for
    end
    ngx.req.set_header("X-Forwarded-For", forwarded_for)
    ngx.req.set_header("X-Forwarded-Proto", var.scheme)
    ngx.req.set_header("X-Forwarded-Host", var.host)

    local request_id = var.request_id
    if not request_id or request_id == "" then
        request_id = string.format("%s-%s-%s",
            tostring(ngx.worker.pid()),
            tostring(var.connection),
            tostring(var.connection_requests))
    end
    ngx.req.set_header("X-Request-ID", request_id)
    ngx.req.set_header("X-Gateway-Time", ngx.now())
end

--- Remove headers that must not be forwarded to the upstream.
-- Clears the Cookie and X-Internal-Token request headers, in that order.
function _M.remove_sensitive_headers()
    local blocked = { "Cookie", "X-Internal-Token" }
    for _, header_name in ipairs(blocked) do
        ngx.req.clear_header(header_name)
    end
end

--- Move request header values from custom names to standard names.
-- For every `source -> target` pair whose source header is present on the
-- request, the value is copied to the target header and the source header is
-- then removed. Headers are read once, before any rewriting takes place.
-- @param mapping table source header name -> target header name
function _M.map_headers(mapping)
    local req = ngx.req
    local incoming = req.get_headers()

    for from_name, to_name in pairs(mapping) do
        local current = incoming[from_name]
        if current then
            req.set_header(to_name, current)
            req.clear_header(from_name)
        end
    end
end

return _M

10.2.3 请求体改写

-- /usr/local/openresty/lua/transform/body_transform.lua
local _M = {}

local cjson = require "cjson"

--- Decode the JSON request body, transform it, and write it back.
-- cjson.decode raises on malformed input rather than returning nil, so the
-- decode is wrapped in pcall and surfaced through the nil/err convention.
-- When nginx has spooled the body to a temporary file (get_body_data returns
-- nil), the body is read from that file instead of being reported as empty.
-- @param transform_func function receives the decoded value and returns the
--        value to encode as the new request body
-- @return the transformed value on success
-- @return nil plus an error string when the body is empty, unreadable, or
--         not valid JSON
function _M.transform_json(transform_func)
    ngx.req.read_body()
    local body = ngx.req.get_body_data()

    if not body then
        -- Bodies larger than the in-memory buffer live in a temp file.
        local body_path = ngx.req.get_body_file()
        if body_path then
            local handle, open_err = io.open(body_path, "rb")
            if not handle then
                return nil, "failed to read buffered body: " .. tostring(open_err)
            end
            body = handle:read("*a")
            handle:close()
        end
    end

    if not body or body == "" then
        return nil, "empty body"
    end

    local ok, data = pcall(cjson.decode, body)
    if not ok then
        -- On failure pcall's second value is the decoder's error message.
        return nil, "invalid JSON: " .. tostring(data)
    end

    -- Apply the caller-supplied transformation. The decoded value is passed
    -- through as-is, so it may be a scalar or cjson.null for such documents.
    local transformed = transform_func(data)

    ngx.req.set_body_data(cjson.encode(transformed))
    return transformed
end

--- Build a transformer that fills in missing fields with default values.
-- The returned function mutates the table it is given: every key of
-- `defaults` that is nil in the data is set to the default value. Fields that
-- are present (including false) are left alone. The same table is returned.
-- @param defaults table field name -> default value
-- @return function(data) -> data
function _M.add_defaults(defaults)
    local function apply_defaults(data)
        for field, fallback in pairs(defaults) do
            local missing = (data[field] == nil)
            if missing then
                data[field] = fallback
            end
        end
        return data
    end

    return apply_defaults
end

--- Build a transformer that renames fields of a table in place.
-- All values are read and removed under their old names before any value is
-- written under its new name, so the result does not depend on the iteration
-- order of `mapping`: swaps (a->b, b->a) and chains (a->b, b->c) behave as
-- simultaneous renames. A field mapped to its own name is left untouched
-- (it used to be deleted).
-- @param mapping table old field name -> new field name
-- @return function(data) -> data (the same table, mutated)
function _M.rename_fields(mapping)
    return function(data)
        local pending = {}

        -- Pass 1: detach every value that has to move.
        for old_name, new_name in pairs(mapping) do
            local value = data[old_name]
            if value ~= nil and old_name ~= new_name then
                pending[new_name] = value
                data[old_name] = nil
            end
        end

        -- Pass 2: attach the values under their new names.
        for new_name, value in pairs(pending) do
            data[new_name] = value
        end

        return data
    end
end

--- Build a transformer that keeps only an allow-list of fields.
-- The returned function does not modify its input; it returns a new table
-- containing just those listed fields whose value in the input is non-nil.
-- @param allowed_fields table array of field names to keep
-- @return function(data) -> new filtered table
function _M.filter_fields(allowed_fields)
    local function pick_allowed(data)
        local kept = {}
        for _, name in ipairs(allowed_fields) do
            local present = (data[name] ~= nil)
            if present then
                kept[name] = data[name]
            end
        end
        return kept
    end

    return pick_allowed
end

return _M

10.3 响应改写

10.3.1 响应头改写

-- /usr/local/openresty/lua/transform/response_transform.lua
local _M = {}

--- Rewrite response headers; meant to run in the header_filter phase.
-- Masks backend identity (Server is replaced, X-Powered-By is removed), adds
-- a fixed set of security headers, and exposes the request ID and request
-- time as tracing headers.
function _M.transform_headers()
    local out = ngx.header

    -- Ordered list so the headers are assigned in a fixed sequence.
    -- A nil value removes the header.
    local updates = {
        { "Server", "Gateway" },
        { "X-Powered-By", nil },
        { "X-Content-Type-Options", "nosniff" },
        { "X-Frame-Options", "DENY" },
        { "Strict-Transport-Security", "max-age=31536000" },
        { "X-Request-ID", ngx.var.request_id or "" },
        { "X-Response-Time", ngx.var.request_time },
    }

    for _, entry in ipairs(updates) do
        out[entry[1]] = entry[2]
    end
end

--- Emit CORS response headers and short-circuit preflight requests.
-- Each option falls back to a permissive default when absent. For an OPTIONS
-- request the status is set to 204 and the request is finished immediately.
-- @param options table|nil optional settings:
--        allow_origin, allow_methods, allow_headers, max_age
function _M.set_cors(options)
    local opts = options or {}
    local out = ngx.header

    out["Access-Control-Allow-Origin"] = opts.allow_origin or "*"
    out["Access-Control-Allow-Methods"] = opts.allow_methods
        or "GET, POST, PUT, DELETE, OPTIONS"
    out["Access-Control-Allow-Headers"] = opts.allow_headers
        or "Content-Type, Authorization, X-Request-ID"
    out["Access-Control-Max-Age"] = opts.max_age or 86400

    -- Anything other than a preflight continues through normal processing.
    if ngx.req.get_method() ~= "OPTIONS" then
        return
    end

    ngx.status = 204
    return ngx.exit(204)
end

return _M

10.3.2 响应体改写

--- body_filter handler that masks sensitive JSON string fields.
-- Works on one chunk at a time: the values of "password" and "token" string
-- fields found inside the current chunk are replaced with "***". Because each
-- chunk is processed independently, a field split across two chunks is not
-- masked; whole-body processing requires buffering until the last chunk.
local function body_transform()
    local piece = ngx.arg[1]
    if not piece then
        -- No data in this invocation: nothing to rewrite.
        return
    end

    -- { Lua pattern, replacement } pairs, applied in order.
    local masks = {
        { '"password"%s*:%s*"[^"]*"', '"password":"***"' },
        { '"token"%s*:%s*"[^"]*"', '"token":"***"' },
    }

    for _, mask in ipairs(masks) do
        -- gsub also returns a count; only the rewritten text is kept.
        local rewritten = piece:gsub(mask[1], mask[2])
        piece = rewritten
    end

    ngx.arg[1] = piece
end

--- Create a body_filter callback that buffers the whole response and wraps
-- JSON bodies in a standard envelope.
--
-- The returned function is called with the current chunk and EOF flag
-- (ngx.arg[1], ngx.arg[2]). Until EOF it stores the chunk and clears
-- ngx.arg[1] so that nothing is sent downstream yet; previously the chunks
-- were left in place, so the client received the raw body followed by the
-- wrapped copy. At EOF the buffered body is emitted, wrapped as
-- { code, message, data, timestamp, request_id } when it parses as JSON and
-- unchanged otherwise. The buffer is reset afterwards.
--
-- NOTE(review): the buffer lives in the closure, so one closure must serve
-- exactly one response at a time; keeping it in ngx.ctx is the safer pattern.
-- NOTE(review): the body length changes, so Content-Length presumably has to
-- be cleared in header_filter_by_lua* by the caller; confirm in the config.
-- @return function(chunk_arg, eof_arg)
local function wrap_response()
    local chunks = {}

    return function(chunk_arg, eof_arg)
        if chunk_arg and chunk_arg ~= "" then
            chunks[#chunks + 1] = chunk_arg
        end

        if not eof_arg then
            -- Hold this chunk back; it will be emitted as part of the final body.
            ngx.arg[1] = nil
            return
        end

        local body = table.concat(chunks)
        chunks = {}

        local cjson = require "cjson"

        -- Try to parse and wrap JSON; anything else passes through untouched.
        local ok, data = pcall(cjson.decode, body)
        if ok then
            local wrapped = {
                code = ngx.status,
                message = "success",
                data = data,
                timestamp = ngx.time(),
                request_id = ngx.var.request_id or "",
            }
            ngx.arg[1] = cjson.encode(wrapped)
        else
            ngx.arg[1] = body
        end

        ngx.arg[2] = true
    end
end

10.4 JSON 与 XML 转换

-- /usr/local/openresty/lua/transform/json_xml.lua
local _M = {}

--- Convert a JSON document to an XML string.
-- Objects become elements whose children are named after the keys (emitted
-- in sorted key order so the output is deterministic), arrays repeat the
-- enclosing element name once per item, strings are entity-escaped, and JSON
-- null becomes an empty element instead of the text form of a userdata.
-- A top-level array is wrapped as <root><item>...</item>...</root> so that
-- the document has a single root element.
-- Keys are used as element names verbatim; they are not checked against the
-- XML name rules. Raises if `json_str` is not valid JSON (cjson.decode).
-- @param json_str string JSON text
-- @param root_name string|nil name of the root element, default "root"
-- @return string XML document including the XML declaration
function _M.json_to_xml(json_str, root_name)
    local cjson = require "cjson"
    local data = cjson.decode(json_str)
    root_name = root_name or "root"

    -- Parentheses drop gsub's second return value (the match count).
    local function escape(text)
        return (text:gsub("&", "&amp;"):gsub("<", "&lt;"):gsub(">", "&gt;"))
    end

    local function to_xml(obj, name)
        if obj == cjson.null then
            return "<" .. name .. "/>"
        end

        local kind = type(obj)
        if kind == "string" then
            return "<" .. name .. ">" .. escape(obj) .. "</" .. name .. ">"
        elseif kind ~= "table" then
            return "<" .. name .. ">" .. tostring(obj) .. "</" .. name .. ">"
        end

        local parts = {}
        if #obj > 0 then
            -- Array: one element per item, all sharing the parent's name.
            for _, item in ipairs(obj) do
                parts[#parts + 1] = to_xml(item, name)
            end
            return table.concat(parts)
        end

        -- Object (an empty table also lands here and yields <name></name>).
        local keys = {}
        for key in pairs(obj) do
            keys[#keys + 1] = key
        end
        table.sort(keys, function(a, b)
            return tostring(a) < tostring(b)
        end)

        parts[1] = "<" .. name .. ">"
        for _, key in ipairs(keys) do
            parts[#parts + 1] = to_xml(obj[key], key)
        end
        parts[#parts + 1] = "</" .. name .. ">"
        return table.concat(parts)
    end

    local document
    if type(data) == "table" and #data > 0 then
        -- A bare array would otherwise produce several root elements.
        document = "<" .. root_name .. ">" .. to_xml(data, "item")
            .. "</" .. root_name .. ">"
    else
        document = to_xml(data, root_name)
    end

    return '<?xml version="1.0" encoding="UTF-8"?>\n' .. document
end

--- Convert simple XML into a Lua table ready for JSON encoding (simplified).
-- Despite the name, the result is a Lua table, not a JSON string.
-- Only attribute-less elements of the form <tag>...</tag> with alphanumeric
-- tag names are recognised; use a real XML parser in production.
--   * Nested elements are converted recursively.
--   * Leaf text is turned into a number or boolean when it looks like one,
--     otherwise it is returned as a string with &lt; &gt; &amp; unescaped.
--   * Sibling elements sharing a tag name are collected into an array in
--     document order (previously only the last one survived).
-- @param xml_str string XML fragment
-- @return table tag name -> value
function _M.xml_to_json(xml_str)
    local result = {}
    -- Tags that have already been promoted to an array in `result`.
    local repeated = {}

    -- &amp; is handled last so that "&amp;lt;" decodes to "&lt;", not "<".
    local function unescape(text)
        return (text:gsub("&lt;", "<"):gsub("&gt;", ">"):gsub("&amp;", "&"))
    end

    local function leaf_value(content)
        local num = tonumber(content)
        if num then
            return num
        elseif content == "true" then
            return true
        elseif content == "false" then
            return false
        end
        return unescape(content)
    end

    -- %1 in the pattern requires the closing tag to match the opening one.
    for tag, content in xml_str:gmatch("<(%w+)>(.-)</%1>") do
        local value
        if content:match("<") then
            -- Nested markup: parse the inner fragment recursively.
            value = _M.xml_to_json(content)
        else
            value = leaf_value(content)
        end

        if result[tag] == nil then
            result[tag] = value
        elseif repeated[tag] then
            local list = result[tag]
            list[#list + 1] = value
        else
            -- Second occurrence: promote the single value to an array.
            result[tag] = { result[tag], value }
            repeated[tag] = true
        end
    end

    return result
end

return _M

10.5 gRPC-HTTP 转换

-- /usr/local/openresty/lua/transform/grpc_gateway.lua
local _M = {}

local cjson = require "cjson"

--- Describe an HTTP REST call as a gRPC request (simplified).
-- The path must have exactly two segments, "/<service>/<method>". The body is
-- passed through untouched as the message; no protobuf serialisation happens
-- here. The `method` argument is accepted but not used.
-- @param path string request path of the form /Service/Method
-- @param method string HTTP method (unused)
-- @param body any request payload to carry as the gRPC message
-- @return table { service, method, message } on success
-- @return nil plus an error string when the path does not match
function _M.http_to_grpc(path, method, body)
    local service, rpc_method = path:match("^/([^/]+)/([^/]+)$")

    if service and rpc_method then
        return {
            service = service,
            method = rpc_method,
            message = body,
        }
    end

    return nil, "Invalid gRPC path: " .. path
end

--- Convert a gRPC response into an HTTP status plus JSON body.
-- A status in the gRPC code range (0..16, given as a number or numeric
-- string) is translated with _M.map_grpc_status; previously such a code was
-- returned verbatim, e.g. NOT_FOUND (5) became HTTP status 5. Any other value
-- is assumed to already be an HTTP status and is passed through, and a
-- missing status defaults to 200.
-- @param grpc_response table with optional `status` and `message` fields
-- @return table { status = <HTTP status>, body = <JSON string> }
function _M.grpc_to_http(grpc_response)
    local status = grpc_response.status or 200

    local code = tonumber(status)
    if code and code >= 0 and code <= 16 then
        status = _M.map_grpc_status(code)
    end

    return {
        status = status,
        -- A missing message is encoded as an empty JSON object.
        body = cjson.encode(grpc_response.message or {}),
    }
end

-- gRPC status code -> HTTP status code.
local grpc_to_http_status = {
    [0] = 200,   -- OK
    [1] = 499,   -- CANCELLED
    [2] = 500,   -- UNKNOWN
    [3] = 400,   -- INVALID_ARGUMENT
    [4] = 504,   -- DEADLINE_EXCEEDED
    [5] = 404,   -- NOT_FOUND
    [6] = 409,   -- ALREADY_EXISTS
    [7] = 403,   -- PERMISSION_DENIED
    [8] = 429,   -- RESOURCE_EXHAUSTED
    [9] = 400,   -- FAILED_PRECONDITION
    [10] = 409,  -- ABORTED
    [11] = 400,  -- OUT_OF_RANGE
    [12] = 501,  -- UNIMPLEMENTED
    [13] = 500,  -- INTERNAL
    [14] = 503,  -- UNAVAILABLE
    [15] = 500,  -- DATA_LOSS
    [16] = 401,  -- UNAUTHENTICATED
}

--- Map a gRPC status code to the corresponding HTTP status code.
-- The code may be a number or a numeric string: the grpc-status trailer is
-- text, and a string key would otherwise miss the integer-keyed table and
-- always yield 500.
-- @param grpc_status number|string gRPC status code
-- @return number HTTP status; 500 for nil, non-numeric or unknown codes
function _M.map_grpc_status(grpc_status)
    local code = tonumber(grpc_status)
    if code == nil then
        return 500
    end
    return grpc_to_http_status[code] or 500
end

return _M

10.6 请求聚合(API Composition)

将多个后端服务的响应聚合为一个响应。

-- /usr/local/openresty/lua/transform/aggregator.lua
local _M = {}

local http = require "resty.http"
local cjson = require "cjson"

--- Issue several backend HTTP requests concurrently and collect the results.
-- Each request runs in its own light thread (ngx.thread.spawn) with its own
-- resty.http client. Results are keyed by the request's `name`; a request
-- without a name is keyed by its position in the list instead of raising
-- "table index is nil".
-- @param requests table array of { name, url, method?, body?, headers?, timeout? }
--        (timeout in milliseconds, default 5000; method defaults to "GET")
-- @return table key -> { name, status, body, error }. `status` is 0 when no
--         response was received. When the light thread itself failed, the
--         entry is { status = 0, error = "thread failed", detail = <reason> }.
function _M.aggregate(requests)
    local results = {}
    local threads = {}

    -- Fan out: one light thread per request.
    for i, req in ipairs(requests) do
        threads[i] = ngx.thread.spawn(function()
            local httpc = http.new()
            httpc:set_timeout(req.timeout or 5000)

            local res, err = httpc:request_uri(req.url, {
                method = req.method or "GET",
                body = req.body,
                headers = req.headers or {},
            })

            return {
                name = req.name,
                status = res and res.status or 0,
                body = res and res.body or nil,
                error = err,
            }
        end)
    end

    -- Fan in: wait for every thread in submission order.
    for i, thread in ipairs(threads) do
        local key = requests[i].name or i
        local ok, result = ngx.thread.wait(thread)
        if ok then
            results[key] = result
        else
            -- On failure the second value from wait is the failure reason;
            -- keep it instead of discarding it.
            results[key] = {
                status = 0,
                error = "thread failed",
                detail = result,
            }
        end
    end

    return results
end

--- Example: build a user-detail document from three backend services.
-- Fetches profile, recent orders and preferences concurrently through
-- _M.aggregate and merges them. A backend that failed, returned no body, or
-- returned a body that is not valid JSON no longer raises out of
-- cjson.decode; its section falls back to nil (profile) or an empty table
-- (orders, preferences). The user ID is URI-escaped before being placed in
-- the request paths.
-- @param user_id string|number user identifier
-- @return table { user_id, profile, recent_orders, preferences }
function _M.get_user_detail(user_id)
    local escaped_id = ngx.escape_uri(user_id)

    local results = _M.aggregate({
        {
            name = "profile",
            url = "http://user-service:8081/users/" .. escaped_id,
        },
        {
            name = "orders",
            url = "http://order-service:8082/users/" .. escaped_id .. "/orders?limit=5",
        },
        {
            name = "preferences",
            url = "http://config-service:8083/users/" .. escaped_id .. "/preferences",
        },
    })

    -- Decode one aggregate entry, substituting `fallback` whenever there is
    -- nothing decodable.
    local function decode_section(entry, fallback)
        if not entry or not entry.body then
            return fallback
        end
        local ok, decoded = pcall(cjson.decode, entry.body)
        if not ok then
            return fallback
        end
        return decoded
    end

    -- Merge the sections into a single response.
    return {
        user_id = user_id,
        profile = decode_section(results.profile, nil),
        recent_orders = decode_section(results.orders, {}),
        preferences = decode_section(results.preferences, {}),
    }
end

return _M

nginx 配置:

# Regex location: the "~" modifier is required for the (\d+) group to be
# evaluated. Without it nginx treats the string as a literal prefix, the
# location never matches real user URLs, and ngx.var[1] is nil.
location ~ ^/api/users/(\d+)/detail$ {
    content_by_lua_block {
        local cjson = require "cjson"
        local aggregator = require "transform.aggregator"

        -- First capture group of the location regex.
        local user_id = ngx.var[1]
        local result = aggregator.get_user_detail(user_id)

        ngx.header["Content-Type"] = "application/json"
        ngx.say(cjson.encode(result))
    }
}

10.7 流式响应处理

--- Proxy the current request to an upstream and stream the response back
-- without buffering the whole body.
--
-- The streaming API of lua-resty-http (connect + request) is used because
-- request_uri reads the entire body into memory and provides no body_reader,
-- which made the original read loop call a nil value. The client request
-- body, if any, is streamed to the upstream as well. Failures before the
-- response starts are logged and turned into status 502.
-- @param upstream_url string absolute http:// or https:// URL
local function stream_proxy(upstream_url)
    local http = require "resty.http"
    local httpc = http.new()
    httpc:set_timeout(60000)  -- generous timeout for long-lived streams

    local function fail(stage, reason)
        ngx.log(ngx.ERR, "stream_proxy: ", stage, " failed: ", reason)
        httpc:close()
        ngx.status = 502
    end

    -- parse_uri yields { scheme, host, port, path }, query included in path.
    local parsed, err = httpc:parse_uri(upstream_url)
    if not parsed then
        return fail("parse_uri", err)
    end
    local scheme, host, port, path = parsed[1], parsed[2], parsed[3], parsed[4]

    local ok
    ok, err = httpc:connect(host, port)
    if not ok then
        return fail("connect", err)
    end

    if scheme == "https" then
        -- NOTE(review): certificate verification is off here; enable it once
        -- lua_ssl_trusted_certificate is configured.
        ok, err = httpc:ssl_handshake(nil, host, false)
        if not ok then
            return fail("ssl_handshake", err)
        end
    end

    local res
    res, err = httpc:request({
        method = ngx.req.get_method(),
        path = path,
        headers = ngx.req.get_headers(),
        -- nil when the client sent no body.
        body = httpc:get_client_body_reader(),
    })
    if not res then
        return fail("request", err)
    end

    ngx.status = res.status

    -- Copy response headers, dropping the hop-by-hop ones nginx manages.
    -- The comparison is case-insensitive: upstreams usually send
    -- "Transfer-Encoding", which the previous exact match let through.
    for name, value in pairs(res.headers) do
        local lowered = name:lower()
        if lowered ~= "transfer-encoding" and lowered ~= "connection" then
            ngx.header[name] = value
        end
    end

    -- Stream the body to the client chunk by chunk.
    local reader = res.body_reader
    local complete = true
    while reader do
        local chunk, read_err = reader(8192)
        if read_err then
            ngx.log(ngx.ERR, "stream_proxy: upstream read failed: ", read_err)
            complete = false
            break
        end
        if not chunk then
            break
        end

        local sent, send_err = ngx.print(chunk)
        if not sent then
            -- Typically the client went away; stop pulling from upstream.
            ngx.log(ngx.INFO, "stream_proxy: client write failed: ", send_err)
            complete = false
            break
        end
        ngx.flush(true)
    end

    -- Only a fully drained connection can be reused safely.
    if complete then
        httpc:set_keepalive()
    else
        httpc:close()
    end
end

10.8 注意事项

body_filter 限制:修改响应体会阻止 Nginx 的 sendfile 优化,大响应建议使用 proxy_buffering off 流式传输。

内存占用:缓冲整个请求/响应体会消耗大量内存,大文件上传/下载场景应使用流式处理。

编码问题:转换 JSON/XML 时注意字符编码,中文等多字节字符在字节截断时可能损坏。


上一章← 第 09 章 - 缓存策略 下一章第 11 章 - 日志与监控 →