OCaml 教程 / 数据库操作(Caqti)
数据库操作(Caqti)
Caqti 是 OCaml 中广泛使用的数据库抽象层,支持多种数据库后端,提供类型安全的查询接口。
Caqti 概述与连接池
opam install caqti caqti-driver-sqlite3 caqti-driver-postgresql
| 特性 | 说明 |
|---|---|
| 多后端 | SQLite、PostgreSQL、MariaDB |
| 类型安全 | 查询参数和结果类型检查 |
| 连接池 | 内置连接池管理 |
| 异步 | 支持 Lwt 和 Async |
| 事务 | 内置事务支持 |
open Caqti_request.Infix
open Caqti_type.Std
(* SQLite 连接 *)
(* SQLite connection URI: "sqlite3:" followed by the database file path. *)
let sqlite_uri = "sqlite3:mydb.sqlite" |> Uri.of_string

(* PostgreSQL connection URI: user, password, host, port and database name
   (presumably placeholders to be replaced in a real deployment). *)
let pg_uri = "postgresql://user:password@localhost:5432/mydb" |> Uri.of_string
(* 创建连接池 *)
(* Open a Caqti connection pool for [uri].
   Raises [Failure] carrying Caqti's rendered error message when the pool
   cannot be created. *)
let create_pool uri =
  Caqti_lwt.connect_pool uri
  |> Result.fold ~ok:Fun.id ~error:(fun err -> failwith (Caqti_error.show err))
(* 使用连接池 *)
(* Module-level pool backed by [sqlite_uri], shared by the examples below. *)
let pool = sqlite_uri |> create_pool
(* Run the single-row request [query] with parameters [param] on a
   connection borrowed from the module-level [pool].
   [Db.find] needs both the request and its parameter value; with only the
   request, the callback would yield a function instead of a promise. *)
let run_query query param =
  Caqti_lwt.Pool.use
    (fun (module Db : Caqti_lwt.CONNECTION) -> Db.find query param)
    pool
💡 提示:连接池最大连接数默认为 8(以所用 Caqti 版本的文档为准),可通过 ~max_size 参数调整。池管理器自动处理连接的创建、复用和回收。
查询与参数化
(* 查询类型 *)
(* A user row: id, name, email and age. *)
type user = {
  id: int;
  name: string;
  email: string;
  age: int;
}

(* Caqti codec mapping [user] to and from the column tuple
   (int, string, string, int), in that order. *)
let user_type =
  Caqti_type.custom
    ~encode:(fun { id; name; email; age } -> Ok (id, name, email, age))
    ~decode:(fun (id, name, email, age) -> Ok { id; name; email; age })
    Caqti_type.(tup4 int string string int)
(* 查询定义 *)
(* Fetch exactly one user by primary key.
   The SELECT list must supply all four columns decoded by [user_type]
   (id, name, email, age), in that order. *)
let find_user_by_id =
  Caqti_type.(int ->! user_type)
    "SELECT id, name, email, age FROM users WHERE id = ?"
(* All users whose age is at least the given value (zero or more rows).
   The SELECT list matches the four columns decoded by [user_type]. *)
let find_users_by_age =
  Caqti_type.(int ->* user_type)
    "SELECT id, name, email, age FROM users WHERE age >= ?"
(* Number of rows in [users]; the query always yields exactly one row. *)
let count_users =
  (Caqti_type.unit ->! Caqti_type.int) "SELECT COUNT(*) FROM users"
(* 插入 *)
(* Insert a user from (name, email, age) and return the generated id through
   RETURNING (PostgreSQL, or SQLite >= 3.35 — confirm the target version). *)
let insert_user =
  let params = Caqti_type.(tup3 string string int) in
  (params ->! Caqti_type.int)
    "INSERT INTO users (name, email, age) VALUES (?, ?, ?) RETURNING id"
(* 更新 *)
(* Set name and email of the user with the given id.
   Parameters are (name, email, id); email is a text column, so the second
   parameter is a string. *)
let update_user =
  Caqti_type.(tup3 string string int ->. unit)
    "UPDATE users SET name = ?, email = ? WHERE id = ?"
(* 删除 *)
(* Delete the user with the given id; returns no rows. *)
let delete_user =
  let open Caqti_type in
  (int ->. unit) "DELETE FROM users WHERE id = ?"
