强曰为道
与天地相似，故不违。知周乎万物，而道济天下，故不过。旁行而不流，乐天知命，故不忧。
文档目录

OCaml 教程 / 数据库操作(Caqti)

数据库操作(Caqti)

Caqti 是 OCaml 的标准数据库抽象层,支持多种数据库后端,提供类型安全的查询接口。

Caqti 概述与连接池

opam install caqti caqti-lwt caqti-driver-sqlite3 caqti-driver-postgresql lwt_ppx
| 特性 | 说明 |
|---|---|
| 多后端 | SQLite、PostgreSQL、MariaDB |
| 类型安全 | 查询参数和结果类型检查 |
| 连接池 | 内置连接池管理 |
| 异步支持 | Lwt 和 Async |
| 事务 | 内置事务支持 |
open Caqti_request.Infix
open Caqti_type.Std

(* SQLite 连接 *)
let sqlite_uri =
  (* Caqti SQLite URI: the "sqlite3" scheme followed by the database path. *)
  Uri.of_string "sqlite3:mydb.sqlite"

(* PostgreSQL connection *)
let pg_uri =
  (* user, password, host, port and database name, in URI form. *)
  Uri.of_string "postgresql://user:password@localhost:5432/mydb"

(* 创建连接池 *)
(* Open a Caqti connection pool for [uri].
   @raise Failure carrying [Caqti_error.show] of the error when
   [Caqti_lwt.connect_pool] returns [Error]. *)
let create_pool uri =
  let or_fail = function
    | Error err -> failwith (Caqti_error.show err)
    | Ok pool -> pool
  in
  or_fail (Caqti_lwt.connect_pool uri)

(* 使用连接池 *)
(* Shared pool over the SQLite database. *)
let pool = create_pool sqlite_uri

(* Run the single-row request [query] with parameters [params] on a pooled
   connection and return its result promise.
   [Db.find] takes the request AND its parameters; with only the request
   the callback would evaluate to a function rather than a promise, which
   [Pool.use] does not accept. *)
let run_query query params =
  Caqti_lwt.Pool.use (fun (module Db : Caqti_lwt.CONNECTION) ->
    Db.find query params
  ) pool

💡 提示:连接池默认大小为 10,可通过 ~max_size 参数调整。池管理器自动处理连接的创建、复用和回收。

查询与参数化

(* 查询类型 *)
type user = {
  id: int;
  name: string;
  email: string;
  age: int;
}

(* Caqti codec mapping a [user] record to the column tuple
   (id, name, email, age); neither direction can fail. *)
let user_type =
  Caqti_type.(
    custom
      ~encode:(fun { id; name; email; age } -> Ok (id, name, email, age))
      ~decode:(fun (id, name, email, age) -> Ok { id; name; email; age })
      (tup4 int string string int))

(* 查询定义 *)
(* Exactly one user by primary key.
   [user_type] decodes four columns (id, name, email, age), so all four
   must be selected, in that order, or decoding the row fails. *)
let find_user_by_id =
  Caqti_type.(int ->! user_type)
    "SELECT id, name, email, age FROM users WHERE id = ?"

(* All users whose age is at least the given value (zero or more rows).
   [user_type] decodes (id, name, email, age), so [age] must be selected
   too. *)
let find_users_by_age =
  Caqti_type.(int ->* user_type)
    "SELECT id, name, email, age FROM users WHERE age >= ?"

(* Number of rows in [users]: no parameters, one integer row. *)
let count_users =
  (Caqti_type.unit ->! Caqti_type.int) {|SELECT COUNT(*) FROM users|}

(* 插入 *)
(* Insert a user from (name, email, age) and read back the generated id
   through RETURNING.
   NOTE(review): RETURNING needs SQLite >= 3.35 — confirm the SQLite version. *)
let insert_user =
  let open Caqti_type in
  (tup3 string string int ->! int)
    {|INSERT INTO users (name, email, age) VALUES (?, ?, ?) RETURNING id|}

(* 更新 *)
(* Update a user's name and email. The parameters follow placeholder
   order, (name, email, id); [email] is a text column, so the second
   component is a [string]. *)
let update_user =
  Caqti_type.(tup3 string string int ->. unit)
    "UPDATE users SET name = ?, email = ? WHERE id = ?"

(* 删除 *)
(* Remove the user with the given id; the statement returns no rows. *)
let delete_user =
  (Caqti_type.int ->. Caqti_type.unit) {|DELETE FROM users WHERE id = ?|}

(* 执行查询 *)
(* Fetch the single user with primary key [id] using a pooled connection. *)
let get_user pool id =
  let lookup (module Db : Caqti_lwt.CONNECTION) =
    Db.find find_user_by_id id
  in
  Caqti_lwt.Pool.use lookup pool

(* All users whose age is at least [age], collected into a list.
   Caqti's connection API names this [collect_list]; there is no
   [collect]. *)
let get_users_over_age pool age =
  Caqti_lwt.Pool.use (fun (module Db : Caqti_lwt.CONNECTION) ->
    Db.collect_list find_users_by_age age
  ) pool

(* Insert a user and return (in the result promise) the id assigned by the
   database. *)
let create_user pool name email age =
  let row = (name, email, age) in
  let insert (module Db : Caqti_lwt.CONNECTION) = Db.find insert_user row in
  Caqti_lwt.Pool.use insert pool
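| 查询函数 | 说明 | 成功时的值 |
|---|---|---|
| `Db.find` | 恰好一行结果 | `'a` |
| `Db.collect_list` | 多行结果 | `'a list` |
| `Db.exec` | 无返回行 | `unit` |
| `Db.find_opt` | 零或一行 | `'a option` |

这些函数实际返回 `('a, error) result Lwt.t`,上表列出的是 `Ok` 中的值。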

⚠️ 注意:Caqti 的查询字符串可以使用 `?` 顺序占位符,也可以使用 `$1`、`$2` 编号占位符,但同一查询中不能混用;驱动会把它们转换为对应数据库的语法。参数顺序必须与 Caqti_type 定义的参数类型一致。

事务管理

(* 事务封装 *)
(* Run [f] inside a transaction on a connection taken from [pool].

   [f] receives the connection as a first-class module and must return a
   Caqti-style [(_, _) result] promise.
   - [Ok v]: the transaction is committed and [Ok v] is returned; a failed
     COMMIT is returned as [Error].
   - [Error e]: the transaction is rolled back and [Error e] is returned.
   - an exception: the transaction is rolled back and the exception is
     re-raised.

   [Db.start], [Db.commit] and [Db.rollback] return results themselves, so
   they are matched instead of being bound with [let%lwt ()]. A rollback
   failure is ignored so that it cannot mask the error that triggered it. *)
let with_transaction pool f =
  Caqti_lwt.Pool.use (fun (module Db : Caqti_lwt.CONNECTION) ->
    let open Lwt.Infix in
    Db.start () >>= function
    | Error err -> Lwt.return_error err
    | Ok () ->
      Lwt.try_bind
        (fun () -> f (module Db : Caqti_lwt.CONNECTION))
        (function
          | Ok value ->
            Db.commit () >|= (function
              | Ok () -> Ok value
              | Error err -> Error err)
          | Error err ->
            Db.rollback () >|= fun _ -> Error err)
        (fun exn ->
          Db.rollback () >>= fun _ -> Lwt.fail exn)
  ) pool

(* 转账示例 *)
(* Move [amount] from account [from_id] to account [to_id] inside
   [with_transaction].

   Every Caqti call returns a result promise, so the steps are chained with
   [Lwt_result.Infix.(>>=)]. The chain stops at the first [Error], and that
   [Error] is what the callback hands back to [with_transaction].
   Fails the promise with [Failure "余额不足"] when the balance of
   [from_id] is below [amount].
   [get_balance], [debit_account] and [credit_account] are requests defined
   elsewhere. *)
let transfer pool from_id to_id amount =
  with_transaction pool (fun (module Db : Caqti_lwt.CONNECTION) ->
    let open Lwt_result.Infix in
    (* Check the balance *)
    Db.find get_balance from_id >>= fun balance ->
    if balance < amount then
      Lwt.fail (Failure "余额不足")
    else begin
      (* Debit the source, then credit the destination *)
      Db.exec debit_account (amount, from_id) >>= fun () ->
      Db.exec credit_account (amount, to_id)
    end
  )

(* 嵌套事务(保存点) *)
(* Savepoint statements used by [complex_operation]. They are defined once
   at top level so that the same requests are reused on every call.
   SAVEPOINT / RELEASE SAVEPOINT / ROLLBACK TO SAVEPOINT exist in both
   SQLite and PostgreSQL. *)
let savepoint_risky =
  Caqti_type.(unit ->. unit) "SAVEPOINT risky_operation"

let release_risky =
  Caqti_type.(unit ->. unit) "RELEASE SAVEPOINT risky_operation"

let rollback_to_risky =
  Caqti_type.(unit ->. unit) "ROLLBACK TO SAVEPOINT risky_operation"

(* Run [risky_operation] inside a savepoint, so a failure undoes only its
   own changes. The start/finish log rows stay, and the outer transaction
   carries on.

   Caqti reports database failures as [Error] values, not exceptions, so
   both an [Error] and a raised exception are handled. Each is rolled back
   to the savepoint and logged via [insert_log]. Without the savepoint,
   PostgreSQL would reject every later statement in the aborted
   transaction.

   The whole chain is sequenced; the earlier version dropped the
   try%lwt promise with [;]. [insert_log] and [risky_operation] are
   requests defined elsewhere. *)
let complex_operation pool =
  with_transaction pool (fun (module Db : Caqti_lwt.CONNECTION) ->
    let open Lwt_result.Infix in
    (* Undo the risky step's changes and record why. *)
    let recover msg =
      Db.exec rollback_to_risky () >>= fun () ->
      Db.exec insert_log msg
    in
    Db.exec insert_log "操作开始" >>= fun () ->
    Db.exec savepoint_risky () >>= fun () ->
    Lwt.try_bind
      (fun () -> Db.exec risky_operation ())
      (function
        | Ok () -> Db.exec release_risky ()
        | Error err -> recover (Caqti_error.show err))
      (fun exn -> recover (Printexc.to_string exn))
    >>= fun () ->
    Db.exec insert_log "操作完成"
  )

💡 提示:涉及多条语句的操作应始终放在事务中,以保证数据一致性,避免部分成功导致数据不一致。单条语句在自动提交模式下本身就是原子的,只有当它需要与其他语句一起成功或失败时才需要显式事务。

类型安全查询

(* 使用 Caqti_type 构建复杂类型 *)
open Caqti_type.Std

(* 复合类型 *)
type address = {
  street: string;
  city: string;
  zip: string;
}

(* Caqti codec for [address] over three text columns
   (street, city, zip); conversion never fails. *)
let address_type =
  custom
    ~encode:(fun { street; city; zip } -> Ok (street, city, zip))
    ~decode:(fun (street, city, zip) -> Ok { street; city; zip })
    (tup3 string string string)

(* 可选字段 *)
(* Optional fields: [address] may be NULL, hence [option string], while
   [city] is read as a plain string. Each row is (address, city). *)
let find_user_with_address =
  Caqti_type.(int ->! tup2 (option string) string)
    {|SELECT address, city FROM users WHERE id = ?|}

(* 枚举类型 *)
type status = Active | Inactive | Banned

(* Caqti codec storing [status] as lowercase text. Decoding any other text
   yields an [Error] that names the unknown value. *)
let status_type =
  let to_text status =
    let text =
      match status with
      | Active -> "active"
      | Inactive -> "inactive"
      | Banned -> "banned"
    in
    Ok text
  in
  let of_text text =
    match text with
    | "active" -> Ok Active
    | "inactive" -> Ok Inactive
    | "banned" -> Ok Banned
    | unknown -> Error (Printf.sprintf "未知状态: %s" unknown)
  in
  custom ~encode:to_text ~decode:of_text string

(* JSON 字段 *)
(* Caqti codec storing a [Yojson.Safe.t] value as its JSON text.
   Malformed JSON read from the database becomes a decode [Error] carrying
   Yojson's message, instead of an exception escaping from [decode]. *)
let json_type =
  let encode json = Ok (Yojson.Safe.to_string json) in
  let decode text =
    match Yojson.Safe.from_string text with
    | json -> Ok json
    | exception Yojson.Json_error msg -> Error msg
  in
  custom ~encode ~decode string

type user = {
  id: int;
  name: string;
  email: string;
  status: status;
  metadata: Yojson.Safe.t;
}

(* Caqti codec for [user] over five consecutive columns:
   (id, name, email, status, metadata).
   Caqti's tuple combinators stop at four elements (there is no [tup5]).
   Nested tuples are flattened into consecutive columns, so
   ((id, name, email), (status, metadata)) covers the same five columns. *)
let user_type =
  let encode u = Ok ((u.id, u.name, u.email), (u.status, u.metadata)) in
  let decode ((id, name, email), (status, metadata)) =
    Ok { id; name; email; status; metadata }
  in
  custom ~encode ~decode
    Caqti_type.(tup2 (tup3 int string string) (tup2 status_type json_type))

连接驱动

SQLite

(* SQLite 特定功能 *)
(* Pool over the SQLite file mydb.sqlite; [Result.get_ok] raises
   [Invalid_argument] if the pool cannot be created. *)
let sqlite_pool =
  let uri = Uri.of_string "sqlite3:mydb.sqlite" in
  Result.get_ok (Caqti_lwt.connect_pool uri)

(* SQLite PRAGMA *)
(* Switch the database to write-ahead logging. SQLite answers this PRAGMA
   with one row holding the resulting mode (e.g. "wal"), so it is a
   single-row request. *)
let set_wal_mode =
  Caqti_type.(unit ->! string)
    "PRAGMA journal_mode=WAL"

(* Busy timeout of [ms] milliseconds. SQLite does not accept bound
   parameters in PRAGMA statements, so the integer is rendered into the SQL.
   That is safe for an [int], and the request is one-shot because its text
   varies. The PRAGMA returns the timeout now in effect as one row. *)
let set_busy_timeout ms =
  Caqti_type.(unit ->! int) ~oneshot:true
    (Printf.sprintf "PRAGMA busy_timeout = %d" ms)

(* Initialisation: enable WAL, set a 5000 ms busy timeout, then run
   [create_tables], a request defined elsewhere. Steps are chained on
   their results and stop at the first [Error].
   NOTE(review): busy_timeout is a per-connection setting, so this only
   configures the one pooled connection used here; a [~post_connect] hook
   on [connect_pool] would cover every connection — confirm. *)
let init_sqlite pool =
  Caqti_lwt.Pool.use (fun (module Db : Caqti_lwt.CONNECTION) ->
    let open Lwt_result.Infix in
    Db.find set_wal_mode () >>= fun _mode ->
    Db.find (set_busy_timeout 5000) () >>= fun _timeout ->
    Db.exec create_tables ()
  ) pool

PostgreSQL

(* PostgreSQL 特定功能 *)
(* Pool over a local PostgreSQL database; [Result.get_ok] raises
   [Invalid_argument] if the pool cannot be created. *)
let pg_pool =
  let uri = Uri.of_string "postgresql://user:pass@localhost:5432/mydb" in
  Result.get_ok (Caqti_lwt.connect_pool uri)

(* 使用 $1, $2 参数 *)
(* One user by id, using a numbered placeholder.
   The [user_type] in scope here is the five-column codec
   (id, name, email, status, metadata), so all five columns are selected. *)
let find_user_pg =
  Caqti_type.(int ->! user_type)
    "SELECT id, name, email, status, metadata FROM users WHERE id = $1"

(* PostgreSQL 数组 *)
(* Every user whose id appears in the given array, via [= ANY($1)].
   The [user_type] in scope decodes (id, name, email, status, metadata),
   so all five columns are selected.
   NOTE(review): verify that the Caqti version in use provides an [array]
   field type. If it does not, pass the ids as a PostgreSQL array literal
   string and use [ANY($1::int[])]. *)
let find_users_by_ids =
  Caqti_type.(array int ->* user_type)
    "SELECT id, name, email, status, metadata FROM users WHERE id = ANY($1)"

(* PostgreSQL JSONB *)
(* Merge a JSON object into [users.metadata] (JSONB [||]).
   The parameter arrives as text, so it is cast to [jsonb]; otherwise
   PostgreSQL may pick a text [||] operator. A NULL [metadata] is treated
   as an empty object, since [NULL || x] would leave it NULL. *)
let update_metadata =
  Caqti_type.(tup2 json_type int ->. unit)
    "UPDATE users SET metadata = COALESCE(metadata, '{}'::jsonb) || $1::jsonb WHERE id = $2"

(* COPY 命令(批量导入) *)
(* Bulk import through PostgreSQL COPY.
   NOTE(review): [COPY ... FROM STDIN] needs the driver to stream rows over
   PostgreSQL's COPY sub-protocol. Executed as a plain zero-row request it
   receives no data. Caqti's connection-level [populate] function is
   presumably the intended tool for bulk loads — verify against the Caqti
   version in use. *)
let copy_users =
  Caqti_type.(unit ->. unit)
    "COPY users (name, email) FROM STDIN"
| 驱动 | URI 格式 | 特点 |
|---|---|---|
| SQLite | `sqlite3:path` | 文件数据库,零配置 |
| PostgreSQL | `postgresql://user:pass@host:port/db` | 功能丰富,生产首选 |
| MariaDB | `mariadb://user:pass@host:port/db` | MySQL 兼容 |

⚠️ 注意:SQLite 不支持真正的并发写入,多写入者场景应使用 PostgreSQL。

迁移管理

(* 简单的迁移系统 *)
(** One schema migration: [up] applies it, [down] reverts it. *)
type migration = {
  version: int;  (** Ordering key; stored in [schema_migrations]. *)
  name: string;  (** Label stored in [schema_migrations] and printed. *)
  up: string;    (** SQL executed to apply the migration. *)
  down: string;  (** SQL executed to revert the migration. *)
}

(* All known migrations. [run_migrations] applies every entry whose
   [version] is greater than the highest version recorded in
   [schema_migrations]. [rollback_migration] runs the [down] SQL of a
   chosen entry.
   NOTE(review): the DDL uses SQLite syntax (INTEGER PRIMARY KEY
   AUTOINCREMENT), which PostgreSQL does not accept — confirm the target
   backend.
   NOTE(review): ALTER TABLE ... DROP COLUMN needs SQLite >= 3.35. *)
let migrations = [
  {
    version = 1;
    name = "create_users";
    up = {|
      CREATE TABLE users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        email TEXT UNIQUE NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
      )
    |};
    down = "DROP TABLE users";
  };
  {
    version = 2;
    name = "add_age_column";
    up = "ALTER TABLE users ADD COLUMN age INTEGER DEFAULT 0";
    down = "ALTER TABLE users DROP COLUMN age";
  };
  {
    version = 3;
    name = "create_posts";
    up = {|
      CREATE TABLE posts (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id INTEGER NOT NULL,
        title TEXT NOT NULL,
        body TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        FOREIGN KEY (user_id) REFERENCES users(id)
      )
    |};
    down = "DROP TABLE posts";
  };
]

(* 迁移状态表 *)
(* Bookkeeping table with one row per applied migration, keyed by version.
   IF NOT EXISTS makes the statement safe to run on every start-up. *)
let create_migrations_table =
  let open Caqti_type in
  (unit ->. unit)
    {|
      CREATE TABLE IF NOT EXISTS schema_migrations (
        version INTEGER PRIMARY KEY,
        name TEXT NOT NULL,
        applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
      )
    |}

(* 获取当前版本 *)
(* Highest recorded migration version, or 0 when nothing has been applied.
   COALESCE turns the NULL that MAX yields on an empty table into 0. *)
let get_current_version =
  (Caqti_type.unit ->! Caqti_type.int)
    "SELECT COALESCE(MAX(version), 0) FROM schema_migrations"

(* 记录迁移 *)
(* Store (version, name) for an applied migration; [applied_at] takes its
   column default. *)
let record_migration =
  let open Caqti_type in
  (tup2 int string ->. unit)
    "INSERT INTO schema_migrations (version, name) VALUES (?, ?)"

(* 执行迁移 *)
(* Apply all pending migrations in ascending [version] order.

   Creates the bookkeeping table if needed, then applies every migration
   whose [version] is above the highest recorded one.

   Each migration and its [schema_migrations] row share one transaction.
   A failure therefore leaves neither a half-applied change nor a bogus
   record, and processing stops at the first [Error].

   Caqti calls return results, so they are chained with [Lwt_result.Infix]
   rather than bound with [let%lwt ()]. The migration SQL is only known at
   run time, so its request is created with [~oneshot:true]. That keeps
   Caqti from caching a prepared statement per migration. *)
let run_migrations pool =
  Caqti_lwt.Pool.use (fun (module Db : Caqti_lwt.CONNECTION) ->
    let open Lwt_result.Infix in
    let apply migration =
      Printf.printf "运行迁移 %d: %s\n" migration.version migration.name;
      let up = Caqti_type.(unit ->. unit) ~oneshot:true migration.up in
      Db.start () >>= fun () ->
      Lwt.bind
        (Db.exec up () >>= fun () ->
         Db.exec record_migration (migration.version, migration.name))
        (function
          | Ok () -> Db.commit ()
          | Error err ->
            (* Ignore a rollback failure so it cannot mask [err]. *)
            Lwt.bind (Db.rollback ()) (fun _ -> Lwt.return_error err))
    in
    let rec apply_all = function
      | [] -> Lwt.return_ok ()
      | migration :: rest -> apply migration >>= fun () -> apply_all rest
    in
    Db.exec create_migrations_table () >>= fun () ->
    Db.find get_current_version () >>= fun current ->
    migrations
    |> List.filter (fun m -> m.version > current)
    |> List.sort (fun a b -> Int.compare a.version b.version)
    |> apply_all
  ) pool

(* 回滚 *)
(* Deletes the bookkeeping row of one migration version. Defined once so
   the request is reused across calls. *)
let delete_migration_record =
  Caqti_type.(int ->. unit) "DELETE FROM schema_migrations WHERE version = ?"

(* Revert migration [version]: run its [down] SQL and delete its
   [schema_migrations] row in one transaction. If [down] fails, the record
   stays in place. Results are chained and the first [Error] is returned.
   Raises [Not_found] (from [List.find]) if [version] is not in
   [migrations]. *)
let rollback_migration pool version =
  Caqti_lwt.Pool.use (fun (module Db : Caqti_lwt.CONNECTION) ->
    let open Lwt_result.Infix in
    let migration =
      List.find (fun m -> Int.equal m.version version) migrations
    in
    (* [down] is run-time SQL, so the request is one-shot. *)
    let down = Caqti_type.(unit ->. unit) ~oneshot:true migration.down in
    Db.start () >>= fun () ->
    Lwt.bind
      (Db.exec down () >>= fun () ->
       Db.exec delete_migration_record version)
      (function
        | Ok () -> Db.commit ()
        | Error err ->
          (* Ignore a rollback failure so it cannot mask [err]. *)
          Lwt.bind (Db.rollback ()) (fun _ -> Lwt.return_error err))
  ) pool

💡 提示:生产环境建议使用专业的迁移工具,如 sqitch 或 flyway;简单项目可以使用此方案。

性能调优

(* 连接池配置 *)
(* Create a connection pool sized for heavier load.
   [max_size] (default 20) caps the number of open connections.
   [max_idle_size] (default 5) caps how many idle connections are kept for
   reuse.
   NOTE(review): Caqti 1.x [connect_pool] takes no idle-timeout argument,
   so none is passed here. Caqti 2.x offers a similar knob as
   [max_idle_age] in [Caqti_pool_config] — confirm against the version in
   use.
   @raise Failure carrying [Caqti_error.show] of the error, consistent with
   [create_pool], when the pool cannot be created. *)
let optimized_pool ?(max_size = 20) ?(max_idle_size = 5) uri =
  match Caqti_lwt.connect_pool ~max_size ~max_idle_size uri with
  | Ok pool -> pool
  | Error err -> failwith (Caqti_error.show err)

(* 批量操作 *)
(* Insert [users] in a single transaction. Returns (in a result promise)
   the generated ids in input order.

   Each insert returns a result, so the loop stops at the first [Error] and
   returns it. It does not build a list of results, which would not be the
   [(_, _) result] a transaction callback returns.

   Only [name], [email] and [age] are read from each record; the id comes
   from the database via [insert_user]. Matching all three labels in one
   pattern selects the record type that has [age], rather than the most
   recent type with a [name] field. *)
let batch_insert pool users =
  with_transaction pool (fun (module Db : Caqti_lwt.CONNECTION) ->
    let rec insert_all ids = function
      | [] -> Lwt.return_ok (List.rev ids)
      | { name; email; age; _ } :: rest ->
        Lwt_result.bind (Db.find insert_user (name, email, age)) (fun id ->
          insert_all (id :: ids) rest)
    in
    insert_all [] users
  )

(* 预编译查询 *)
(* SQL text registered through [prepare_query], keyed by name.
   The explicit annotation avoids a non-generalizable ['_weak] type. A bare
   top-level [Hashtbl.create] would otherwise get one. *)
let prepared_queries : (string, string) Hashtbl.t = Hashtbl.create 16

(* Record [sql] under [name] and check that a pooled connection can be
   acquired.

   No statement is prepared here: Caqti caches prepared statements for the
   requests it runs. To "pre-compile", define each request once at module
   level and reuse it.

   [typ] is accepted for interface compatibility and is unused. The
   callback returns [Ok ()] because [Pool.use] expects a result promise. *)
let prepare_query pool name sql _typ =
  Hashtbl.replace prepared_queries name sql;
  Caqti_lwt.Pool.use (fun _connection -> Lwt.return_ok ()) pool

(* EXPLAIN 分析 *)
(* SQLite query plan for [sql], as its plan "detail" lines joined with
   newlines.

   EXPLAIN QUERY PLAN yields one row per plan step with four columns
   (id, parent, notused, detail). All rows are collected and only [detail]
   is kept; a single-row, single-column request would fail on most plans.

   The SQL differs per call, so the request is one-shot. [sql] is spliced
   verbatim into the statement: pass only trusted SQL. *)
let explain_query pool sql =
  let plan =
    Caqti_type.(unit ->* tup4 int int int string) ~oneshot:true
      (Printf.sprintf "EXPLAIN QUERY PLAN %s" sql)
  in
  Caqti_lwt.Pool.use (fun (module Db : Caqti_lwt.CONNECTION) ->
    Lwt_result.map
      (fun rows ->
        rows
        |> List.map (fun (_, _, _, detail) -> detail)
        |> String.concat "\n")
      (Db.collect_list plan ())
  ) pool
| 优化技术 | 说明 | 效果 |
|---|---|---|
| 连接池 | 复用数据库连接 | 减少连接开销 |
| 事务 | 合并多个操作 | 减少提交次数 |
| 批量插入 | 使用 INSERT 批量写入 | 10-100x 提速 |
| 索引 | 添加合适索引 | 查询提速 |
| 预编译 | 查询计划缓存 | 避免重复解析 |

ORM vs 手写 SQL

(* 手写 SQL(推荐) *)
(* Hand-written SQL: users LEFT JOINed with their posts, ordered by user
   id. A user with N posts appears on N rows; a user without posts appears
   once, with NULL post columns.
   NOTE(review): Caqti_type appears to have no [list] combinator, so a
   join cannot be decoded straight into [user * post list]. Presumably it
   should decode one (user, post option) row at a time and group in OCaml.
   [post_type] is not defined in this file.
   NOTE(review): the selected user columns (id, name, email, age) do not
   match the most recent [user_type] defined above
   (id, name, email, status, metadata) — verify which codec is intended. *)
let find_users_with_posts =
  Caqti_type.(unit ->* tup2 user_type (list post_type))
    {|
      SELECT u.id, u.name, u.email, u.age,
             p.id, p.title, p.body
      FROM users u
      LEFT JOIN posts p ON p.user_id = u.id
      ORDER BY u.id
    |}

(* ORM 风格(简化版) *)
(* ORM-style (simplified) access to the [users] table.
   Every function returns a Caqti result promise. Requests are built once
   at module level instead of on every call, so Caqti can reuse the same
   prepared statements. The module decodes into its own [t] through
   [t_type]; the outer [user_type] has a different column set. *)
module User = struct
  type t = { id: int; name: string; email: string; age: int }

  (* Codec for [t] over (id, name, email, age), the columns selected below. *)
  let t_type =
    let encode { id; name; email; age } = Ok (id, name, email, age) in
    let decode (id, name, email, age) = Ok { id; name; email; age } in
    Caqti_type.(custom ~encode ~decode (tup4 int string string int))

  let find_query =
    Caqti_type.(int ->! t_type)
      "SELECT id, name, email, age FROM users WHERE id = ?"

  let find_all_query =
    Caqti_type.(unit ->* t_type)
      "SELECT id, name, email, age FROM users"

  let create_query =
    Caqti_type.(tup3 string string int ->! int)
      "INSERT INTO users (name, email, age) VALUES (?, ?, ?) RETURNING id"

  (* Parameters are (name, email, id); [email] is text. *)
  let update_query =
    Caqti_type.(tup3 string string int ->. unit)
      "UPDATE users SET name = ?, email = ? WHERE id = ?"

  let delete_query =
    Caqti_type.(int ->. unit) "DELETE FROM users WHERE id = ?"

  (* Exactly one user with primary key [id]. *)
  let find pool id =
    Caqti_lwt.Pool.use (fun (module Db : Caqti_lwt.CONNECTION) ->
      Db.find find_query id) pool

  (* Every user, as a list. *)
  let find_all pool =
    Caqti_lwt.Pool.use (fun (module Db : Caqti_lwt.CONNECTION) ->
      Db.collect_list find_all_query ()) pool

  (* Insert [user] (its [id] is ignored) and return the generated id. *)
  let create pool user =
    Caqti_lwt.Pool.use (fun (module Db : Caqti_lwt.CONNECTION) ->
      Db.find create_query (user.name, user.email, user.age)) pool

  (* Overwrite name and email of the row whose id is [user.id]. *)
  let update pool user =
    Caqti_lwt.Pool.use (fun (module Db : Caqti_lwt.CONNECTION) ->
      Db.exec update_query (user.name, user.email, user.id)) pool

  (* Delete the row with primary key [id]. *)
  let delete pool id =
    Caqti_lwt.Pool.use (fun (module Db : Caqti_lwt.CONNECTION) ->
      Db.exec delete_query id) pool
end

⚠️ 注意:ORM 简化了代码但可能隐藏性能问题。复杂查询建议手写 SQL。

数据库设计实战

(* 博客系统数据库设计 *)

(* 用户表 *)
(* DDL for the blog's [users] table. [username] and [email] are unique.
   [role] defaults to 'user' and is limited to admin/editor/user by a CHECK
   constraint.
   NOTE(review): [updated_at] is only filled by its default on INSERT.
   Nothing here refreshes it on UPDATE; that needs a trigger or application
   code. AUTOINCREMENT is SQLite syntax. *)
let create_users = {|
  CREATE TABLE users (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    username TEXT UNIQUE NOT NULL,
    email TEXT UNIQUE NOT NULL,
    password_hash TEXT NOT NULL,
    bio TEXT,
    avatar_url TEXT,
    role TEXT DEFAULT 'user' CHECK(role IN ('admin', 'editor', 'user')),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
  )
|}

(* 文章表 *)
(* DDL for [posts]. [slug] is unique. [status] is draft/published/archived
   and defaults to 'draft'. Deleting a user cascades to that user's posts
   via [author_id].
   NOTE(review): SQLite only enforces foreign keys (and therefore ON DELETE
   CASCADE) when PRAGMA foreign_keys = ON is set on the connection —
   confirm this is done. *)
let create_posts = {|
  CREATE TABLE posts (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    author_id INTEGER NOT NULL,
    title TEXT NOT NULL,
    slug TEXT UNIQUE NOT NULL,
    body TEXT NOT NULL,
    excerpt TEXT,
    status TEXT DEFAULT 'draft' CHECK(status IN ('draft', 'published', 'archived')),
    published_at TIMESTAMP,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (author_id) REFERENCES users(id) ON DELETE CASCADE
  )
|}

(* 标签表 *)
(* DDL for [tags]; both [name] and [slug] are unique. *)
let create_tags = {|
  CREATE TABLE tags (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT UNIQUE NOT NULL,
    slug TEXT UNIQUE NOT NULL
  )
|}

(* 文章标签关联 *)
(* Many-to-many link between [posts] and [tags]. The composite primary key
   forbids duplicate (post, tag) pairs. A link row is removed when either
   its post or its tag is deleted (ON DELETE CASCADE). *)
let create_post_tags = {|
  CREATE TABLE post_tags (
    post_id INTEGER NOT NULL,
    tag_id INTEGER NOT NULL,
    PRIMARY KEY (post_id, tag_id),
    FOREIGN KEY (post_id) REFERENCES posts(id) ON DELETE CASCADE,
    FOREIGN KEY (tag_id) REFERENCES tags(id) ON DELETE CASCADE
  )
|}

(* 索引 *)
(* Secondary indexes for the blog schema.
   NOTE(review): this string holds five statements, but a Caqti request is
   sent as a single statement. They probably need to be executed one at a
   time — verify with the driver in use.
   NOTE(review): posts.slug and tags.slug are declared UNIQUE in
   [create_posts] / [create_tags], which normally creates an index already.
   idx_posts_slug and idx_tags_slug are therefore likely redundant. *)
let create_indices = {|
  CREATE INDEX idx_posts_author ON posts(author_id);
  CREATE INDEX idx_posts_status ON posts(status);
  CREATE INDEX idx_posts_published ON posts(published_at);
  CREATE INDEX idx_posts_slug ON posts(slug);
  CREATE INDEX idx_tags_slug ON tags(slug);
|}

(* 复杂查询示例:带标签的文章列表 *)
(* Row codec for [find_published_posts], producing
   (id, title, slug, excerpt, tags) : int * string * string * string * string list.

   Caqti has no five-element tuple or [list] combinator, so the five
   columns are read as nested tuples and converted here.

   [excerpt] is nullable and becomes [""] when NULL. GROUP_CONCAT yields
   NULL for a post without tags, which becomes []. Otherwise the
   comma-separated names are split; a tag name that itself contains ','
   would be split too. *)
let published_post_row =
  let split_tags = function
    | None | Some "" -> []
    | Some joined -> String.split_on_char ',' joined
  in
  let encode (id, title, slug, excerpt, tags) =
    Ok ((id, title, slug), (Some excerpt, Some (String.concat "," tags)))
  in
  let decode ((id, title, slug), (excerpt, tags)) =
    Ok (id, title, slug, Option.value ~default:"" excerpt, split_tags tags)
  in
  Caqti_type.(
    custom ~encode ~decode
      (tup2 (tup3 int string string) (tup2 (option string) (option string))))

(* Complex query: published posts with their tag names, newest first,
   one page at a time. Parameters are (limit, offset).
   NOTE(review): GROUP_CONCAT is SQLite/MySQL; PostgreSQL would need
   string_agg(t.name, ','). *)
let find_published_posts =
  let open Caqti_type in
  (tup2 int int ->* published_post_row)
    {|
      SELECT p.id, p.title, p.slug, p.excerpt,
             GROUP_CONCAT(t.name) as tags
      FROM posts p
      LEFT JOIN post_tags pt ON pt.post_id = p.id
      LEFT JOIN tags t ON t.id = pt.tag_id
      WHERE p.status = 'published'
      GROUP BY p.id
      ORDER BY p.published_at DESC
      LIMIT ? OFFSET ?
    |}

扩展阅读


上一节:数据序列化(JSON/Protobuf) | 下一节:测试框架 Alcotest/OUnit