强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧。
文档目录

OCaml 教程 / 完整项目:构建一个小型数据库

完整项目:构建一个小型数据库

本节将从零构建一个小型关系型数据库,涵盖存储引擎、查询解析、优化器、事务处理等核心组件。

项目架构设计

minidb/
├── lib/
│   ├── storage/          # 存储引擎
│   │   ├── page.ml       # 页管理
│   │   ├── btree.ml      # B-Tree 索引
│   │   ├── buffer.ml     # 缓冲池
│   │   └── wal.ml        # 预写日志
│   ├── query/            # 查询处理
│   │   ├── lexer.mll     # 词法分析
│   │   ├── parser.mly    # 语法分析
│   │   ├── ast.ml        # 抽象语法树
│   │   ├── optimizer.ml  # 查询优化器
│   │   └── executor.ml   # 查询执行器
│   ├── transaction/      # 事务处理
│   │   ├── mvcc.ml       # 多版本并发控制
│   │   └── lock.ml       # 锁管理
│   └── common/           # 公共模块
│       ├── types.ml      # 类型定义
│       ├── error.ml      # 错误处理
│       └── config.ml     # 配置
├── bin/                  # 命令行接口
│   └── main.ml
├── test/                 # 测试
│   ├── test_storage.ml
│   ├── test_query.ml
│   └── test_transaction.ml
├── dune-project
└── minidb.opam

| 层级 | 组件 | 职责 |
| --- | --- | --- |
| 存储层 | Page / B-Tree / Buffer | 数据持久化与索引 |
| 查询层 | Parser / Optimizer / Executor | SQL 处理 |
| 事务层 | MVCC / Lock | 并发控制 |
| 接口层 | CLI / REPL | 用户交互 |

存储引擎

页管理

(* storage/page.ml *)
(** Size in bytes of the buffer that [create] allocates for each page. *)
let page_size = 4096  (* 4 KiB page size *)

(** An in-memory page: the identifier given to [create], the raw bytes, and a
    flag that the write_* helpers of this file set after touching [data]. *)
type page = {
  id: int;
  data: Bytes.t;
  mutable dirty: bool;
}

(** [create id] returns a clean (non-dirty) page numbered [id] whose
    [page_size] bytes are all zero.

    [Bytes.make] is used instead of [Bytes.create] because the latter leaves
    the buffer uninitialised: every region of a new page that has not been
    written explicitly would otherwise hold arbitrary bytes when the page is
    read back or written out. *)
let create id = {
  id;
  data = Bytes.make page_size '\000';
  dirty = false;
}

(* Offset-based accessors on a page. *)

(** [read_int page offset] decodes the little-endian 32-bit integer stored at
    byte [offset] of [page.data] and returns it as an [int]. *)
let read_int page offset =
  let raw = Bytes.get_int32_le page.data offset in
  Int32.to_int raw

(** [write_int page offset value] converts [value] with [Int32.of_int], stores
    it little-endian at byte [offset], and marks the page dirty. *)
let write_int page offset value =
  let encoded = Int32.of_int value in
  Bytes.set_int32_le page.data offset encoded;
  page.dirty <- true

(** [read_bytes page offset length] returns a fresh copy of the [length] bytes
    of [page.data] that start at [offset]. *)
let read_bytes page offset length =
  let source = page.data in
  Bytes.sub source offset length

(** [write_bytes page offset data] copies all of [data] into the page starting
    at [offset] and marks the page dirty. *)
let write_bytes page offset data =
  let len = Bytes.length data in
  Bytes.blit data 0 page.data offset len;
  page.dirty <- true

(** [read_string page offset length] returns the [length] bytes of the page
    that start at [offset], as a string. *)
let read_string page offset length =
  let source = page.data in
  Bytes.sub_string source offset length

(** [write_string page offset s] copies all of [s] into the page starting at
    [offset] and marks the page dirty. *)
let write_string page offset s =
  let len = String.length s in
  Bytes.blit_string s 0 page.data offset len;
  page.dirty <- true

(** Page header.  [read_header] and [write_header] store the four fields as
    32-bit integers at byte offsets 0, 4, 8 and 12 of the page.

    NOTE(review): the B-Tree insert/find code treats [page_type] 1 as a leaf
    and 2 as an internal node, which does not match the numbering in the
    field comment below — confirm which one is intended. *)
type page_header = {
  page_type: int;      (* 0 = data page, 1 = index page, 2 = overflow page *)
  num_cells: int;      (* number of cells *)
  free_offset: int;    (* start of the free area *)
  cell_offset: int;    (* start of the cell area *)
}

(** [read_header page] decodes the header from the first 16 bytes of [page]:
    four 32-bit integers at offsets 0, 4, 8 and 12. *)
let read_header page =
  let field offset = read_int page offset in
  let page_type = field 0 in
  let num_cells = field 4 in
  let free_offset = field 8 in
  let cell_offset = field 12 in
  { page_type; num_cells; free_offset; cell_offset }

(** [write_header page header] stores the four header fields at offsets 0, 4,
    8 and 12 of [page] (in that order) through [write_int], which also marks
    the page dirty. *)
let write_header page header =
  let { page_type; num_cells; free_offset; cell_offset } = header in
  List.iter
    (fun (offset, value) -> write_int page offset value)
    [ (0, page_type); (4, num_cells); (8, free_offset); (12, cell_offset) ]

💡 提示:固定页大小(4KB)是数据库设计的常见选择,与操作系统页大小对齐,减少碎片。

B-Tree 索引

(* storage/btree.ml *)
open Page

(** Logical view of a B-Tree node.

    NOTE(review): nothing in the visible code constructs or matches on this
    type — insertion and lookup operate on raw pages — so how it maps onto
    the page layout is not shown here. *)
