强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

14 - 安全加固

第 14 章 · 安全加固

安全是生产环境的生命线。本章覆盖权限系统、行级安全策略(RLS)、加密、审计和 SQL 注入防护。


14.1 角色与权限

角色管理

-- 创建角色(角色 = 用户 + 组)
CREATE ROLE appuser WITH LOGIN PASSWORD 'securepassword';
CREATE ROLE readonly WITH LOGIN PASSWORD 'readpass';
CREATE ROLE admin WITH LOGIN PASSWORD 'adminpass' SUPERUSER;

-- 组角色
CREATE ROLE developers;
GRANT developers TO appuser;

-- 授予权限
GRANT CONNECT ON DATABASE mydb TO appuser;
GRANT USAGE ON SCHEMA public TO appuser;
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA public TO appuser;
GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO appuser;

-- 只读角色
GRANT CONNECT ON DATABASE mydb TO readonly;
GRANT USAGE ON SCHEMA public TO readonly;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO readonly;

-- 默认权限(新创建的对象自动授权)
ALTER DEFAULT PRIVILEGES IN SCHEMA public
    GRANT SELECT ON TABLES TO readonly;

ALTER DEFAULT PRIVILEGES IN SCHEMA public
    GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO appuser;

ALTER DEFAULT PRIVILEGES IN SCHEMA public
    GRANT USAGE ON SEQUENCES TO appuser;

-- 撤销权限
REVOKE DELETE ON employees FROM appuser;

-- 查看权限
\du                              -- 列出角色
\dp employees                    -- 查看表权限
SELECT * FROM information_schema.role_table_grants WHERE grantee = 'appuser';

权限层次

SUPERUSER > CREATEDB > CREATEROLE > REPLICATION > LOGIN > 连接权限 > Schema 权限 > 对象权限

14.2 行级安全策略(RLS)

-- 启用 RLS
ALTER TABLE orders ENABLE ROW LEVEL SECURITY;

-- 创建策略:用户只能看到自己的订单
CREATE POLICY user_orders ON orders
    FOR ALL
    TO appuser
    USING (user_id = current_setting('app.user_id')::INT);

-- 设置会话变量
SET app.user_id = '42';
SELECT * FROM orders;  -- 只返回 user_id=42 的行

-- 租户隔离策略
CREATE POLICY tenant_isolation ON orders
    FOR ALL
    TO appuser
    USING (tenant_id = current_setting('app.tenant_id')::INT);

-- 管理员绕过 RLS
ALTER TABLE orders FORCE ROW LEVEL SECURITY;
-- 即使表所有者也受 RLS 限制

-- 使用 WITH CHECK 控制写入
CREATE POLICY insert_policy ON orders
    FOR INSERT
    TO appuser
    WITH CHECK (tenant_id = current_setting('app.tenant_id')::INT);

14.3 SSL/TLS 加密

# postgresql.conf
ssl = on
ssl_cert_file = '/etc/ssl/server.crt'
ssl_key_file = '/etc/ssl/server.key'
ssl_ca_file = '/etc/ssl/ca.crt'
ssl_crl_file = '/etc/ssl/root.crl'
# pg_hba.conf — 强制 SSL 连接
hostssl    all    all    0.0.0.0/0    scram-sha-256
# 生成自签名证书
openssl req -new -x509 -days 365 -nodes \
    -out server.crt -keyout server.key \
    -subj "/CN=postgres-server"

# 连接时使用 SSL
psql "host=myserver dbname=mydb sslmode=require"
sslmode说明
disable不使用 SSL
allow尝试 SSL,失败则降级
prefer优先 SSL(默认)
require必须 SSL
verify-ca验证 CA
verify-full验证 CA + 主机名

14.4 列级加密

-- 使用 pgcrypto 扩展
CREATE EXTENSION IF NOT EXISTS pgcrypto;

-- 对称加密(AES)
INSERT INTO users (name, ssn)
VALUES ('Alice', pgp_sym_encrypt('123-45-6789', 'secret_key'));

-- 解密
SELECT name, pgp_sym_decrypt(ssn::bytea, 'secret_key') AS ssn FROM users;

-- 哈希(不可逆)
INSERT INTO users (name, password_hash)
VALUES ('Bob', crypt('mypassword', gen_salt('bf')));

-- 验证密码
SELECT * FROM users
WHERE password_hash = crypt('mypassword', password_hash);

14.5 审计

-- 使用 pgAudit 扩展
-- postgresql.conf
-- shared_preload_libraries = 'pgaudit'
-- pgaudit.log = 'write, ddl, role'

-- 简单审计日志(触发器方式)
CREATE TABLE audit_trail (
    id BIGSERIAL PRIMARY KEY,
    table_name TEXT NOT NULL,
    action TEXT NOT NULL,
    row_data JSONB,
    changed_by TEXT DEFAULT current_user,
    changed_at TIMESTAMPTZ DEFAULT NOW(),
    client_addr INET DEFAULT inet_client_addr()
);

CREATE OR REPLACE FUNCTION log_changes() RETURNS TRIGGER AS $$
BEGIN
    INSERT INTO audit_trail (table_name, action, row_data)
    VALUES (
        TG_TABLE_NAME,
        TG_OP,
        CASE WHEN TG_OP = 'DELETE' THEN to_jsonb(OLD) ELSE to_jsonb(NEW) END
    );
    RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql;

-- 为敏感表添加审计触发器
CREATE TRIGGER audit_employees
    AFTER INSERT OR UPDATE OR DELETE ON employees
    FOR EACH ROW EXECUTE FUNCTION log_changes();

14.6 SQL 注入防护

-- ❌ 字符串拼接(SQL 注入风险!)
-- query := 'SELECT * FROM users WHERE name = ''' || user_input || '''';

-- ✅ 使用 format() + %I(标识符)和 %L(字面量)
query := format('SELECT * FROM %I WHERE name = %L', table_name, user_input);

-- ✅ 使用 EXECUTE ... USING(参数化)
EXECUTE 'SELECT * FROM users WHERE name = $1' USING user_input;

-- ✅ 使用 ORM 或驱动的参数化查询
-- Python psycopg2: cursor.execute("SELECT * FROM users WHERE name = %s", (name,))

业务场景

场景推荐方案
多租户 SaaSRLS + 会话变量
金融数据列级加密 + 审计日志
合规要求pgAudit + SSL + 访问控制
API 安全最小权限角色 + 参数化查询

扩展阅读