19 - 最佳实践
19 - 最佳实践:Schema 设计、并发策略与何时不用 SQLite
19.1 Schema 设计最佳实践
19.1.1 主键设计
-- ✅ 最佳:INTEGER PRIMARY KEY(等价于 rowid,性能最优)
CREATE TABLE users (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL
);
-- ⚠️ 可以但有额外开销:AUTOINCREMENT
CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL
);
-- AUTOINCREMENT 使用 sqlite_sequence 表记录最大 ID
-- 比不带 AUTOINCREMENT 稍慢
-- ❌ 不推荐:使用其他类型做主键
CREATE TABLE users (
id TEXT PRIMARY KEY, -- UUID 做主键性能差
name TEXT NOT NULL
);
-- ✅ 如果必须用 UUID:作为额外列 + INTEGER 主键
CREATE TABLE users (
id INTEGER PRIMARY KEY,
uid TEXT NOT NULL UNIQUE DEFAULT (lower(hex(randomblob(16)))),
name TEXT NOT NULL
);
CREATE INDEX idx_users_uid ON users(uid);
主键选择对比:
| 主键类型 | 插入性能 | 查询性能 | 索引大小 | 适用场景 |
|---|
INTEGER PRIMARY KEY | ⭐⭐⭐ 最快 | ⭐⭐⭐ 最快 | 最小 | 默认首选 |
INTEGER PRIMARY KEY AUTOINCREMENT | ⭐⭐ 较快 | ⭐⭐⭐ | 小 | 需要严格递增 |
| UUID (TEXT) | ⭐ 慢 | ⭐⭐ | 大 | 分布式系统 |
| UUID + INTEGER | ⭐⭐ | ⭐⭐⭐ | 中 | 需要 UUID 但在意性能 |
19.1.2 外键设计
-- ✅ 开启外键约束
PRAGMA foreign_keys = ON;
CREATE TABLE orders (
id INTEGER PRIMARY KEY,
user_id INTEGER NOT NULL,
amount REAL NOT NULL,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
);
-- ✅ 外键列创建索引(加速 JOIN 和约束检查)
CREATE INDEX idx_orders_user_id ON orders(user_id);
-- ON DELETE/ON UPDATE 选项:
-- CASCADE — 级联删除/更新
-- SET NULL — 设为 NULL
-- SET DEFAULT — 设为默认值
-- RESTRICT — 阻止操作(默认行为)
-- NO ACTION — 类似 RESTRICT
19.1.3 时间字段设计
-- 方案 1:TEXT ISO 8601(推荐,可读性好)
CREATE TABLE events (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime'))
);
-- 方案 2:INTEGER Unix 时间戳(紧凑,计算快)
CREATE TABLE events (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))
);
-- 方案 3:同时存储两者(灵活查询)
CREATE TABLE events (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
created_ts INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))
);
19.1.4 枚举值设计
-- ✅ TEXT + CHECK 约束(推荐)
CREATE TABLE orders (
id INTEGER PRIMARY KEY,
status TEXT NOT NULL DEFAULT 'pending'
CHECK(status IN ('pending', 'paid', 'shipped', 'completed', 'cancelled'))
);
-- ✅ 外键引用枚举表(更灵活)
CREATE TABLE order_statuses (
code TEXT PRIMARY KEY,
label TEXT NOT NULL,
sort_order INTEGER NOT NULL
);
INSERT INTO order_statuses VALUES
('pending', '待处理', 1),
('paid', '已付款', 2),
('shipped', '已发货', 3),
('completed', '已完成', 4),
('cancelled', '已取消', 5);
CREATE TABLE orders (
id INTEGER PRIMARY KEY,
status TEXT NOT NULL DEFAULT 'pending',
FOREIGN KEY (status) REFERENCES order_statuses(code)
);
19.1.5 金额存储
-- ✅ INTEGER 存储分为单位(推荐)
CREATE TABLE transactions (
id INTEGER PRIMARY KEY,
amount_cents INTEGER NOT NULL, -- 19.99 元存储为 1999
currency TEXT DEFAULT 'CNY'
);
-- 查询时:SELECT amount_cents / 100.0 AS amount FROM transactions;
-- ❌ REAL 浮点数(精度问题)
CREATE TABLE transactions (
id INTEGER PRIMARY KEY,
amount REAL NOT NULL -- 0.1 + 0.2 ≠ 0.3
);
19.1.6 软删除设计
-- 软删除:标记删除而非物理删除
CREATE TABLE records (
id INTEGER PRIMARY KEY,
data TEXT NOT NULL,
is_deleted INTEGER NOT NULL DEFAULT 0,
deleted_at TEXT
);
-- 查询时过滤
SELECT * FROM records WHERE is_deleted = 0;
-- 删除时标记
UPDATE records SET is_deleted = 1, deleted_at = datetime('now') WHERE id = 1;
-- 部分索引:只为未删除的数据创建索引
CREATE INDEX idx_records_active ON records(data) WHERE is_deleted = 0;
-- 定期清理已删除数据
DELETE FROM records WHERE is_deleted = 1 AND deleted_at < datetime('now', '-30 days');
19.1.7 多租户设计
-- 方案 1:共享表 + tenant_id(推荐小规模)
CREATE TABLE products (
id INTEGER PRIMARY KEY,
tenant_id TEXT NOT NULL,
name TEXT NOT NULL,
price INTEGER NOT NULL
);
CREATE INDEX idx_products_tenant ON products(tenant_id);
-- 所有查询都带 tenant_id
SELECT * FROM products WHERE tenant_id = ? AND name LIKE ?;
-- 方案 2:每租户独立数据库(大规模隔离)
ATTACH DATABASE 'tenant_abc.db' AS tenant;
SELECT * FROM tenant.products;
19.2 并发策略
19.2.1 读写并发
WAL 模式下的并发模型:
┌───────────────────────────────────────┐
│ 多个读连接(并发) │
│ Reader A ←→ Reader B ←→ Reader C │
│ ↓ WAL ↓ │
│ ┌──────────────┐ │
│ │ 写连接(单个)│ │
│ │ Writer │ │
│ └──────────────┘ │
└───────────────────────────────────────┘
# Python 读写分离示例
import sqlite3
import threading
class SQLiteManager:
def __init__(self, db_path):
self.db_path = db_path
self._write_lock = threading.Lock()
def _get_read_conn(self):
"""获取只读连接(可多线程并发)"""
conn = sqlite3.connect(f'file:{self.db_path}?mode=ro', uri=True)
conn.execute('PRAGMA journal_mode = WAL')
conn.row_factory = sqlite3.Row
return conn
def _get_write_conn(self):
"""获取写连接(需串行化)"""
conn = sqlite3.connect(self.db_path)
conn.execute('PRAGMA journal_mode = WAL')
conn.execute('PRAGMA foreign_keys = ON')
conn.execute('PRAGMA busy_timeout = 5000')
conn.row_factory = sqlite3.Row
return conn
def read(self, query, params=()):
with self._get_read_conn() as conn:
return conn.execute(query, params).fetchall()
def write(self, query, params=()):
with self._write_lock:
with self._get_write_conn() as conn:
result = conn.execute(query, params)
conn.commit()
return result
19.2.2 处理 SQLITE_BUSY
import sqlite3
import time
def execute_with_retry(conn, query, params=(), max_retries=5):
"""带重试的 SQL 执行"""
for attempt in range(max_retries):
try:
return conn.execute(query, params)
except sqlite3.OperationalError as e:
if 'database is locked' in str(e) and attempt < max_retries - 1:
time.sleep(0.1 * (attempt + 1)) # 递增等待
continue
raise
# 设置 busy_timeout(更简单的方案)
conn = sqlite3.connect('mydb.db', timeout=10) # 10 秒超时
conn.execute('PRAGMA busy_timeout = 10000')
19.2.3 高并发写入策略
import sqlite3
from queue import Queue
from threading import Thread
class WriteWorker:
"""单线程写入 worker(避免锁竞争)"""
def __init__(self, db_path):
self.db_path = db_path
self.queue = Queue()
self.conn = sqlite3.connect(db_path, check_same_thread=False)
self.conn.execute('PRAGMA journal_mode = WAL')
self._start_worker()
def _start_worker(self):
def worker():
while True:
task = self.queue.get()
if task is None:
break
query, params, callback = task
try:
result = self.conn.execute(query, params)
self.conn.commit()
if callback:
callback(result)
except Exception as e:
if callback:
callback(e)
finally:
self.queue.task_done()
self.thread = Thread(target=worker, daemon=True)
self.thread.start()
def execute_async(self, query, params=(), callback=None):
"""异步执行写操作"""
self.queue.put((query, params, callback))
def close(self):
self.queue.put(None)
self.thread.join()
self.conn.close()
19.3 何时不用 SQLite
19.3.1 不适合的场景
| 场景 | 原因 | 推荐方案 |
|---|
| 高并发写入(>100 QPS) | 写锁粒度为整个数据库 | PostgreSQL、MySQL |
| 多服务器共享数据 | 无法通过网络访问 | PostgreSQL、MySQL |
| 需要用户权限管理 | 无内置用户系统 | PostgreSQL |
| 超大规模数据(>TB) | 单文件存储限制 | 分布式数据库 |
| 复杂分析查询 | 缺少高级优化器 | ClickHouse、DuckDB |
| 全文搜索(高并发) | FTS5 不适合高并发 | Elasticsearch |
| 实时数据流 | 无流式处理 | Redis、Kafka |
19.3.2 适合的场景
| 场景 | 理由 |
|---|
| 嵌入式/移动应用 | 零依赖、零配置 |
| 桌面应用本地存储 | 单一文件、便携 |
| 单用户 Web 应用 | 中小流量足够 |
| 开发/测试环境 | 无需安装数据库服务 |
| 数据分析探索 | 直接查询文件 |
| 缓存层 | 替代 Redis 作为本地缓存 |
| 日志/审计 | 写入频率可控 |
| 配置存储 | 结构化配置数据 |
| IoT/边缘计算 | 资源受限环境 |
19.3.3 决策流程图
需要数据库
│
├── 多服务器共享数据? ── 是 ──→ PostgreSQL/MySQL
│
├── 并发写入 > 100 QPS? ── 是 ──→ PostgreSQL/MySQL
│
├── 数据量 > 100GB? ── 是 ──→ 分布式数据库
│
├── 需要网络访问? ── 是 ──→ PostgreSQL/MySQL
│
├── 需要用户权限管理? ── 是 ──→ PostgreSQL/MySQL
│
└── 否 ──→ SQLite ✅
19.4 错误处理
19.4.1 常见错误
| 错误码 | 含义 | 处理方式 |
|---|
SQLITE_BUSY | 数据库被锁定 | 重试或增加 busy_timeout |
SQLITE_CONSTRAINT | 约束违反 | 检查数据或使用 ON CONFLICT |
SQLITE_CORRUPT | 数据库损坏 | 从备份恢复 |
SQLITE_FULL | 磁盘空间不足 | 清理空间或 VACUUM |
SQLITE_READONLY | 只读数据库 | 检查文件权限 |
SQLITE_IOERR | I/O 错误 | 检查文件系统 |
import sqlite3
def safe_execute(db_path, query, params=()):
try:
conn = sqlite3.connect(db_path, timeout=10)
conn.execute('PRAGMA busy_timeout = 10000')
result = conn.execute(query, params)
conn.commit()
return result
except sqlite3.OperationalError as e:
if 'database is locked' in str(e):
print('数据库繁忙,请稍后重试')
elif 'database disk image is malformed' in str(e):
print('数据库损坏,需要从备份恢复')
elif 'disk I/O error' in str(e):
print('磁盘 I/O 错误')
else:
raise
except sqlite3.IntegrityError as e:
if 'UNIQUE constraint failed' in str(e):
print('数据已存在')
elif 'FOREIGN KEY constraint failed' in str(e):
print('引用的数据不存在')
else:
raise
finally:
conn.close()
19.5 测试策略
19.5.1 使用内存数据库测试
import sqlite3
import pytest
@pytest.fixture
def db():
"""每个测试使用独立的内存数据库"""
conn = sqlite3.connect(':memory:')
conn.execute('PRAGMA foreign_keys = ON')
conn.executescript('''
CREATE TABLE users (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL
);
CREATE TABLE orders (
id INTEGER PRIMARY KEY,
user_id INTEGER NOT NULL,
amount REAL NOT NULL,
FOREIGN KEY (user_id) REFERENCES users(id)
);
''')
yield conn
conn.close()
def test_create_user(db):
db.execute("INSERT INTO users (name, email) VALUES ('张三', '[email protected]')")
user = db.execute("SELECT * FROM users WHERE email = '[email protected]'").fetchone()
assert user[1] == '张三'
def test_unique_email(db):
db.execute("INSERT INTO users (name, email) VALUES ('张三', '[email protected]')")
with pytest.raises(sqlite3.IntegrityError):
db.execute("INSERT INTO users (name, email) VALUES ('李四', '[email protected]')")
19.6 代码规范
19.6.1 SQL 命名规范
| 元素 | 规范 | 示例 |
|---|
| 表名 | 小写复数,下划线分隔 | users, order_items |
| 列名 | 小写,下划线分隔 | created_at, user_id |
| 索引 | idx_表名_列名 | idx_users_email |
| 主键 | id | id INTEGER PRIMARY KEY |
| 外键 | 表名_id(单数) | user_id, order_id |
| 时间字段 | 动作_at | created_at, updated_at |
| 布尔字段 | is_形容词 | is_active, is_deleted |
19.6.2 连接管理规范
# ✅ 使用上下文管理器
with sqlite3.connect('mydb.db') as conn:
conn.execute(...)
# ✅ 使用 try/finally
conn = sqlite3.connect('mydb.db')
try:
conn.execute(...)
conn.commit()
finally:
conn.close()
# ❌ 不要忘记关闭连接
conn = sqlite3.connect('mydb.db')
conn.execute(...) # 如果这里抛异常,连接不会关闭
conn.close()
19.6.3 PRAGMA 设置规范
def get_connection(db_path):
"""标准连接初始化"""
conn = sqlite3.connect(db_path, timeout=10)
conn.row_factory = sqlite3.Row
# 标准 PRAGMA 设置
pragmas = {
'journal_mode': 'WAL',
'foreign_keys': 'ON',
'busy_timeout': '10000',
'synchronous': 'NORMAL',
'temp_store': 'MEMORY',
'cache_size': '-8000', # 8MB
}
for key, value in pragmas.items():
conn.execute(f'PRAGMA {key} = {value}')
return conn
19.7 部署检查清单
| # | 检查项 | 说明 |
|---|
| 1 | WAL 模式已开启 | PRAGMA journal_mode = WAL |
| 2 | 外键已开启 | PRAGMA foreign_keys = ON |
| 3 | busy_timeout 已设置 | PRAGMA busy_timeout = 5000 |
| 4 | 数据库文件权限正确 | chmod 600 mydb.db |
| 5 | 备份脚本已配置 | 定时备份 + 验证 |
| 6 | 磁盘空间充足 | VACUUM 需要 2 倍空间 |
| 7 | 索引已创建 | 检查热查询的索引 |
| 8 | ANALYZE 已执行 | PRAGMA analyze |
| 9 | 数据完整性检查 | PRAGMA integrity_check |
| 10 | 日志模式正确 | WAL 文件正常 |
⚠️ 注意事项
- 不要在循环中打开关闭连接——连接建立有开销
- 不要忘记
PRAGMA foreign_keys = ON——默认是关闭的 - 不要将数据库放在 NFS/SMB 上——文件锁不可靠
- 不要在事务中执行长时间操作——会阻塞其他写操作
- 不要使用
SELECT *——只查询需要的列 - 不要忽略
SQLITE_BUSY 错误——需要合理的重试机制
💡 技巧
- INTEGER PRIMARY KEY 是最佳主键选择——性能最优
- 外键列一定要创建索引——加速 JOIN 和约束检查
- 使用 CHECK 约束而非应用层验证——数据完整性更可靠
- 使用
WITHOUT ROWID 优化复合主键的小型表 - 定期
PRAGMA optimize 保持优化器统计信息最新
📌 业务场景
场景一:项目启动规范
# 项目初始化时的标准设置
def init_database(db_path):
conn = sqlite3.connect(db_path)
conn.executescript('''
PRAGMA journal_mode = WAL;
PRAGMA foreign_keys = ON;
PRAGMA busy_timeout = 5000;
PRAGMA synchronous = NORMAL;
''')
conn.executescript('''
CREATE TABLE IF NOT EXISTS migrations (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
applied_at TEXT DEFAULT (datetime('now'))
);
''')
conn.commit()
return conn
场景二:数据库迁移管理
class Migration:
def __init__(self, conn):
self.conn = conn
self._ensure_migrations_table()
def _ensure_migrations_table(self):
self.conn.execute('''
CREATE TABLE IF NOT EXISTS schema_migrations (
version INTEGER PRIMARY KEY,
applied_at TEXT DEFAULT (datetime('now'))
)
''')
def applied(self, version):
row = self.conn.execute(
'SELECT 1 FROM schema_migrations WHERE version = ?', (version,)
).fetchone()
return row is not None
def apply(self, version, sql):
if self.applied(version):
return
self.conn.executescript(sql)
self.conn.execute('INSERT INTO schema_migrations (version) VALUES (?)', (version,))
self.conn.commit()
🔗 扩展阅读
📖 下一章:20 - 实战场景 —— 嵌入式应用、本地缓存、单用户应用、日志