(* 执行查询 *)
(* Look up one user by [id] on a connection borrowed from [pool]. *)
let get_user pool id =
  let run (module Db : Caqti_lwt.CONNECTION) = Db.find find_user_by_id id in
  Caqti_lwt.Pool.use run pool
(* All users whose age is at least [age], as a list.
   Caqti's many-row accessor on a connection is [collect_list]. *)
let get_users_over_age pool age =
  Caqti_lwt.Pool.use
    (fun (module Db : Caqti_lwt.CONNECTION) ->
      Db.collect_list find_users_by_age age)
    pool
(* Insert a new user and return the id produced by [insert_user]. *)
let create_user pool name email age =
  let params = (name, email, age) in
  Caqti_lwt.Pool.use
    (fun (module Db : Caqti_lwt.CONNECTION) -> Db.find insert_user params)
    pool
| 查询函数 | 说明 | 返回类型(包裹在 (_, error) result Lwt.t 中) |
|---|---|---|
| Db.find | 恰好一行结果 | 'a |
| Db.collect_list | 多行结果 | 'a list |
| Db.exec | 不返回行 | unit |
| Db.find_opt | 零或一行结果 | 'a option |
⚠️ 注意:Caqti 查询模板可使用 ? 或 $1、$2… 占位符(同一查询中不可混用),Caqti 会为各数据库后端自动转换;参数顺序必须与 Caqti_type 定义一致。
事务管理
(* 事务封装 *)
(* Run [f] inside a database transaction on a connection from [pool].

   [f] receives the connection and must return [Ok v] or [Error e]:
   - [Ok v]: the transaction is committed and [Ok v] returned; a failing
     COMMIT is reported as its [Error].
   - [Error e]: the transaction is rolled back and [Error e] returned.
     Caqti reports database failures as [Error] values rather than
     exceptions, so an [Error] must trigger a rollback as well.
   - exception: the transaction is rolled back and the exception re-raised.
   A failure to start the transaction is returned without calling [f].
   The result of ROLLBACK itself is ignored: the caller needs the error
   that caused the rollback. *)
let with_transaction pool f =
  Caqti_lwt.Pool.use
    (fun (module Db : Caqti_lwt.CONNECTION) ->
      let rollback () =
        let%lwt _ignored = Db.rollback () in
        Lwt.return_unit
      in
      match%lwt Db.start () with
      | Error err -> Lwt.return (Error err)
      | Ok () -> (
          match%lwt f (module Db : Caqti_lwt.CONNECTION) with
          | Ok _ as ok -> (
              match%lwt Db.commit () with
              | Ok () -> Lwt.return ok
              | Error err -> Lwt.return (Error err))
          | Error _ as err ->
              let%lwt () = rollback () in
              Lwt.return err
          | exception exn ->
              let%lwt () = rollback () in
              Lwt.fail exn))
    pool
(* 转账示例 *)
(* Move [amount] from account [from_id] to account [to_id] inside
   [with_transaction].
   [get_balance], [debit_account] and [credit_account] are requests defined
   elsewhere (not shown in this section).
   Fails with [Failure "余额不足"] (a rejected promise, which makes
   [with_transaction] roll back) when the balance is lower than [amount]. *)
let transfer pool from_id to_id amount =
  with_transaction pool (fun (module Db : Caqti_lwt.CONNECTION) ->
    (* Caqti calls return [(_, _) result Lwt.t]; [let*] stops at the first
       [Error] and hands it back to [with_transaction]. *)
    let open Lwt_result.Syntax in
    let* balance = Db.find get_balance from_id in
    if balance < amount then Lwt.fail (Failure "余额不足")
    else
      let* () = Db.exec debit_account (amount, from_id) in
      Db.exec credit_account (amount, to_id))
(* 嵌套事务(保存点) *)
(* Savepoint requests used by [complex_operation]. They are built once so
   Caqti can reuse the prepared statements. *)
let savepoint_risky_step =
  Caqti_type.(unit ->. unit) "SAVEPOINT risky_step"

let release_risky_step =
  Caqti_type.(unit ->. unit) "RELEASE SAVEPOINT risky_step"

let rollback_risky_step =
  Caqti_type.(unit ->. unit) "ROLLBACK TO SAVEPOINT risky_step"

(* Log the start, attempt [risky_operation], then log completion, all in
   one transaction.
   The risky step runs under a SAVEPOINT. If it fails, only its changes are
   undone (ROLLBACK TO SAVEPOINT), the error is logged, and the outer
   transaction continues. Without the savepoint, PostgreSQL rejects every
   further statement in a transaction after an error.
   [insert_log] and [risky_operation] are requests defined elsewhere. *)
let complex_operation pool =
  with_transaction pool (fun (module Db : Caqti_lwt.CONNECTION) ->
    let open Lwt_result.Syntax in
    let* () = Db.exec insert_log "操作开始" in
    let* () = Db.exec savepoint_risky_step () in
    let* () =
      match%lwt Db.exec risky_operation () with
      | Ok () -> Db.exec release_risky_step ()
      | Error err ->
          (* Record the error but keep the surrounding transaction going. *)
          let* () = Db.exec rollback_risky_step () in
          Db.exec insert_log (Caqti_error.show err)
    in
    Db.exec insert_log "操作完成")
💡 提示:涉及多条相关语句的操作应始终放在同一事务中,以保证数据一致性,避免部分成功导致数据不一致。单条语句在自动提交模式下本身即是原子的;若日后可能扩展为多条语句,提前包装在事务中更稳妥。
类型安全查询
(* 使用 Caqti_type 构建复杂类型 *)
open Caqti_type.Std
(* 复合类型 *)
(* A postal address: street, city and zip code. *)
type address = {
  street: string;
  city: string;
  zip: string;
}

(* Codec mapping [address] to the column tuple (street, city, zip). *)
let address_type =
  Caqti_type.(
    custom
      ~encode:(fun { street; city; zip } -> Ok (street, city, zip))
      ~decode:(fun (street, city, zip) -> Ok { street; city; zip })
      (tup3 string string string))
(* 可选字段 *)
(* Fetch (address, city) for one user. [address] may be NULL and is decoded
   as an option; [city] is required. *)
let find_user_with_address =
  let row = Caqti_type.(tup2 (option string) string) in
  (Caqti_type.int ->! row) "SELECT address, city FROM users WHERE id = ?"
(* 枚举类型 *)
(* Account status, stored in the database as lowercase text. *)
type status = Active | Inactive | Banned

(* Codec between [status] and its text column value. Any other text is
   reported as a decode error. *)
let status_type =
  let to_text = function
    | Active -> "active"
    | Inactive -> "inactive"
    | Banned -> "banned"
  in
  let of_text text =
    match text with
    | "active" -> Ok Active
    | "inactive" -> Ok Inactive
    | "banned" -> Ok Banned
    | unknown -> Error (Printf.sprintf "未知状态: %s" unknown)
  in
  Caqti_type.custom
    ~encode:(fun st -> Ok (to_text st))
    ~decode:of_text
    Caqti_type.string
(* JSON 字段 *)
(* JSON value stored as text.
   Encoding serialises with Yojson. Decoding parses the text and turns a
   malformed document into a decode [Error], instead of letting
   [Yojson.Json_error] escape from the row decoder. *)
let json_type =
  let encode j = Ok (Yojson.Safe.to_string j) in
  let decode s =
    match Yojson.Safe.from_string s with
    | j -> Ok j
    | exception Yojson.Json_error msg -> Error msg
  in
  Caqti_type.custom ~encode ~decode Caqti_type.string
(* Extended user row. Column order: id, name, email, status, metadata. *)
type user = {
  id: int;
  name: string;
  email: string;
  status: status;
  metadata: Yojson.Safe.t;
}

(* Codec for [user].
   [Caqti_type] has no [tup5] (the tup* constructors stop at [tup4]), so
   the five columns are written as nested tuples. Caqti flattens nested
   tuples into consecutive columns, so the SQL column order is still
   id, name, email, status, metadata. *)
let user_type =
  let encode u = Ok ((u.id, u.name, u.email), (u.status, u.metadata)) in
  let decode ((id, name, email), (status, metadata)) =
    Ok { id; name; email; status; metadata }
  in
  Caqti_type.custom ~encode ~decode
    Caqti_type.(tup2 (tup3 int string string) (tup2 status_type json_type))
连接驱动
SQLite
(* SQLite 特定功能 *)
(* Module-level SQLite pool. If it cannot be created, fails with Caqti's
   rendered error message rather than [Result.get_ok]'s generic one. *)
let sqlite_pool =
  match Caqti_lwt.connect_pool (Uri.of_string "sqlite3:mydb.sqlite") with
  | Ok pool -> pool
  | Error err -> failwith (Caqti_error.show err)

(* Switch the database to write-ahead logging.
   The PRAGMA reports the resulting journal mode as a row, so the request
   allows zero or one result row instead of declaring zero rows. *)
let set_wal_mode =
  Caqti_type.(unit ->? string)
    "PRAGMA journal_mode=WAL"

(* Build a request setting SQLite's busy timeout to [ms] milliseconds.
   SQLite does not accept bound parameters ("?") in PRAGMA statements, so
   the value is rendered into the SQL text. It is an OCaml [int], so no
   untrusted text reaches the query. Each call builds a one-shot request. *)
let set_busy_timeout ms =
  Caqti_type.(unit ->? int) ~oneshot:true
    (Printf.sprintf "PRAGMA busy_timeout = %d" ms)

(* Initialise a SQLite database: enable WAL, set a 5000 ms busy timeout,
   then create the schema with [create_tables] (defined elsewhere).
   Stops at the first failing step and returns its [Error]. *)
let init_sqlite pool =
  Caqti_lwt.Pool.use
    (fun (module Db : Caqti_lwt.CONNECTION) ->
      let open Lwt_result.Syntax in
      let* (_mode : string option) = Db.find_opt set_wal_mode () in
      let* (_timeout : int option) = Db.find_opt (set_busy_timeout 5000) () in
      Db.exec create_tables ())
    pool
PostgreSQL
(* PostgreSQL 特定功能 *)
(* Module-level PostgreSQL pool. If it cannot be created, fails with
   Caqti's rendered error message rather than [Result.get_ok]'s generic
   one. *)
let pg_pool =
  match
    Caqti_lwt.connect_pool
      (Uri.of_string "postgresql://user:pass@localhost:5432/mydb")
  with
  | Ok pool -> pool
  | Error err -> failwith (Caqti_error.show err)
(* 使用 $1, $2 参数 *)
(* Fetch one user by id using PostgreSQL-style "$1" placeholders.
   The SELECT list matches the five columns of the extended [user_type]
   (id, name, email, status, metadata). *)
let find_user_pg =
  Caqti_type.(int ->! user_type)
    "SELECT id, name, email, status, metadata FROM users WHERE id = $1"
(* PostgreSQL 数组 *)
(* Parameter codec sending an [int array] as a PostgreSQL array literal,
   e.g. [|1; 2; 3|] becomes "{1,2,3}". [Caqti_type] has no array field type,
   so the value is sent as text and cast to int[] in the SQL. Only encoding
   is needed for a parameter, so decoding is rejected. *)
let pg_int_array =
  let encode ids =
    Ok ("{" ^ (ids |> Array.to_list |> List.map string_of_int |> String.concat ",") ^ "}")
  in
  let decode _ = Error "pg_int_array: decoding is not supported" in
  Caqti_type.custom ~encode ~decode Caqti_type.string

(* Users whose id is in the given array (PostgreSQL only).
   The SELECT list matches the five columns of the extended [user_type]. *)
let find_users_by_ids =
  Caqti_type.(pg_int_array ->* user_type)
    "SELECT id, name, email, status, metadata FROM users WHERE id = ANY($1::int[])"
(* PostgreSQL JSONB *)
(* Merge a JSON document into [users.metadata] with PostgreSQL's [||].
   [json_type] sends the JSON as text. The explicit [::jsonb] cast makes
   PostgreSQL use jsonb concatenation instead of resolving [||] against a
   text operand.
   NOTE(review): assumes [metadata] is a jsonb column — confirm. *)
let update_metadata =
  Caqti_type.(tup2 json_type int ->. unit)
    "UPDATE users SET metadata = metadata || $1::jsonb WHERE id = $2"
(* COPY 命令(批量导入) *)
(* Bulk-load users with PostgreSQL "COPY ... FROM STDIN", declared as a
   zero-row request with no parameters.
   NOTE(review): COPY FROM STDIN expects the client to stream the row data
   afterwards. Nothing in this request supplies it, so running it with
   [Db.exec] presumably fails or stalls. Caqti's connection-level
   [populate] function looks like the intended bulk-insert API — confirm
   for the Caqti version in use. *)
let copy_users =
Caqti_type.(unit ->. unit)
"COPY users (name, email) FROM STDIN"
| 驱动 | URI 格式 | 特点 |
|---|---|---|
| SQLite | sqlite3:path | 文件数据库,零配置 |
| PostgreSQL | postgresql://user:pass@host:port/db | 功能丰富,生产首选 |
| MariaDB | mariadb://user:pass@host:port/db | MySQL 兼容 |
⚠️ 注意:SQLite 不支持真正的并发写入,多写入者场景应使用 PostgreSQL。
迁移管理
(* 简单的迁移系统 *)
(* One schema migration.
   [version] orders migrations and is recorded in [schema_migrations].
   [name] is a human-readable label. [up] applies the change and [down]
   reverts it; each holds a single SQL statement. *)
type migration = {
version: int;
name: string;
up: string;
down: string;
}
(* All known migrations, listed in ascending [version] order.
   The DDL uses SQLite syntax (e.g. AUTOINCREMENT). *)
let migrations = [
{
version = 1;
name = "create_users";
up = {|
CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
|};
down = "DROP TABLE users";
};
(* NOTE(review): ALTER TABLE ... DROP COLUMN in [down] needs SQLite >= 3.35;
   confirm the deployed SQLite version. *)
{
version = 2;
name = "add_age_column";
up = "ALTER TABLE users ADD COLUMN age INTEGER DEFAULT 0";
down = "ALTER TABLE users DROP COLUMN age";
};
{
version = 3;
name = "create_posts";
up = {|
CREATE TABLE posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
title TEXT NOT NULL,
body TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id)
)
|};
down = "DROP TABLE posts";
};
]
(* 迁移状态表 *)
(* Create the [schema_migrations] bookkeeping table if it does not exist.
   It holds one row per applied migration: version, name and apply time. *)