type btree_node =
  | Leaf of {
      keys: string list;
      values: bytes list;
      next: int;  (* leaf chain link; presumably the next leaf's page id — confirm *)
    }
  | Internal of {
      keys: string list;
      children: int list;
    }

(** Handle on a B-Tree file: the open descriptor, the page id that [insert]
    and [find] start from, and a page counter ([create] initialises it to 1
    for the root page). *)
type btree = {
  fd: Unix.file_descr;
  mutable root_page: int;
  mutable num_pages: int;
}

(** [create filename] creates [filename] (truncating it if it exists), writes
    an empty root page as page 0, and returns a handle rooted at that page.

    If initialising the file fails, the descriptor is closed before the
    exception is re-raised, so a failed call does not leak it.

    @raise Unix.Unix_error if the file cannot be opened, positioned or
    written. *)
let create filename =
  let fd = Unix.openfile filename
    [Unix.O_RDWR; Unix.O_CREAT; Unix.O_TRUNC] 0o644 in
  try
    (* This [let] is not recursive, so [create] below is still the page
       constructor brought in by [open Page]. *)
    let root = create 0 in
    let header = { page_type = 1; num_cells = 0; free_offset = page_size; cell_offset = 16 } in
    write_header root header;
    (* Persist the root as the first page of the file. *)
    ignore (Unix.lseek fd 0 Unix.SEEK_SET);
    ignore (Unix.write fd root.data 0 page_size);
    { fd; root_page = 0; num_pages = 1 }
  with exn ->
    (* Best-effort close; the original failure is the one worth reporting. *)
    (try Unix.close fd with Unix.Unix_error _ -> ());
    raise exn

(** [binary_search keys target] searches the array [keys], assumed sorted
    according to [String.compare], for [target].

    Returns the index of an element equal to [target] when the search meets
    one; otherwise the index at which the search interval closed, i.e. the
    position where [target] would be inserted (possibly [Array.length keys]). *)
let binary_search keys target =
  let lo = ref 0 in
  let hi = ref (Array.length keys) in
  let hit = ref None in
  while !hit = None && !lo < !hi do
    let mid = !lo + (!hi - !lo) / 2 in
    let order = String.compare keys.(mid) target in
    if order = 0 then hit := Some mid
    else if order < 0 then lo := mid + 1
    else hi := mid
  done;
  match !hit with
  | Some idx -> idx
  | None -> !lo

(* Insertion.

   [insert tree key value] reads the root page and dispatches on the
   [page_type] of its header: 1 is handled as a leaf root, 2 as an internal
   root, and any other value raises [Failure].

   NOTE(review): the page_header field comment lists 1 as "index page" and 2
   as "overflow page"; here they are used as leaf / internal — confirm which
   numbering is intended.
   NOTE(review): read_page, split_if_needed, max_leaf_cells,
   find_insert_position, shift_cells_right, write_cell, allocate_page and
   update_parent are not defined in the visible code. *)
let rec insert tree key value =
  let root = read_page tree.fd tree.root_page in
  let header = read_header root in
  match header.page_type with
  | 1 -> (* the root is a leaf *)
    insert_into_leaf tree root key value
  | 2 -> (* the root is an internal node *)
    let new_root = split_if_needed tree root key value in
    if new_root <> tree.root_page then
      tree.root_page <- new_root
  | _ -> failwith "未知页类型"

(* Inserts into [page] in place while it holds fewer than [max_leaf_cells]
   cells; otherwise hands the insertion over to [split_leaf]. *)
and insert_into_leaf tree page key value =
  let header = read_header page in
  if header.num_cells < max_leaf_cells then begin
    (* room left: open a gap at the insertion index and bump the cell count *)
    let idx = find_insert_position page key in
    shift_cells_right page idx;
    write_cell page idx key value;
    write_header page { header with num_cells = header.num_cells + 1 }
  end else begin
    (* leaf is full: split *)
    split_leaf tree page key value
  end

(* Allocates a new page for a full leaf and reports the pair of page ids to
   [update_parent].
   NOTE(review): moving half of the cells and storing [key]/[value] is elided
   in this excerpt, so [mid], [new_page], [key] and [value] are unused as
   written. *)
and split_leaf tree page key value =
  let mid = max_leaf_cells / 2 in
  let new_page_id = allocate_page tree in
  let new_page = create new_page_id in
  (* move half of the data to the new page *)
  (* ... *)
  (* update the parent node *)
  update_parent tree page.id new_page_id

(* Lookup.

   The helpers are mutually recursive and are therefore defined with
   [let rec ... and ...]; [find] itself only supplies the root page.  The
   descent goes through [find_in_page], which takes the page to inspect, so
   that following a child pointer does not start again from the root. *)

(** [find_in_page tree page key] dispatches on the page type stored in the
    header of [page] (1 = leaf, 2 = internal node, as in [insert]) and
    returns [None] for any other page type. *)
let rec find_in_page tree page key =
  match (read_header page).page_type with
  | 1 -> find_in_leaf page key
  | 2 -> find_in_internal tree page key
  | _ -> None

(** [find_in_leaf page key] returns the value stored under [key] in the leaf
    [page], or [None] when the leaf has no such key. *)
and find_in_leaf page key =
  let header = read_header page in
  (* Convert once: the keys are both searched and indexed as an array. *)
  let keys = Array.of_list (read_keys page header.num_cells) in
  let idx = binary_search keys key in
  if idx < Array.length keys && String.equal keys.(idx) key then
    Some (read_value page idx)
  else None

(** [find_in_internal tree page key] selects the child of the internal node
    [page] at the index returned by [binary_search] and continues there.

    NOTE(review): when [key] equals a separator key this follows the child at
    the separator's own index, as the original code did — confirm that this
    matches the separator convention used when nodes are split. *)
and find_in_internal tree page key =
  let header = read_header page in
  let keys = Array.of_list (read_keys page header.num_cells) in
  let children = read_children page (header.num_cells + 1) in
  let idx = binary_search keys key in
  let child_page_id = List.nth children idx in
  find_in_page tree (read_page tree.fd child_page_id) key

(** [find tree key] looks [key] up starting from the root page of [tree] and
    returns the associated value, if any. *)
let find tree key =
  find_in_page tree (read_page tree.fd tree.root_page) key

| B-Tree 操作 | 时间复杂度 | 说明 |
| --- | --- | --- |
| 查找 | O(log n) | 二分搜索 |
| 插入 | O(log n) | 可能触发分裂 |
| 删除 | O(log n) | 可能触发合并 |
| 范围扫描 | O(log n + k) | 叶子链表 |

⚠️ 注意:B-Tree 分裂和合并是复杂的操作,需要仔细处理各种边界情况(空树、单节点、满节点等)。

缓冲池

(* storage/buffer.ml *)
(** A page cache in front of one file.  [pages] maps page ids to cached
    pages; [lru] lists the cached ids with the most recently used one first
    ([get_page] pushes at the head and evicts from the tail). *)
type buffer_pool = {
  pool_size: int;
  pages: (int, page) Hashtbl.t;
  mutable lru: int list;  (* ids of recently used pages *)
  fd: Unix.file_descr;
}

(** [create_pool pool_size fd] returns an empty pool over the file [fd] whose
    page table is created with initial size [pool_size]. *)
let create_pool pool_size fd =
  let pages = Hashtbl.create pool_size in
  { pool_size; pages; lru = []; fd }

(** [get_page pool page_id] returns page [page_id], loading it with
    [read_from_disk] when it is not cached.

    A hit moves the page to the front of [pool.lru].  On a miss, if the pool
    already holds [pool.pool_size] pages, the page at the tail of [pool.lru]
    (the least recently used one) is evicted first, being flushed with
    [flush_page] when it is dirty.

    The victim is taken from the reversed list because the standard library
    has no [List.last]; an empty LRU list or a victim missing from the table
    is tolerated instead of raising. *)
let get_page pool page_id =
  match Hashtbl.find_opt pool.pages page_id with
  | Some page ->
    (* cache hit: refresh the page's LRU position *)
    pool.lru <- page_id :: List.filter ((<>) page_id) pool.lru;
    page
  | None ->
    if Hashtbl.length pool.pages >= pool.pool_size then begin
      match List.rev pool.lru with
      | [] -> ()  (* nothing to evict, e.g. a pool created with size 0 *)
      | victim_id :: _ ->
        (match Hashtbl.find_opt pool.pages victim_id with
         | Some victim when victim.dirty -> flush_page pool.fd victim
         | Some _ | None -> ());
        Hashtbl.remove pool.pages victim_id;
        pool.lru <- List.filter ((<>) victim_id) pool.lru
    end;
    (* cache miss: load the page and register it as most recently used *)
    let page = read_from_disk pool.fd page_id in
    Hashtbl.add pool.pages page_id page;
    pool.lru <- page_id :: pool.lru;
    page

(** [flush_all pool] passes every dirty cached page to [flush_page] and then
    clears its dirty flag; clean pages are left alone. *)
let flush_all pool =
  let flush_if_dirty page =
    if page.dirty then begin
      flush_page pool.fd page;
      page.dirty <- false
    end
  in
  Hashtbl.iter (fun _id page -> flush_if_dirty page) pool.pages

(** [prefetch pool page_ids] walks [page_ids] in order and calls [get_page]
    for each id that is not in the pool at that moment, discarding the
    returned page. *)
let prefetch pool page_ids =
  let load_if_absent id =
    match Hashtbl.mem pool.pages id with
    | true -> ()
    | false -> ignore (get_page pool id)
  in
  List.iter load_if_absent page_ids

💡 提示:缓冲池大小通常设为可用内存的 50-80%。太小会导致频繁磁盘 IO,太大会影响操作系统缓存。

预写日志(WAL)

(* storage/wal.ml *)
(** One write-ahead-log record describing a change to a byte range of a page.
    [write_log] overwrites [lsn] with the log's next sequence number. *)
type wal_entry = {
  lsn: int;          (* log sequence number *)
  txn_id: int;       (* transaction id *)
  page_id: int;      (* page id *)
  offset: int;       (* offset within the page *)
  before: bytes;     (* data before the change *)
  after: bytes;      (* data after the change *)
}

(** State of the write-ahead log.

    [buffer] holds the entries not yet written to [fd], newest first.  It is
    a list of [wal_entry] (not of [bytes]): [write_log] conses entries onto
    it and [flush_wal] serialises them, so any other element type does not
    type-check.  [buffer_size] is the number of buffered entries at which
    [write_log] triggers a flush. *)
type wal = {
  fd: Unix.file_descr;
  mutable next_lsn: int;
  mutable buffer: wal_entry list;
  buffer_size: int;
}

(** [create_wal filename] opens [filename] for appending (creating it with
    mode 0o644 if needed) and returns a log with an empty buffer, the next
    LSN set to 0, and a flush threshold of 1024 entries. *)
let create_wal filename =
  let flags = [Unix.O_RDWR; Unix.O_CREAT; Unix.O_APPEND] in
  let fd = Unix.openfile filename flags 0o644 in
  { fd; next_lsn = 0; buffer = []; buffer_size = 1024 }

(** [flush_wal wal] writes every buffered entry to [wal.fd] in the order it
    was logged and then calls [Unix.fsync] on the descriptor.

    It is defined before [write_log] because [write_log] calls it, and it
    writes to [wal.fd] (a bare [fd] is not bound in this scope).  The buffer
    is emptied before the writes start, as in the original code.

    @raise Unix.Unix_error if writing or syncing fails. *)
let flush_wal wal =
  (* [write_log] pushes new entries at the head; reverse to get log order. *)
  let entries = List.rev wal.buffer in
  wal.buffer <- [];
  List.iter (fun entry ->
    let data = serialize_entry entry in
    ignore (Unix.write_substring wal.fd data 0 (String.length data))
  ) entries;
  Unix.fsync wal.fd

(** [write_log wal entry] stamps [entry] with the next log sequence number,
    buffers it, and flushes the log once the buffer holds [wal.buffer_size]
    entries. *)
let write_log wal entry =
  let entry = { entry with lsn = wal.next_lsn } in
  wal.next_lsn <- wal.next_lsn + 1;
  wal.buffer <- entry :: wal.buffer;
  if List.length wal.buffer >= wal.buffer_size then
    flush_wal wal

(** [recover wal] passes every entry returned by [read_all_entries wal.fd] to
    [apply_entry], in the order returned.

    NOTE(review): no filtering by transaction outcome is visible here — every
    entry is applied, whether or not its transaction committed. *)
let recover wal =
  read_all_entries wal.fd |> List.iter (fun entry -> apply_entry entry)

(** [checkpoint wal buffer_pool] makes the buffered log durable, flushes
    every dirty page of [buffer_pool], then truncates the log file and resets
    the LSN counter to 0.

    The log is flushed before the data pages.  Write-ahead logging requires a
    change's log record to be on disk before the changed page is; flushing
    pages first would leave a window in which a crash finds modified pages
    with no log records describing them.

    NOTE(review): whether [Buffer.flush_all] syncs the data file before the
    log is truncated cannot be seen from here — confirm. *)
let checkpoint wal buffer_pool =
  (* 1. log first *)
  flush_wal wal;
  (* 2. then the dirty pages the log describes *)
  Buffer.flush_all buffer_pool;
  (* 3. drop the log that the checkpoint has made redundant *)
  Unix.ftruncate wal.fd 0;
  wal.next_lsn <- 0

| WAL 操作 | 说明 | 何时执行 |
| --- | --- | --- |
| write_log | 记录修改 | 每次写操作 |
| flush_wal | 持久化日志 | 事务提交时 |
| checkpoint | 截断日志 | 定期/日志过大 |
| recover | 恢复数据 | 数据库启动时 |

查询解析器

词法分析

(* query/lexer.mll *)
{
  open Parser
}

(* Named regular expressions used by the [token] rule. *)
let digit = ['0'-'9']
let alpha = ['a'-'z' 'A'-'Z']
(* A letter followed by letters, digits or underscores. *)
let ident = alpha (alpha | digit | '_')*
let whitespace = [' ' '\t' '\n' '\r']
(* Single-quoted text containing no single quote; there is no escape syntax. *)
let string_literal = '\'' [^ '\'']* '\''

(* [token lexbuf] returns the next SQL token, skipping whitespace and "--"
   line comments.  Keywords are recognised case-insensitively; any other
   identifier is returned as IDENT with its original spelling.  String
   literals are returned without their surrounding quotes.

   Numeric literals may carry a fractional part ("3.14").  NUM holds a
   float, and without this a literal such as 3.14 is split into NUM, DOT,
   NUM, which no grammar rule accepts.  A digit run that is not followed by
   ".digits" is lexed exactly as before.

   NOTE(review): parser.mly refers to HAVING, PRIMARY and KEY, but no keyword
   below produces them, so those words reach the parser as IDENT.

   Raises Failure on a character that starts no token. *)
rule token = parse
  | whitespace+ { token lexbuf }
  | "--" [^ '\n']* { token lexbuf }
  | (digit+ ('.' digit+)?) as num { NUM (float_of_string num) }
  | string_literal as s { STRING (String.sub s 1 (String.length s - 2)) }
  | ident as id {
    match String.uppercase_ascii id with
    | "SELECT" -> SELECT
    | "FROM" -> FROM
    | "WHERE" -> WHERE
    | "AND" -> AND
    | "OR" -> OR
    | "NOT" -> NOT
    | "INSERT" -> INSERT
    | "INTO" -> INTO
    | "VALUES" -> VALUES
    | "UPDATE" -> UPDATE
    | "SET" -> SET
    | "DELETE" -> DELETE
    | "CREATE" -> CREATE
    | "TABLE" -> TABLE
    | "INDEX" -> INDEX
    | "BEGIN" -> BEGIN
    | "COMMIT" -> COMMIT
    | "ROLLBACK" -> ROLLBACK
    | "TRUE" -> BOOL true
    | "FALSE" -> BOOL false
    | "NULL" -> NULL
    | "JOIN" -> JOIN
    | "ON" -> ON
    | "GROUP" -> GROUP
    | "BY" -> BY
    | "ORDER" -> ORDER
    | "ASC" -> ASC
    | "DESC" -> DESC
    | "LIMIT" -> LIMIT
    | "OFFSET" -> OFFSET
    | "AS" -> AS
    | _ -> IDENT id
  }
  | '+' { PLUS }
  | '-' { MINUS }
  | '*' { STAR }
  | '/' { SLASH }
  | '=' { EQ }
  | '<' { LT }
  | '>' { GT }
  | "<=" { LE }
  | ">=" { GE }
  | "!=" | "<>" { NEQ }
  | '(' { LPAREN }
  | ')' { RPAREN }
  | ',' { COMMA }
  | ';' { SEMICOLON }
  | '.' { DOT }
  | eof { EOF }
  | _ as c { failwith (Printf.sprintf "意外字符: %c" c) }

AST 定义

(* query/ast.ml *)

(** A column reference: the column name, an optional table qualifier and an
    optional alias.  The parser's simple_expr rule builds it with
    [alias = None]. *)
type column = {
  name: string;
  table: string option;
  alias: string option;
}

(** Literal values.  Every numeric literal is a float: the lexer's NUM token
    carries a [float]. *)
type literal =
  | LFloat of float
  | LString of string
  | LBool of bool
  | LNull

(** Binary operators: arithmetic, comparison and boolean connectives. *)
type binary_op = Add | Sub | Mul | Div | Eq | Neq | Lt | Gt | Le | Ge | And | Or
(** Unary operators: boolean negation and arithmetic negation. *)
type unary_op = Not | Neg

(** Expressions.
    NOTE(review): no rule of the visible grammar builds [EAgg]; function-call
    syntax produces [EFunc]. *)
type expr =
  | EColumn of column
  | ELiteral of literal
  | EBinaryOp of binary_op * expr * expr
  | EUnaryOp of unary_op * expr
  | EFunc of string * expr list
  | EAgg of string * expr  (* COUNT, SUM, AVG, etc. *)

(** One item of a SELECT list: [*] or an expression with an optional alias. *)
type select_item =
  | Star
  | Expr of expr * string option  (* expression AS alias *)

(** Join kinds.  The visible grammar only ever produces [Inner]. *)
type join_type = Inner | Left | Right | Full

(** A FROM-clause item: a named table with an optional alias, or a join of
    two items with a join kind and an optional join condition. *)
type table_ref =
  | TableName of string * string option  (* name AS alias *)
  | Join of table_ref * table_ref * join_type * expr option

(** One ORDER BY key and its direction; the grammar uses [`Asc] when no
    direction is written. *)
type order_item = {
  expr: expr;
  direction: [`Asc | `Desc];
}

(** A parsed SQL statement.  The parser stores [[]] in [group_by] and
    [order_by] when the corresponding clause is absent, and [None] in the
    optional fields. *)
type statement =
  | Select of {
      columns: select_item list;
      from: table_ref list;
      where: expr option;
      group_by: expr list;
      having: expr option;
      order_by: order_item list;
      limit: int option;
      offset: int option;
    }
  | Insert of {
      table: string;
      columns: string list;
      values: expr list list;  (* one inner list per parenthesised VALUES row *)
    }
  | Update of {
      table: string;
      set: (string * expr) list;  (* (column, new value) assignments *)
      where: expr option;
    }
  | Delete of {
      table: string;
      where: expr option;
    }
  | CreateTable of {
      name: string;
      columns: column_def list;
    }
  | CreateIndex of {
      name: string;
      table: string;
      columns: string list;
    }
  | Begin
  | Commit
  | Rollback

(** A column definition in CREATE TABLE.  [col_type] is kept as the
    identifier written in the statement; the two flags record whether the
    NOT NULL and PRIMARY KEY constraints were present. *)
and column_def = {
  col_name: string;
  col_type: string;
  not_null: bool;
  primary_key: bool;
}

💡 提示:AST 设计要尽量贴近 SQL 语义,这样后续的优化和执行都会更自然。

语法分析

(* query/parser.mly *)
%{
  open Ast
%}

(* Token declarations.  HAVING, PRIMARY and KEY are declared here because
   the having_clause and constraint_ rules use them; a terminal that is used
   but not declared is rejected by the parser generator. *)
%token <float> NUM
%token <string> STRING
%token <string> IDENT
%token <bool> BOOL
%token NULL
%token SELECT FROM WHERE AND OR NOT
%token INSERT INTO VALUES UPDATE SET DELETE
%token CREATE TABLE INDEX BEGIN COMMIT ROLLBACK
%token JOIN ON GROUP BY HAVING ORDER ASC DESC LIMIT OFFSET AS
%token PRIMARY KEY
%token PLUS MINUS STAR SLASH
%token EQ LT GT LE GE NEQ
%token LPAREN RPAREN COMMA SEMICOLON DOT
%token EOF

(* Operator precedence, lowest first: OR, AND, NOT, equality, ordering
   comparisons, additive operators, multiplicative operators. *)
%left OR
%left AND
%right NOT
%left EQ NEQ
%left LT GT LE GE
%left PLUS MINUS
%left STAR SLASH

(* Entry point: parses exactly one statement and yields an Ast.statement. *)
%start <statement> statement

%%

(* One statement, optionally terminated by ';', followed by end of input. *)
statement:
  | s = select_stmt; SEMICOLON?; EOF { s }
  | s = insert_stmt; SEMICOLON?; EOF { s }
  | s = update_stmt; SEMICOLON?; EOF { s }
  | s = delete_stmt; SEMICOLON?; EOF { s }
  | s = create_stmt; SEMICOLON?; EOF { s }
  | BEGIN; SEMICOLON?; EOF { Begin }
  | COMMIT; SEMICOLON?; EOF { Commit }
  | ROLLBACK; SEMICOLON?; EOF { Rollback }

(* SELECT ... FROM ... with the optional clauses in a fixed order: WHERE,
   GROUP BY, HAVING, ORDER BY, LIMIT, OFFSET.  A missing GROUP BY or
   ORDER BY becomes the empty list in the AST. *)
select_stmt:
  | SELECT; columns = select_list;
    FROM; from = table_list;
    where = where_clause?;
    group = group_clause?;
    having = having_clause?;
    order = order_clause?;
    limit = limit_clause?;
    offset = offset_clause?
    { Select { columns; from; where; group_by = Option.value group ~default:[];
               having; order_by = Option.value order ~default:[];
               limit; offset } }

(* The SELECT list: '*' alone, or one or more comma-separated items.
   separated_nonempty_list is used so that "SELECT FROM t", with nothing
   selected, is a syntax error instead of producing an empty column list. *)
select_list:
  | STAR { [Star] }
  | items = separated_nonempty_list(COMMA, select_item) { items }

(* An expression, optionally followed by AS alias. *)
select_item:
  | e = expr; AS; alias = IDENT { Expr (e, Some alias) }
  | e = expr { Expr (e, None) }

(* The FROM list: one or more comma-separated table references (an empty
   list after FROM is a syntax error). *)
table_list:
  | tables = separated_nonempty_list(COMMA, table_ref) { tables }

(* A table, or a left-nested chain of inner joins.  Only the left operand of
   JOIN is itself a table_ref; with table_ref on both sides "a JOIN b ON ..
   JOIN c ON .." has two parses and the grammar has conflicts. *)
table_ref:
  | t = table_primary { t }
  | t1 = table_ref; JOIN; t2 = table_primary; ON; cond = expr
    { Join (t1, t2, Inner, Some cond) }

(* A single named table with an optional alias. *)
table_primary:
  | name = IDENT; AS; alias = IDENT { TableName (name, Some alias) }
  | name = IDENT { TableName (name, None) }

(* Optional clauses of SELECT (WHERE is also used by UPDATE and DELETE).
   GROUP BY and ORDER BY require at least one item: with separated_list an
   empty "GROUP BY" was accepted and became indistinguishable from having no
   clause at all. *)
where_clause:
  | WHERE; e = expr { e }

group_clause:
  | GROUP; BY; exprs = separated_nonempty_list(COMMA, expr) { exprs }

having_clause:
  | HAVING; e = expr { e }

order_clause:
  | ORDER; BY; items = separated_nonempty_list(COMMA, order_item) { items }

(* A sort key; the direction defaults to ascending. *)
order_item:
  | e = expr; ASC { { expr = e; direction = `Asc } }
  | e = expr; DESC { { expr = e; direction = `Desc } }
  | e = expr { { expr = e; direction = `Asc } }

(* LIMIT and OFFSET take a numeric literal, truncated to an int. *)
limit_clause:
  | LIMIT; n = NUM { int_of_float n }

offset_clause:
  | OFFSET; n = NUM { int_of_float n }

(* Expressions.  Binary operators are disambiguated by the precedence
   declarations at the top of the file.

   Unary minus is parsed at its own level, below which only simple_expr
   appears, so it binds tighter than every binary operator.  Giving it the
   precedence of NOT (which is lower than that of the arithmetic and
   comparison operators) made "-a + b" parse as "-(a + b)". *)
expr:
  | e = unary_expr { e }
  | left = expr; op = binop; right = expr { EBinaryOp (op, left, right) }
  | NOT; e = expr { EUnaryOp (Not, e) }

(* Zero or more unary minus signs applied to a simple expression. *)
unary_expr:
  | e = simple_expr { e }
  | MINUS; e = unary_expr { EUnaryOp (Neg, e) }

(* Atomic expressions: literals, column references (optionally qualified as
   table.column), function calls and parenthesised expressions.

   The qualified form reads the table name from an IDENT followed by DOT.
   DOT carries no value, so it cannot be bound to a name, and a rule that
   starts with DOT would never match "t.col". *)
simple_expr:
  | n = NUM { ELiteral (LFloat n) }
  | s = STRING { ELiteral (LString s) }
  | b = BOOL { ELiteral (LBool b) }
  | NULL { ELiteral LNull }
  | id = IDENT { EColumn { name = id; table = None; alias = None } }
  | table = IDENT; DOT; col = IDENT
    { EColumn { name = col; table = Some table; alias = None } }
  | func = IDENT; LPAREN; args = separated_list(COMMA, expr); RPAREN
    { EFunc (func, args) }
  | LPAREN; e = expr; RPAREN { e }

(* Maps operator tokens to Ast.binary_op.  The rule is %inline so that the
   precedence of each operator token applies to the production in expr that
   uses it. *)
%inline binop:
  | EQ { Eq } | NEQ { Neq }
  | LT { Lt } | GT { Gt }
  | LE { Le } | GE { Ge }
  | PLUS { Add } | MINUS { Sub }
  | STAR { Mul } | SLASH { Div }
  | AND { And } | OR { Or }

(* INSERT INTO table (columns) VALUES (row), (row), ...
   NOTE(review): separated_list also accepts an empty column list and an
   empty list of rows — confirm whether that is intended. *)
insert_stmt:
  | INSERT; INTO; table = IDENT;
    LPAREN; columns = separated_list(COMMA, IDENT); RPAREN;
    VALUES; values = separated_list(COMMA, value_list)
    { Insert { table; columns; values } }

(* One parenthesised row of value expressions. *)
value_list:
  | LPAREN; values = separated_list(COMMA, expr); RPAREN { values }

(* UPDATE table SET col = expr, ... [WHERE expr] *)
update_stmt:
  | UPDATE; table = IDENT;
    SET; assignments = separated_list(COMMA, assignment);
    where = where_clause?
    { Update { table; set = assignments; where } }

(* A single "column = expression" assignment. *)
assignment:
  | col = IDENT; EQ; e = expr { (col, e) }

(* DELETE FROM table [WHERE expr] *)
delete_stmt:
  | DELETE; FROM; table = IDENT;
    where = where_clause?
    { Delete { table; where } }

(* CREATE TABLE name (column definitions), or
   CREATE INDEX name ON table (columns). *)
create_stmt:
  | CREATE; TABLE; name = IDENT;
    LPAREN; columns = separated_list(COMMA, column_def); RPAREN
    { CreateTable { name; columns } }
  | CREATE; INDEX; name = IDENT;
    ON; table = IDENT;
    LPAREN; columns = separated_list(COMMA, IDENT); RPAREN
    { CreateIndex { name; table; columns } }

(* A column definition: name, type identifier, then any number of
   constraints.  The constraints are collected as polymorphic variants and
   folded into the two boolean fields of Ast.column_def. *)
column_def:
  | name = IDENT; typ = IDENT; constraints = list(constraint_)
    { { col_name = name; col_type = typ;
        not_null = List.mem `NotNull constraints;
        primary_key = List.mem `PrimaryKey constraints } }

(* The trailing underscore avoids the OCaml keyword "constraint". *)
constraint_:
  | NOT; NULL { `NotNull }
  | PRIMARY; KEY { `PrimaryKey }

查询优化器

(* query/optimizer.ml *)
open Ast

(** Predicate push-down.

    For a [Select] with a WHERE predicate, [partition_predicates] splits the
    predicate into a pushable part and a remainder, the pushable part is
    handed to [apply_pushed_predicates] together with the FROM list, and the
    statement is rebuilt with the remainder as its WHERE clause.  Every other
    statement, and a [Select] without WHERE, is returned as is.

    NOTE(review): the result of [apply_pushed_predicates] is discarded and
    the rebuilt statement keeps the same [from]; unless that helper records
    the pushed predicates somewhere else, they are dropped — confirm. *)
let push_down_predicates stmt =
  match stmt with
  | Select { where = None; _ } -> stmt
  | Select { columns; from; where = Some pred; group_by; having; order_by; limit; offset } ->
    let pushed, remaining = partition_predicates pred from in
    apply_pushed_predicates from pushed;
    Select { columns; from; where = remaining; group_by; having; order_by; limit; offset }
  | Insert _ | Update _ | Delete _ | CreateTable _ | CreateIndex _
  | Begin | Commit | Rollback -> stmt

(* Constant folding. *)

(** [fold_binary op left right] evaluates [op] when both operands are
    literals it knows how to combine, and otherwise rebuilds the node.
    Division by zero is left unfolded, and so is an equality test involving
    NULL: folding [NULL = NULL] to TRUE would bake in a result that SQL's
    three-valued logic does not give. *)
let fold_binary op left right =
  match op, left, right with
  | Add, ELiteral (LFloat a), ELiteral (LFloat b) -> ELiteral (LFloat (a +. b))
  | Sub, ELiteral (LFloat a), ELiteral (LFloat b) -> ELiteral (LFloat (a -. b))
  | Mul, ELiteral (LFloat a), ELiteral (LFloat b) -> ELiteral (LFloat (a *. b))
  | Div, ELiteral (LFloat a), ELiteral (LFloat b) when b <> 0.0 ->
    ELiteral (LFloat (a /. b))
  | Eq, ELiteral a, ELiteral b when a <> LNull && b <> LNull ->
    ELiteral (LBool (a = b))
  | And, ELiteral (LBool a), ELiteral (LBool b) -> ELiteral (LBool (a && b))
  | Or, ELiteral (LBool a), ELiteral (LBool b) -> ELiteral (LBool (a || b))
  | _ -> EBinaryOp (op, left, right)

(** [fold_unary op operand] evaluates [op] on a literal operand of the
    matching type, and otherwise rebuilds the node. *)
let fold_unary op operand =
  match op, operand with
  | Not, ELiteral (LBool b) -> ELiteral (LBool (not b))
  | Neg, ELiteral (LFloat n) -> ELiteral (LFloat (-.n))
  | _ -> EUnaryOp (op, operand)

(** [fold_constants e] returns [e] with constant sub-expressions evaluated.

    Folding is bottom-up: the operands are folded first and only then is the
    node itself simplified, so [(1 + 2) + 3] becomes [6] in one pass.
    Matching on the node before folding its children left such expressions
    half-folded.  Arguments of function and aggregate calls are folded too;
    the calls themselves are never evaluated. *)
let rec fold_constants expr =
  match expr with
  | EBinaryOp (op, left, right) ->
    fold_binary op (fold_constants left) (fold_constants right)
  | EUnaryOp (op, operand) -> fold_unary op (fold_constants operand)
  | EFunc (name, args) -> EFunc (name, List.map fold_constants args)
  | EAgg (name, arg) -> EAgg (name, fold_constants arg)
  | EColumn _ | ELiteral _ -> expr

(** Candidate execution plans.  [FullScan] names a table; [IndexScan] is
    built by [choose_best_plan] as (table, index, predicate); the join nodes
    combine two sub-plans.
    NOTE(review): the two strings of [HashJoin] are presumably the join
    columns of the left and right inputs — confirm. *)
type execution_plan =
  | FullScan of string
  | IndexScan of string * string * expr
  | NestedLoopJoin of execution_plan * execution_plan * expr
  | HashJoin of execution_plan * execution_plan * string * string

(** [estimate_cost plan] returns a rough cost for [plan]: the row count for a
    full scan, a tenth of it for an index scan, the product of the input
    costs for a nested-loop join and their sum for a hash join.

    The function is declared [rec] because the join cases call it on their
    sub-plans; without [rec] the name is unbound inside its own body. *)
let rec estimate_cost = function
  | FullScan table ->
    let stats = get_table_stats table in
    stats.num_rows  (* full scan cost = number of rows *)
  | IndexScan (table, _index, _) ->
    let stats = get_table_stats table in
    stats.num_rows / 10  (* index scan cost ~ rows / 10 *)
  | NestedLoopJoin (left, right, _) ->
    estimate_cost left * estimate_cost right
  | HashJoin (left, right, _, _) ->
    estimate_cost left + estimate_cost right

(** [choose_best_plan table where] compares a full scan of [table] with an
    index scan, when [find_usable_index] offers one, and returns the index
    scan only if its estimated cost is strictly lower. *)
let choose_best_plan table where =
  let full_scan = FullScan table in
  let alternative =
    match find_usable_index table where with
    | Some (index, pred) -> IndexScan (table, index, pred)
    | None -> FullScan table
  in
  if estimate_cost alternative < estimate_cost full_scan then alternative
  else full_scan

💡 提示:查询优化器的核心是估算代价。收集准确的统计信息(行数、列基数、直方图)对优化至关重要。

事务处理

(* transaction/mvcc.ml *)

(** Transaction identifier; [begin_txn] hands them out in increasing order. *)
type txn_id = int

(** One version in a key's version chain.

    [deleted_by] is mutable because [commit] marks the version it supersedes
    with the id of the committing transaction; assigning to an immutable
    field is rejected by the compiler.  [next] is the link to the version
    that this one replaced. *)
type version = {
  data: bytes;
  created_by: txn_id;
  mutable deleted_by: txn_id option;
  mutable next: version option;
}

(** Transaction state. *)
type txn_status = Active | Committed | Aborted

(** A transaction.  [snapshot] is the list of transaction ids that were in
    the active table when this transaction began; [write_set] buffers its
    writes (newest first) until [commit] applies them. *)
type transaction = {
  id: txn_id;
  snapshot: txn_id list;  (* ids of the transactions active at begin *)
  mutable status: txn_status;
  mutable write_set: (string * bytes) list;  (* buffered writes *)
}

(** Global transaction manager: the next id to hand out, the table of
    transactions that have begun and not yet been removed, and the ids of
    committed transactions (most recent first). *)
type mvcc = {
  mutable next_txn_id: txn_id;
  mutable active_txns: (txn_id, transaction) Hashtbl.t;
  mutable committed_txns: txn_id list;
}

(** [create_mvcc ()] returns a manager with no transactions; the first
    transaction id handed out will be 1. *)
let create_mvcc () =
  let active_txns = Hashtbl.create 16 in
  { next_txn_id = 1; active_txns; committed_txns = [] }

(** [begin_txn mvcc] allocates the next transaction id, records the ids of
    the transactions currently in [mvcc.active_txns] as the snapshot,
    registers the new transaction as active and returns it. *)
let begin_txn mvcc =
  let id = mvcc.next_txn_id in
  mvcc.next_txn_id <- id + 1;
  let collect tid _txn acc = tid :: acc in
  let txn = {
    id;
    snapshot = Hashtbl.fold collect mvcc.active_txns [];
    status = Active;
    write_set = [];
  } in
  Hashtbl.add mvcc.active_txns id txn;
  txn

(** [is_visible txn version] tells whether [version] belongs to the snapshot
    that [txn] reads.

    [txn] sees the effects of itself and of every transaction that had
    finished when [txn] began, i.e. whose id is smaller than [txn.id] and is
    not in [txn.snapshot].  Ids are handed out in increasing order, so a
    larger id belongs to a transaction that began later and must not be seen
    either.  A version is visible when its creation is seen and its
    deletion, if any, is not.

    The previous test for deleted versions was inverted: a version deleted by
    an already-committed transaction was reported visible, while one deleted
    by a still-running concurrent transaction was hidden. *)
let is_visible txn version =
  let sees tid =
    tid = txn.id || (tid < txn.id && not (List.mem tid txn.snapshot))
  in
  sees version.created_by
  && (match version.deleted_by with
      | Some deleter -> not (sees deleter)
      | None -> true)

(** [read mvcc txn key] returns the data of the first version in
    [find_versions key] that is visible to [txn], or [None] when no version
    is visible.  [mvcc] is not used.
    NOTE(review): this relies on [find_versions] listing the newest version
    first — confirm. *)
let read mvcc txn key =
  find_versions key
  |> List.find_opt (is_visible txn)
  |> Option.map (fun v -> v.data)

(** [write mvcc txn key value] buffers the write at the head of
    [txn.write_set]; nothing is applied until [commit].  [mvcc] is not
    used. *)
let write mvcc txn key value =
  let pending = (key, value) in
  txn.write_set <- pending :: txn.write_set

(** [commit mvcc txn] aborts [txn] with [Error _] when another active
    transaction has buffered a write to one of the same keys; otherwise it
    applies the write set, marks [txn] committed and returns [Ok ()].

    Two things differ from the obvious transcription:
    - an aborted transaction is also removed from [mvcc.active_txns] and its
      write set is cleared, exactly as [rollback] does; otherwise it stays in
      the table and appears in the snapshot of every later transaction;
    - the write set is applied oldest write first.  [write] pushes at the
      head, so iterating the list directly would make the earliest write to
      a key the newest version.

    NOTE(review): conflicts are only checked against transactions that are
    still active, not against ones that committed after [txn] began. *)
let commit mvcc txn =
  let writes_key key other =
    other.id <> txn.id
    && other.status = Active
    && List.exists (fun (k, _) -> k = key) other.write_set
  in
  let has_conflict = List.exists (fun (key, _) ->
    Hashtbl.fold (fun _ other acc -> acc || writes_key key other)
      mvcc.active_txns false
  ) txn.write_set in

  if has_conflict then begin
    txn.status <- Aborted;
    txn.write_set <- [];
    Hashtbl.remove mvcc.active_txns txn.id;
    Error "写-写冲突,事务中止"
  end else begin
    (* apply the buffered writes in the order they were issued *)
    List.iter (fun (key, value) ->
      let old_version = find_latest_version key in
      let new_version = {
        data = value;
        created_by = txn.id;
        deleted_by = None;
        next = old_version;
      } in
      (match old_version with
       | Some v -> v.deleted_by <- Some txn.id
       | None -> ());
      add_version key new_version
    ) (List.rev txn.write_set);

    txn.status <- Committed;
    Hashtbl.remove mvcc.active_txns txn.id;
    mvcc.committed_txns <- txn.id :: mvcc.committed_txns;
    Ok ()
  end

(** [rollback mvcc txn] removes [txn] from the active table, discards its
    buffered writes and marks it aborted. *)
let rollback mvcc txn =
  Hashtbl.remove mvcc.active_txns txn.id;
  txn.write_set <- [];
  txn.status <- Aborted

| MVCC 操作 | 说明 |
| --- | --- |
| begin_txn | 开始事务,获取快照 |
| read | 读取对事务可见的版本 |
| write | 缓冲写入 |
| commit | 检查冲突并提交 |
| rollback | 回滚事务 |

并发控制

(* transaction/lock.ml *)
(** Lock modes. *)
type lock_type = Shared | Exclusive

(** Per-key lock state: the current holders and the waiters, each waiter
    carrying the condition variable it is blocked on. *)
type lock_entry = {
  mutable holders: (int * lock_type) list;  (* (txn_id, lock_type) *)
  mutable waiters: (int * lock_type * Condition.t) list;
}

(** The lock table, keyed by string, and the mutex that [acquire_lock] and
    [release_lock] take around every access to it. *)
type lock_manager = {
  locks: (string, lock_entry) Hashtbl.t;
  mutex: Mutex.t;
}

(** [create_lock_manager ()] returns a manager with an empty lock table and a
    fresh mutex. *)
let create_lock_manager () =
  let mutex = Mutex.create () in
  let locks = Hashtbl.create 64 in
  { locks; mutex }

(** [acquire_lock lm txn_id key lock_type] blocks until [txn_id] holds a lock
    of mode [lock_type] on [key], then returns [true].

    A shared lock is granted when no holder is exclusive; an exclusive lock
    only when there is no holder at all.  While the lock cannot be granted
    the caller registers a condition variable as a waiter and sleeps on it.
    After every wake-up the condition is checked again, and the lock is
    recorded in [holders] only once it can really be granted.  Previously a
    woken waiter returned [true] without being added to [holders] and
    without rechecking, so it never actually held the lock.

    [release_lock] clears the waiter list when it signals, so a waiter that
    has to keep waiting registers again on each iteration.  The mutex is
    released on every exit path, including exceptions.

    NOTE(review): there is no deadlock detection or timeout, and a
    transaction that already holds the key is treated like any other
    holder. *)
let acquire_lock lm txn_id key lock_type =
  Mutex.lock lm.mutex;
  Fun.protect ~finally:(fun () -> Mutex.unlock lm.mutex) (fun () ->
    let entry = match Hashtbl.find_opt lm.locks key with
      | Some e -> e
      | None ->
        let e = { holders = []; waiters = [] } in
        Hashtbl.add lm.locks key e;
        e
    in
    let can_acquire () =
      match lock_type with
      | Shared ->
        not (List.exists (fun (_, lt) -> lt = Exclusive) entry.holders)
      | Exclusive ->
        entry.holders = []
    in
    while not (can_acquire ()) do
      let cond = Condition.create () in
      entry.waiters <- (txn_id, lock_type, cond) :: entry.waiters;
      (* atomically releases the mutex while asleep, re-takes it on wake-up *)
      Condition.wait cond lm.mutex
    done;
    entry.holders <- (txn_id, lock_type) :: entry.holders;
    true)

(** [release_lock lm txn_id key] removes every lock that [txn_id] holds on
    [key], signals all waiters registered for that key and clears the waiter
    list.  A key with no lock entry is ignored.  The manager's mutex is held
    for the duration of the call. *)
let release_lock lm txn_id key =
  let held_by_other (tid, _) = tid <> txn_id in
  Mutex.lock lm.mutex;
  Option.iter (fun entry ->
    entry.holders <- List.filter held_by_other entry.holders;
    List.iter (fun (_, _, cond) -> Condition.signal cond) entry.waiters;
    entry.waiters <- []
  ) (Hashtbl.find_opt lm.locks key);
  Mutex.unlock lm.mutex

(** [release_all_locks lm txn_id] releases every lock held by [txn_id].

    The keys are collected while holding the manager's mutex, because other
    threads may be adding entries to the table or editing holder lists at the
    same time.  The mutex is dropped before the locks are released one by
    one, since [release_lock] takes it itself. *)
let release_all_locks lm txn_id =
  Mutex.lock lm.mutex;
  let held_keys =
    Fun.protect ~finally:(fun () -> Mutex.unlock lm.mutex) (fun () ->
      Hashtbl.fold (fun key entry acc ->
        if List.exists (fun (tid, _) -> tid = txn_id) entry.holders
        then key :: acc
        else acc
      ) lm.locks [])
  in
  List.iter (fun key -> release_lock lm txn_id key) held_keys

⚠️ 注意:锁管理要小心死锁检测。简单的策略是设置超时,复杂策略需要构建等待图并检测环。

命令行接口

(* bin/main.ml *)
open Cmdliner
open Minidb

(** [repl db] runs the interactive prompt: each input line is passed to
    [execute_sql] and its result or error message is printed.  The loop ends
    on the line "quit" (ignoring surrounding whitespace) or at end of input;
    any other exception raised while handling a line is printed and the
    prompt continues. *)
let repl db =
  Printf.printf "MiniDB v0.1\n";
  Printf.printf "输入 SQL 语句或 'quit' 退出\n\n";

  let rec loop () =
    Printf.printf "minidb> ";
    flush stdout;
    let keep_going =
      try
        let line = input_line stdin in
        if String.trim line = "quit" then false
        else begin
          (match execute_sql db line with
           | Ok result -> print_result result
           | Error msg -> Printf.printf "错误: %s\n" msg);
          true
        end
      with
      | End_of_file -> false
      | exn ->
        Printf.printf "错误: %s\n" (Printexc.to_string exn);
        true
    in
    if keep_going then loop ()
  in
  loop ();
  Printf.printf "再见!\n"

(** [exec_file db filename] reads the whole of [filename], passes its
    contents to [execute_sql] in a single call, and prints the result or the
    error message.

    The channel is closed through [Fun.protect], so it is released even when
    reading fails part-way.

    @raise Sys_error if the file cannot be opened. *)
let exec_file db filename =
  let ic = open_in filename in
  let content =
    Fun.protect ~finally:(fun () -> close_in_noerr ic) (fun () ->
      really_input_string ic (in_channel_length ic))
  in
  match execute_sql db content with
  | Ok result -> print_result result
  | Error msg -> Printf.printf "错误: %s\n" msg

(* Command-line arguments. *)

(** The [-d] / [--db] option: path of the database file, "minidb.dat" when
    not given. *)
let db_path_arg =
  let doc = "数据库文件路径" in
  let flag = Arg.info ["d"; "db"] ~doc in
  Arg.value (Arg.opt Arg.string "minidb.dat" flag)

(** Optional first positional argument: an SQL file. *)
let file_arg =
  let doc = "SQL 文件" in
  let positional = Arg.info [] ~doc in
  Arg.value (Arg.pos 0 (Arg.some Arg.file) None positional)

(** [run db_path file] opens the database at [db_path] and then executes the
    SQL file when one is given, or starts the interactive prompt
    otherwise. *)
let run db_path file =
  let db = Database.open_db db_path in
  let action =
    match file with
    | None -> repl
    | Some script -> fun db -> exec_file db script
  in
  action db

(** The [minidb] command: man page sections, version string, and the term
    that applies [run] to the two command-line arguments. *)
let cmd =
  let man = [
    `S Manpage.s_description;
    `P "MiniDB 是一个教学用的小型关系型数据库。";
    `S Manpage.s_examples;
    `Pre "  minidb";
    `Pre "  minidb script.sql";
    `Pre "  minidb -d mydb.dat";
  ] in
  let info =
    Cmd.info "minidb" ~doc:"MiniDB - 小型关系型数据库" ~man ~version:"0.1.0"
  in
  let term = Term.(const run $ db_path_arg $ file_arg) in
  Cmd.v info term

(* Program entry point: evaluate the command and exit with its status. *)
let () =
  let status = Cmd.eval cmd in
  exit status

测试策略

(* test/test_storage.ml *)
open Alcotest
open Minidb.Storage

(** An integer written to a page is read back unchanged from the same
    offset. *)
let test_page_read_write () =
  let page = Page.create 0 in
  let () = Page.write_int page 0 42 in
  check int "读写一致" 42 (Page.read_int page 0)

(** Inserts three keys, then looks up two of them and one key that was never
    inserted. *)
let test_btree_insert_find () =
  let tree = Btree.create "test.db" in
  List.iter
    (fun (key, value) -> Btree.insert tree key value)
    [ ("key1", "value1"); ("key2", "value2"); ("key3", "value3") ];

  let expect label expected key =
    check (option string) label expected (Btree.find tree key)
  in
  expect "查找 key1" (Some "value1") "key1";
  expect "查找 key2" (Some "value2") "key2";
  expect "查找不存在" None "key99"

(** Inserts 10 000 keys in increasing order and then checks that every one of
    them is found with its own value. *)
let test_btree_many_inserts () =
  let tree = Btree.create "test_large.db" in
  let key_of i = Printf.sprintf "key_%05d" i in
  let value_of i = Printf.sprintf "value_%d" i in
  let ids = List.init 10000 Fun.id in
  List.iter (fun i -> Btree.insert tree (key_of i) (value_of i)) ids;
  List.iter (fun i ->
    let key = key_of i in
    check (option string) key (Some (value_of i)) (Btree.find tree key)
  ) ids

(** Touches twice as many pages as the pool can hold, then re-reads page 0,
    which has been evicted by then, and checks the value written to it.

    [Buffer.create_pool] takes a file descriptor, not a file name, so the
    file is opened here and closed again when the test finishes.

    NOTE(review): this relies on [read_from_disk] coping with page ids beyond
    the current end of "test.db" — confirm. *)
let test_buffer_pool_lru () =
  let fd = Unix.openfile "test.db" [Unix.O_RDWR; Unix.O_CREAT] 0o644 in
  Fun.protect ~finally:(fun () -> Unix.close fd) (fun () ->
    let pool = Buffer.create_pool 10 fd in
    (* touch more pages than the pool can hold *)
    for i = 0 to 19 do
      let page = Buffer.get_page pool i in
      Page.write_int page 0 i
    done;
    (* page 0 has been evicted; it must come back with its contents *)
    let page = Buffer.get_page pool 0 in
    check int "LRU 淘汰后仍可读" 0 (Page.read_int page 0))

(** The storage test suite, grouped by component. *)
let suite =
  let page_tests = [
    test_case "读写" `Quick test_page_read_write;
  ] in
  let btree_tests = [
    test_case "插入查找" `Quick test_btree_insert_find;
    test_case "大量插入" `Slow test_btree_many_inserts;
  ] in
  let pool_tests = [
    test_case "LRU" `Quick test_buffer_pool_lru;
  ] in
  [ ("页操作", page_tests); ("B-Tree", btree_tests); ("缓冲池", pool_tests) ]

(* test/test_query.ml *)

(** A SELECT with two columns and a WHERE clause parses into a [Select] that
    has two select items and a predicate. *)
let test_parse_select () =
  let sql = "SELECT name, age FROM users WHERE age > 18" in
  match parse_sql sql with
  | Error msg -> fail msg
  | Ok (Select { columns; where; _ }) ->
    check int "列数" 2 (List.length columns);
    check bool "有 WHERE" true (Option.is_some where)
  | Ok _ -> fail "期望 SELECT"

(** An INSERT parses into an [Insert] with the expected table name and two
    columns; any other outcome fails the test. *)
let test_parse_insert () =
  let parsed = parse_sql "INSERT INTO users (name, age) VALUES ('Alice', 30)" in
  match parsed with
  | Ok (Insert { table; columns; _ }) ->
    check string "表名" "users" table;
    check int "列数" 2 (List.length columns)
  | Ok _ | Error _ -> fail "期望 INSERT"

(* test/test_transaction.ml *)

(** A lone transaction that writes one key commits successfully.

    The value is passed as [bytes]: the write set of a transaction is a
    [(string * bytes) list], so a string literal does not type-check.

    NOTE(review): the module is referred to as [MVCC] while the file is
    mvcc.ml, which OCaml exposes as [Mvcc] — confirm that an alias exists. *)
let test_mvcc_basic () =
  let mvcc = MVCC.create_mvcc () in
  let txn1 = MVCC.begin_txn mvcc in
  MVCC.write mvcc txn1 "key1" (Bytes.of_string "value1");
  let result = MVCC.commit mvcc txn1 in
  check (result unit string) "提交成功" (Ok ()) result

(** Two concurrent transactions write the same key; exactly one of the two
    commits is expected to succeed.

    The values are passed as [bytes], the element type of a transaction's
    write set; string literals do not type-check there. *)
let test_mvcc_conflict () =
  let mvcc = MVCC.create_mvcc () in
  let txn1 = MVCC.begin_txn mvcc in
  let txn2 = MVCC.begin_txn mvcc in
  MVCC.write mvcc txn1 "key1" (Bytes.of_string "value1");
  MVCC.write mvcc txn2 "key1" (Bytes.of_string "value2");
  let r1 = MVCC.commit mvcc txn1 in
  let r2 = MVCC.commit mvcc txn2 in
  check bool "一个成功" true (Result.is_ok r1 || Result.is_ok r2);
  check bool "一个失败" true (Result.is_error r1 || Result.is_error r2)

部署与打包

(* dune-project *)
(lang dune 3.0)
; query/parser.mly is written in Menhir syntax, so the project must enable
; dune's menhir extension and depend on the menhir package.
(using menhir 2.1)

(name minidb)
(version 0.1.0)
(authors "Your Name")
(license "MIT")

(package
 (name minidb)
 (synopsis "A small relational database")
 (description "MiniDB is an educational relational database implementation in OCaml.")
 (depends
  (ocaml (>= 4.14))
  (dune (>= 3.0))
  menhir
  (cmdliner (>= 1.1))
  (alcotest :with-test)))
# Build
dune build

# Run the tests
dune test

# Publish to opam
opam publish minidb.0.1.0 .

# Package: build the install layout, then archive it
dune build @install
tar czf minidb-0.1.0.tar.gz -C _build/install default

项目总结与展望

| 已实现 | 可扩展 |
| --- | --- |
| 页管理 | 索引类型(Hash/GIN/GiST) |
| B-Tree 索引 | 复杂查询(子查询、CTE) |
| 缓冲池 | 并行查询执行 |
| WAL | 分布式事务 |
| SQL 解析 | 存储过程 |
| 查询优化器 | 视图 |
| MVCC 事务 | 触发器 |
| 锁管理 | 外键约束 |
| CLI 接口 | 网络协议 |

💡 提示:这个项目涵盖了数据库系统的核心概念。进一步学习可以参考《Database Internals》和《Designing Data-Intensive Applications》。

扩展阅读


上一节编译器前端实践