强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

14 - 安全

14 - 安全:加密、权限与 SQL 注入防护

14.1 SQLite 安全概述

安全领域SQLite 默认支持需要额外方案
文件级权限✅(依赖 OS 文件权限)
数据加密SQLCipher / SEE
用户认证应用层实现
网络安全❌(无网络访问)
SQL 注入防护✅(参数化查询)
审计日志应用层实现

14.2 文件级安全

14.2.1 OS 文件权限

# 设置数据库文件权限(仅 owner 可读写)
chmod 600 mydb.db

# 设置目录权限
chmod 700 /var/data/

# 检查权限
ls -la mydb.db
# -rw------- 1 user user 40960 May 10 12:00 mydb.db

14.2.2 临时文件安全

-- 设置临时文件目录(避免敏感数据泄露到临时目录)
PRAGMA temp_store = MEMORY;  -- 临时表存内存(推荐)

-- 设置临时目录(C API)
-- sqlite3_temp_directory = "/secure/tmp/"

14.3 数据加密

14.3.1 SQLCipher

SQLCipher 是 SQLite 的开源加密扩展,提供透明的 256-bit AES 加密。

# 安装 SQLCipher
# macOS
brew install sqlcipher

# Ubuntu/Debian
sudo apt install sqlcipher libsqlcipher-dev

# 创建加密数据库
sqlcipher encrypted.db
sqlite> PRAGMA key = 'my-secret-passphrase';
sqlite> CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);
sqlite> INSERT INTO users VALUES (1, '张三');
sqlite> .quit

# 打开加密数据库
sqlcipher encrypted.db
sqlite> PRAGMA key = 'my-secret-passphrase';
sqlite> SELECT * FROM users;

# 错误的密码
sqlcipher encrypted.db
sqlite> PRAGMA key = 'wrong-key';
sqlite> SELECT * FROM users;
-- Error: file is not a database

14.3.2 SQLCipher 加密配置

-- 设置加密密钥
PRAGMA key = 'my-secret-passphrase';

-- 修改密钥
PRAGMA rekey = 'new-passphrase';

-- 使用二进制密钥(256-bit)
PRAGMA key = "x'hex_encoded_key'";

-- 加密配置
PRAGMA cipher_page_size = 4096;        -- 加密页面大小
PRAGMA kdf_iter = 256000;               -- 密钥派生迭代次数
PRAGMA cipher_hmac_algorithm = HMAC_SHA512;  -- HMAC 算法
PRAGMA cipher_kdf_algorithm = PBKDF2_HMAC_SHA512;  -- KDF 算法

14.3.3 加密性能影响

方面影响
读取速度略慢(解密开销)
写入速度略慢(加密开销)
数据库大小略大(加密元数据)
内存使用略高(密钥和缓冲区)

14.3.4 SQLite SEE(商业加密)

-- SQLite Encryption Extension(商业产品)
PRAGMA key = 'my-license-key';

-- SEE 支持的加密算法
PRAGMA cipher = 'aes-256-cbc';
PRAGMA cipher = 'aes-256-ecb';
PRAGMA cipher = 'aes-128-cbc';

14.4 SQL 注入防护

14.4.1 什么是 SQL 注入

-- ❌ 危险的字符串拼接
-- 用户输入: "张三' OR '1'='1"
query = "SELECT * FROM users WHERE name = '" + user_input + "'";
-- 实际执行: SELECT * FROM users WHERE name = '张三' OR '1'='1'
-- 返回所有用户!

-- ❌ 更危险的输入: "'; DROP TABLE users; --"
query = "SELECT * FROM users WHERE name = '" + user_input + "'";
-- 实际执行: SELECT * FROM users WHERE name = ''; DROP TABLE users; --'

14.4.2 参数化查询(防止注入的唯一正确方式)

# Python 示例
import sqlite3

conn = sqlite3.connect('mydb.db')
cursor = conn.cursor()

# ✅ 正确:使用参数化查询
cursor.execute("SELECT * FROM users WHERE name = ?", (user_input,))
cursor.execute("SELECT * FROM users WHERE name = ? AND age = ?", (name, age))