let create_migrations_table =
  (Caqti_type.unit ->. Caqti_type.unit) {|
CREATE TABLE IF NOT EXISTS schema_migrations (
version INTEGER PRIMARY KEY,
name TEXT NOT NULL,
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
|}
(* 获取当前版本 *)
(* Highest applied migration version, or 0 when none has been recorded. *)
let get_current_version =
  let open Caqti_type in
  (unit ->! int) "SELECT COALESCE(MAX(version), 0) FROM schema_migrations"
(* 记录迁移 *)
(* Record that migration (version, name) has been applied. [applied_at]
   takes its column default. *)
let record_migration =
  let params = Caqti_type.(tup2 int string) in
  (params ->. Caqti_type.unit)
    "INSERT INTO schema_migrations (version, name) VALUES (?, ?)"
(* 执行迁移 *)
(* Apply every migration whose version is above the recorded current
   version, in ascending version order.
   Each migration's [up] SQL and its [schema_migrations] row run in one
   transaction, so a failing migration is rolled back and not recorded.
   Processing stops at the first [Error], which is returned. *)
let run_migrations pool =
  Caqti_lwt.Pool.use
    (fun (module Db : Caqti_lwt.CONNECTION) ->
      let open Lwt_result.Syntax in
      let apply migration =
        Printf.printf "运行迁移 %d: %s\n" migration.version migration.name;
        let* () = Db.start () in
        let steps =
          (* One-shot: each migration's SQL runs once, so caching a
             prepared statement for it would be wasted. *)
          let* () =
            Db.exec (Caqti_type.(unit ->. unit) ~oneshot:true migration.up) ()
          in
          Db.exec record_migration (migration.version, migration.name)
        in
        match%lwt steps with
        | Ok () -> Db.commit ()
        | Error _ as err ->
            (* Best-effort rollback; the caller needs the original error. *)
            let%lwt _rollback = Db.rollback () in
            Lwt.return err
      in
      let rec apply_all = function
        | [] -> Lwt.return_ok ()
        | m :: rest ->
            let* () = apply m in
            apply_all rest
      in
      let* () = Db.exec create_migrations_table () in
      let* current = Db.find get_current_version () in
      migrations
      |> List.filter (fun m -> m.version > current)
      |> List.sort (fun a b -> Int.compare a.version b.version)
      |> apply_all)
    pool
(* 回滚 *)
(* Request deleting a migration's bookkeeping row. It is built once so
   Caqti can reuse the prepared statement across calls. *)
let delete_migration_record =
  Caqti_type.(int ->. unit) "DELETE FROM schema_migrations WHERE version = ?"

(* Revert migration [version]: run its [down] SQL and delete its row from
   [schema_migrations] in one transaction, so the schema and the
   bookkeeping cannot diverge. On failure the transaction is rolled back
   and the first [Error] is returned.
   Fails (rejected promise, [Invalid_argument]) when [version] is not in
   [migrations]. *)
let rollback_migration pool version =
  match List.find_opt (fun m -> m.version = version) migrations with
  | None ->
      Lwt.fail_invalid_arg
        (Printf.sprintf "rollback_migration: unknown migration version %d"
           version)
  | Some migration ->
      Caqti_lwt.Pool.use
        (fun (module Db : Caqti_lwt.CONNECTION) ->
          let open Lwt_result.Syntax in
          let* () = Db.start () in
          let undo =
            let* () =
              Db.exec
                (Caqti_type.(unit ->. unit) ~oneshot:true migration.down)
                ()
            in
            Db.exec delete_migration_record version
          in
          match%lwt undo with
          | Ok () -> Db.commit ()
          | Error _ as err ->
              let%lwt _rollback = Db.rollback () in
              Lwt.return err)
        pool
💡 提示:生产环境建议使用专业迁移工具如 sqitch 或 flyway,但简单项目可用此方案。
性能调优
(* 连接池配置 *)
(* Pool sized for heavier load: at most 20 connections, at most 5 of them
   kept idle. [Caqti_lwt.connect_pool] takes [?max_size] and
   [?max_idle_size] but no idle-timeout argument (TODO confirm against the
   Caqti version in use). Fails with Caqti's rendered error message if the
   pool cannot be created. *)
let optimized_pool uri =
  match
    Caqti_lwt.connect_pool
      ~max_size:20     (* maximum number of connections *)
      ~max_idle_size:5 (* maximum number of idle connections kept *)
      uri
  with
  | Ok pool -> pool
  | Error err -> failwith (Caqti_error.show err)
(* 批量操作 *)
(* Insert every element of [users] in a single transaction and return the
   generated ids in input order. Stops at the first failing insert and
   returns its [Error] to [with_transaction].
   NOTE(review): expects records with [name], [email] and [age] fields (the
   first [user] type of this tutorial). Later record types reuse these
   labels, so an explicit type annotation may be needed. *)
let batch_insert pool users =
  with_transaction pool (fun (module Db : Caqti_lwt.CONNECTION) ->
    let open Lwt_result.Syntax in
    let rec insert_all ids = function
      | [] -> Lwt.return_ok (List.rev ids)
      | user :: rest ->
          let* id = Db.find insert_user (user.name, user.email, user.age) in
          insert_all (id :: ids) rest
    in
    insert_all [] users)
(* 预编译查询 *)
(* NOTE(review): placeholder table, not read or written anywhere in this
   section. It is annotated so it does not have a weak type variable at
   top level; adjust the key/value types to the intended use. *)
let prepared_queries : (string, string) Hashtbl.t = Hashtbl.create 16

(* Caqti caches the prepared statement of a (non-oneshot) request it has
   executed on a connection, so there is no separate "prepare" step. Define
   each request once, e.g. at top level, and reuse it.
   This function only borrows a connection from [pool] and returns [Ok ()].
   The name, SQL and type arguments are unused. *)
let prepare_query pool _name _sql _typ =
  Caqti_lwt.Pool.use (fun _conn -> Lwt.return_ok ()) pool
(* EXPLAIN 分析 *)
(* Return SQLite's EXPLAIN QUERY PLAN for [sql], one plan step per line.
   EXPLAIN QUERY PLAN yields several rows of four columns, the last being
   the human-readable detail. The request is therefore declared as a
   many-row, four-column query, and only the detail texts are kept.
   WARNING: [sql] is spliced into the statement text; pass only trusted,
   developer-written SQL. *)
let explain_query pool sql =
  let query =
    Caqti_type.(unit ->* tup4 int int int string) ~oneshot:true
      ("EXPLAIN QUERY PLAN " ^ sql)
  in
  Caqti_lwt.Pool.use
    (fun (module Db : Caqti_lwt.CONNECTION) ->
      let open Lwt_result.Syntax in
      let+ rows = Db.collect_list query () in
      rows
      |> List.map (fun (_id, _parent, _notused, detail) -> detail)
      |> String.concat "\n")
    pool
| 优化技术 | 说明 | 效果 |
|---|---|---|
| 连接池 | 复用数据库连接 | 减少连接开销 |
| 事务 | 合并多个操作 | 减少提交次数 |
| 批量插入 | 使用 INSERT 批量 | 10-100x 提速 |
| 索引 | 添加合适索引 | 查询提速 |
| 预编译 | 查询计划缓存 | 避免重复解析 |
ORM vs 手写 SQL
(* 手写 SQL(推荐) *)
(* Users left-joined with their posts, ordered by user id.
   The result has one row per (user, post) pair, and one row with no post
   for a user who has none. Caqti decodes one row at a time and has no list
   field type, so posts come back per row and grouping by user is left to
   the caller.
   Columns: u.id, u.name, u.email, u.age, then p.id, p.title, p.body. The
   post columns are all NULL when there is no post, which decodes as
   [None]; [p.body] is nullable. *)
let find_users_with_posts =
  Caqti_type.(
    unit
    ->* tup2
          (tup4 int string string int)
          (option (tup3 int string (option string))))
    {|
    SELECT u.id, u.name, u.email, u.age,
           p.id, p.title, p.body
    FROM users u
    LEFT JOIN posts p ON p.user_id = u.id
    ORDER BY u.id
    |}
(* ORM 风格(简化版) *)
module User = struct
  (* A row of the [users] table. *)
  type t = { id : int; name : string; email : string; age : int }

  (* Row codec for [t]. Columns are (id, name, email, age) in this order,
     matching the SELECT lists below. It is defined here so the module does
     not depend on whichever outer [user_type] is in scope. *)
  let t_type =
    let encode { id; name; email; age } = Ok (id, name, email, age) in
    let decode (id, name, email, age) = Ok { id; name; email; age } in
    Caqti_type.(custom ~encode ~decode (tup4 int string string int))

  (* Requests are built once, when the module is initialised. Caqti caches
     prepared statements per request, so rebuilding a request on every call
     would defeat that cache. *)
  let find_q =
    Caqti_type.(int ->! t_type)
      "SELECT id, name, email, age FROM users WHERE id = ?"

  let find_all_q =
    Caqti_type.(unit ->* t_type) "SELECT id, name, email, age FROM users"

  let create_q =
    Caqti_type.(tup3 string string int ->! int)
      "INSERT INTO users (name, email, age) VALUES (?, ?, ?) RETURNING id"

  (* Parameters are (name, email, id); email is a text column. *)
  let update_q =
    Caqti_type.(tup3 string string int ->. unit)
      "UPDATE users SET name = ?, email = ? WHERE id = ?"

  let delete_q = Caqti_type.(int ->. unit) "DELETE FROM users WHERE id = ?"

  (* Fetch the user with primary key [id]; exactly one row is expected. *)
  let find pool id =
    Caqti_lwt.Pool.use
      (fun (module Db : Caqti_lwt.CONNECTION) -> Db.find find_q id)
      pool

  (* Fetch all users as a list. *)
  let find_all pool =
    Caqti_lwt.Pool.use
      (fun (module Db : Caqti_lwt.CONNECTION) -> Db.collect_list find_all_q ())
      pool

  (* Insert [user] (its [id] is ignored) and return the generated id. *)
  let create pool user =
    Caqti_lwt.Pool.use
      (fun (module Db : Caqti_lwt.CONNECTION) ->
        Db.find create_q (user.name, user.email, user.age))
      pool

  (* Write [name] and [email] of the row identified by [user.id];
     [age] is not updated. *)
  let update pool user =
    Caqti_lwt.Pool.use
      (fun (module Db : Caqti_lwt.CONNECTION) ->
        Db.exec update_q (user.name, user.email, user.id))
      pool

  (* Delete the user with primary key [id]. *)
  let delete pool id =
    Caqti_lwt.Pool.use
      (fun (module Db : Caqti_lwt.CONNECTION) -> Db.exec delete_q id)
      pool
end
⚠️ 注意:ORM 简化了代码但可能隐藏性能问题。复杂查询建议手写 SQL。
数据库设计实战
(* 博客系统数据库设计 *)
(* 用户表 *)
(* DDL for the blog's [users] table (SQLite syntax: AUTOINCREMENT).
   [username] and [email] are UNIQUE. [role] is limited by a CHECK
   constraint to 'admin', 'editor' or 'user' and defaults to 'user'.
   NOTE(review): [updated_at] only receives its DEFAULT on INSERT. Nothing
   here refreshes it on UPDATE; a trigger or an explicit assignment would
   be needed. *)
let create_users = {|
CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
email TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
bio TEXT,
avatar_url TEXT,
role TEXT DEFAULT 'user' CHECK(role IN ('admin', 'editor', 'user')),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
|}
(* 文章表 *)
(* DDL for the [posts] table. Each post belongs to a user through
   [author_id]; deleting the user cascades to their posts. [slug] is UNIQUE.
   [status] is limited to 'draft', 'published' or 'archived' and defaults to
   'draft'.
   NOTE(review): SQLite enforces FOREIGN KEY / ON DELETE CASCADE only when
   "PRAGMA foreign_keys = ON" is set on the connection; nothing visible here
   sets it — confirm. *)
let create_posts = {|
CREATE TABLE posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
author_id INTEGER NOT NULL,
title TEXT NOT NULL,
slug TEXT UNIQUE NOT NULL,
body TEXT NOT NULL,
excerpt TEXT,
status TEXT DEFAULT 'draft' CHECK(status IN ('draft', 'published', 'archived')),
published_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (author_id) REFERENCES users(id) ON DELETE CASCADE
)
|}
(* 标签表 *)
(* DDL for the [tags] table. Both [name] and [slug] are UNIQUE; the
   database backs each UNIQUE constraint with an index. *)
