17 - 数据库
第 17 章:数据库
掌握 SQLite、SQLAlchemy ORM、数据库迁移和异步数据库操作。
17.1 SQLite(标准库)
17.1.1 基本操作
import sqlite3
# 连接数据库(文件不存在会自动创建)
conn = sqlite3.connect("app.db")
conn.row_factory = sqlite3.Row # 返回字典式行对象
# 创建表
conn.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
conn.commit()
# 插入
conn.execute("INSERT INTO users (name, email) VALUES (?, ?)", ("Alice", "[email protected]"))
conn.commit()
# 查询
rows = conn.execute("SELECT * FROM users").fetchall()
for row in rows:
print(dict(row))
# 参数化查询(防 SQL 注入)
user = conn.execute("SELECT * FROM users WHERE email = ?", ("[email protected]",)).fetchone()
print(user["name"])
# 关闭
conn.close()
17.1.2 上下文管理器
import sqlite3
with sqlite3.connect("app.db") as conn:
conn.execute("INSERT INTO users (name, email) VALUES (?, ?)", ("Bob", "[email protected]"))
# 自动提交(如果无异常)或回滚(如果有异常)
17.2 SQLAlchemy ORM
17.2.1 安装与配置
$ pip install sqlalchemy
17.2.2 声明式模型
from datetime import datetime
from sqlalchemy import create_engine, String, ForeignKey, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship, Session
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(100))
email: Mapped[str] = mapped_column(String(255), unique=True)
created_at: Mapped[datetime] = mapped_column(default=func.now())
# 关系
posts: Mapped[list["Post"]] = relationship(back_populates="author")
def __repr__(self) -> str:
return f"User(id={self.id}, name={self.name!r})"
class Post(Base):
__tablename__ = "posts"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
content: Mapped[str]
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
author: Mapped["User"] = relationship(back_populates="posts")
17.2.3 CRUD 操作
# 创建引擎
engine = create_engine("sqlite:///app.db", echo=True)
# 创建表
Base.metadata.create_all(engine)
# 使用 Session
with Session(engine) as session:
# Create
user = User(name="Alice", email="[email protected]")
session.add(user)
session.commit()
# Read
user = session.query(User).filter_by(name="Alice").first()
users = session.query(User).filter(User.name.like("A%")).all()
# Update
user.name = "Alice Updated"
session.commit()
# Delete
session.delete(user)
session.commit()
17.2.4 查询构建
from sqlalchemy import select, and_, or_
with Session(engine) as session:
# 基本查询
stmt = select(User).where(User.name == "Alice")
users = session.scalars(stmt).all()
# 条件组合
stmt = select(User).where(
and_(
User.name.like("A%"),
or_(User.email.like("%@gmail.com"), User.email.like("%@outlook.com")),
)
)
# 排序和分页
stmt = select(User).order_by(User.created_at.desc()).offset(10).limit(10)
# 聚合
stmt = select(func.count(User.id))
count = session.scalar(stmt)
# 关联查询
stmt = select(User, Post).join(Post, User.id == Post.user_id)
results = session.execute(stmt).all()
17.3 连接池
from sqlalchemy import create_engine
# 配置连接池
engine = create_engine(
"postgresql://user:password@localhost:5432/mydb",
pool_size=10, # 连接池大小
max_overflow=20, # 最大溢出连接
pool_timeout=30, # 获取连接超时
pool_recycle=3600, # 连接回收时间
pool_pre_ping=True, # 使用前检查连接
)
17.4 数据库迁移(Alembic)
# 安装
$ pip install alembic
# 初始化
$ alembic init alembic
# 创建迁移
$ alembic revision --autogenerate -m "add users table"
# 执行迁移
$ alembic upgrade head
# 回滚
$ alembic downgrade -1
# 查看历史
$ alembic history
alembic.ini 关键配置:
[alembic]
script_location = alembic
sqlalchemy.url = sqlite:///app.db
17.5 异步 ORM(SQLAlchemy 2.0+)
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
import asyncio
async_engine = create_async_engine("sqlite+aiosqlite:///app.db")
async_session = async_sessionmaker(async_engine, class_=AsyncSession)
async def main():
async with async_session() as session:
user = User(name="Alice", email="[email protected]")
session.add(user)
await session.commit()
# 查询
result = await session.execute(select(User).where(User.name == "Alice"))
user = result.scalar_one()
print(user)
asyncio.run(main())
17.6 数据库选型
| 数据库 | 类型 | 适用场景 | Python 驱动 |
|---|---|---|---|
| SQLite | 嵌入式 | 本地开发、小型应用 | sqlite3 (标准库) |
| PostgreSQL | 关系型 | 生产环境首选 | psycopg2, asyncpg |
| MySQL | 关系型 | Web 应用 | pymysql, aiomysql |
| MongoDB | 文档型 | 灵活 schema | pymongo, motor |
| Redis | 键值型 | 缓存、会话 | redis |
17.7 注意事项
🔴 注意:
- 永远使用参数化查询,防止 SQL 注入
- 使用连接池管理数据库连接
- 生产环境使用 PostgreSQL 而非 SQLite
- 数据库迁移脚本要纳入版本控制
💡 提示:
- 使用 SQLAlchemy 2.0+ 的新式声明模型
- 使用 Alembic 管理数据库迁移
- 使用
async_session支持异步操作 echo=True仅在开发时启用,生产环境关闭
📌 业务场景:
from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session
class UserRepository:
def __init__(self, engine):
self.engine = engine
def get_by_id(self, user_id: int) -> User | None:
with Session(self.engine) as session:
return session.get(User, user_id)
def get_by_email(self, email: str) -> User | None:
with Session(self.engine) as session:
return session.scalars(
select(User).where(User.email == email)
).first()
def create(self, name: str, email: str) -> User:
with Session(self.engine) as session:
user = User(name=name, email=email)
session.add(user)
session.commit()
session.refresh(user)
return user