# ✅ 使用命名参数
cursor.execute("SELECT * FROM users WHERE name = :name", {"name": user_input})

# ❌ 错误:字符串拼接
cursor.execute(f"SELECT * FROM users WHERE name = '{user_input}'")  # SQL 注入!
cursor.execute("SELECT * FROM users WHERE name = '" + user_input + "'")  # SQL 注入!
// Go 示例
// ✅ 正确
rows, err := db.Query("SELECT * FROM users WHERE name = ?", name)

// ❌ 错误
rows, err := db.Query("SELECT * FROM users WHERE name = '" + name + "'")
// Node.js (better-sqlite3) 示例
// ✅ 正确
const stmt = db.prepare('SELECT * FROM users WHERE name = ?');
const user = stmt.get(userInput);

// ✅ 命名参数
const stmt = db.prepare('SELECT * FROM users WHERE name = @name');
const user = stmt.get({ name: userInput });

14.4.3 LIKE 注入

-- LIKE 中的特殊字符也需要转义
-- % 和 _ 是通配符

-- 用户输入: "张%"
-- 如果直接拼接: WHERE name LIKE '张%'  → 匹配所有以张开头的名字

-- ✅ 正确方式:转义特殊字符
-- Python:
cursor.execute(
    "SELECT * FROM users WHERE name LIKE ? ESCAPE '\\'",
    ('%' + user_input.replace('%', '\\%').replace('_', '\\_') + '%',)
)

14.4.4 表名/列名注入

-- ⚠️ 表名和列名不能参数化
-- ❌ 不工作
cursor.execute("SELECT * FROM ? WHERE id = ?", (table_name, id))

-- ✅ 白名单验证
ALLOWED_TABLES = {'users', 'orders', 'products'}
if table_name not in ALLOWED_TABLES:
    raise ValueError(f"非法表名: {table_name}")

query = f"SELECT * FROM {table_name} WHERE id = ?"
cursor.execute(query, (id,))

14.4.5 SQL 注入防护清单

#防护措施说明
1使用参数化查询所有用户输入必须参数化
2白名单验证表名、列名、排序方向使用白名单
3转义特殊字符LIKE 中的 %_
4最小权限原则应用只使用所需的最小权限
5错误信息不暴露细节不要返回 SQL 错误信息给用户
6输入长度限制限制输入字段的最大长度
7代码审查定期审查 SQL 拼接代码

14.5 应用层权限控制

SQLite 没有内置的用户权限系统,需要在应用层实现。

14.5.1 基于角色的访问控制(RBAC)

-- 用户表
CREATE TABLE app_users (
    id INTEGER PRIMARY KEY,
    username TEXT UNIQUE NOT NULL,
    password_hash TEXT NOT NULL,
    role TEXT NOT NULL DEFAULT 'user' CHECK(role IN ('admin', 'editor', 'user'))
);

-- 权限表
CREATE TABLE permissions (
    id INTEGER PRIMARY KEY,
    role TEXT NOT NULL,
    resource TEXT NOT NULL,
    action TEXT NOT NULL CHECK(action IN ('read', 'write', 'delete')),
    UNIQUE(role, resource, action)
);

INSERT INTO permissions (role, resource, action) VALUES
    ('admin', 'users', 'read'),
    ('admin', 'users', 'write'),
    ('admin', 'users', 'delete'),
    ('editor', 'articles', 'read'),
    ('editor', 'articles', 'write'),
    ('user', 'articles', 'read');

-- 检查权限
SELECT COUNT(*) FROM permissions
WHERE role = ? AND resource = ? AND action = ?;

14.5.2 数据行级权限

-- 每行数据关联创建者
CREATE TABLE documents (
    id INTEGER PRIMARY KEY,
    title TEXT NOT NULL,
    content TEXT,
    owner_id INTEGER NOT NULL,
    is_public INTEGER DEFAULT 0,
    FOREIGN KEY (owner_id) REFERENCES app_users(id)
);

-- 查询时过滤
-- 用户只能看到自己的文档或公开文档
SELECT * FROM documents
WHERE owner_id = ? OR is_public = 1;

14.6 数据脱敏