let create_tags = {|
CREATE TABLE tags (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
slug TEXT UNIQUE NOT NULL
)
|}
(* 文章标签关联 *)
(* DDL for the many-to-many link between posts and tags. The composite
   primary key prevents attaching the same tag to a post twice. Deleting a
   post or a tag cascades to its links.
   NOTE(review): as for [posts], SQLite applies ON DELETE CASCADE only when
   foreign-key enforcement is enabled on the connection — confirm. *)
let create_post_tags = {|
CREATE TABLE post_tags (
post_id INTEGER NOT NULL,
tag_id INTEGER NOT NULL,
PRIMARY KEY (post_id, tag_id),
FOREIGN KEY (post_id) REFERENCES posts(id) ON DELETE CASCADE,
FOREIGN KEY (tag_id) REFERENCES tags(id) ON DELETE CASCADE
)
|}
(* 索引 *)
(* Secondary indexes for the blog schema.
   posts.slug and tags.slug are declared UNIQUE, and the database already
   backs every UNIQUE constraint with an index, so separate slug indexes
   would only duplicate them.
   This string holds several statements, but one prepared statement runs
   only one of them: SQLite compiles just the first, and PostgreSQL rejects
   multi-command prepared statements. Execute [create_index_statements] one
   at a time instead. *)