-- 创建脱敏视图
CREATE VIEW users_safe AS
SELECT
    id,
    CASE WHEN is_admin THEN name ELSE '***' END AS name,
    CASE WHEN is_admin THEN email ELSE substr(email, 1, 3) || '***' END AS email,
    '***-****-' || substr(phone, -4) AS phone_masked
FROM users;

-- 使用 hex 编码掩码
SELECT
    id,
    hex(randomblob(4)) AS anonymized_id,
    'user_' || hex(randomblob(4)) AS anonymized_name
FROM users;

14.7 日志审计

-- 创建审计日志表
CREATE TABLE audit_log (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    table_name TEXT NOT NULL,
    record_id INTEGER,
    action TEXT NOT NULL CHECK(action IN ('INSERT', 'UPDATE', 'DELETE')),
    old_values TEXT,  -- JSON
    new_values TEXT,  -- JSON
    user_id INTEGER,
    ip_address TEXT,
    created_at TEXT DEFAULT (datetime('now'))
);

-- 自动审计触发器
CREATE TRIGGER audit_users_update AFTER UPDATE ON users
BEGIN
    INSERT INTO audit_log (table_name, record_id, action, old_values, new_values)
    VALUES (
        'users',
        OLD.id,
        'UPDATE',
        json_object('name', OLD.name, 'email', OLD.email),
        json_object('name', NEW.name, 'email', NEW.email)
    );
END;

CREATE TRIGGER audit_users_delete AFTER DELETE ON users
BEGIN
    INSERT INTO audit_log (table_name, record_id, action, old_values)
    VALUES (
        'users',
        OLD.id,
        'DELETE',
        json_object('name', OLD.name, 'email', OLD.email)
    );
END;

14.8 安全配置建议

-- 生产环境安全配置
PRAGMA foreign_keys = ON;          -- 外键约束
PRAGMA journal_mode = WAL;         -- WAL 日志模式
PRAGMA synchronous = NORMAL;       -- 同步级别
PRAGMA temp_store = MEMORY;        -- 临时表存内存(避免磁盘泄露)
PRAGMA secure_delete = ON;         -- 删除时覆盖数据(性能下降)
PRAGMA cell_size_check = ON;       -- 检查单元格大小

14.8.1 secure_delete

-- 开启安全删除(删除数据时用零覆盖)
PRAGMA secure_delete = ON;

-- 删除后的数据无法恢复
DELETE FROM sensitive_data WHERE id = 1;
-- 数据页面被零覆盖

⚠️ 注意事项

  1. 参数化查询是防止 SQL 注入的唯一可靠方式——不要依赖转义或过滤
  2. SQLite 没有内置用户认证——安全必须在应用层实现
  3. 加密数据库也需要文件权限保护——防止暴力破解
  4. WAL 文件可能包含未加密的数据——SQLCipher 需要正确配置
  5. secure_delete = ON 会降低性能——仅在处理敏感数据时启用
  6. 不要在日志中记录 SQL 参数——可能泄露敏感数据

💡 技巧

  1. 永远使用参数化查询——没有例外
  2. SQLCipher 是开源免费的——适合大多数加密需求
  3. 使用 temp_store = MEMORY——避免临时文件泄露敏感数据
  4. 审计日志使用 JSON 存储变更——方便后续分析
  5. 应用层权限比数据库权限更灵活——适合 Web 应用

📌 业务场景

场景一:Web 应用安全

import sqlite3

def safe_query(db_path, user_input):
    conn = sqlite3.connect(db_path)
    # ✅ 参数化查询
    cursor = conn.execute(
        "SELECT * FROM users WHERE email = ? AND is_active = 1",
        (user_input,)
    )
    return cursor.fetchall()

场景二:存储敏感数据

-- 使用 SQLCipher 存储用户凭据
-- sqlcipher secure.db
PRAGMA key = 'application-secret-key';

CREATE TABLE credentials (
    id INTEGER PRIMARY KEY,
    service TEXT NOT NULL,
    username TEXT NOT NULL,
    password_encrypted BLOB NOT NULL,
    created_at TEXT DEFAULT (datetime('now'))
);

🔗 扩展阅读


📖 下一章15 - 扩展 —— 加载扩展、自定义函数、自定义聚合