let create_indices = {|
CREATE INDEX idx_posts_author ON posts(author_id);
CREATE INDEX idx_posts_status ON posts(status);
CREATE INDEX idx_posts_published ON posts(published_at);
|}

(* [create_indices] split into individual statements, with the ";"
   separators and surrounding whitespace removed, ready to run as one
   request each. *)
let create_index_statements =
  create_indices
  |> String.split_on_char ';'
  |> List.map String.trim
  |> List.filter (fun stmt -> stmt <> "")
(* 复杂查询示例:带标签的文章列表 *)
(* Decodes the nullable, comma-separated GROUP_CONCAT output into a list of
   tags. NULL (a post without tags) becomes []. Encoding is the inverse.
   NOTE(review): a tag name containing "," would be split apart; use
   another separator if that can happen. *)
let tag_list_type =
  let encode = function
    | [] -> Ok None
    | tags -> Ok (Some (String.concat "," tags))
  in
  let decode = function
    | None -> Ok []
    | Some s -> Ok (String.split_on_char ',' s)
  in
  Caqti_type.(custom ~encode ~decode (option string))

(* Published posts with their tags, newest first, paginated by
   (LIMIT, OFFSET). Each row decodes as ((id, title, slug), (excerpt, tags)).
   [excerpt] is a nullable column and is therefore an option.
   [Caqti_type] has no tup5 and no list field type, so the five columns use
   nested tuples and the tag list uses a custom codec. *)
let find_published_posts =
  let open Caqti_type in
  (tup2 int int
   ->* tup2 (tup3 int string string) (tup2 (option string) tag_list_type))
    {|
    SELECT p.id, p.title, p.slug, p.excerpt,
           GROUP_CONCAT(t.name) AS tags
    FROM posts p
    LEFT JOIN post_tags pt ON pt.post_id = p.id
    LEFT JOIN tags t ON t.id = pt.tag_id
    WHERE p.status = 'published'
    GROUP BY p.id
    ORDER BY p.published_at DESC
    LIMIT ? OFFSET ?
    |}