OCaml 教程 / 完整项目:构建一个小型数据库
完整项目:构建一个小型数据库
本节将从零构建一个小型关系型数据库,涵盖存储引擎、查询解析、优化器、事务处理等核心组件。
项目架构设计
minidb/
├── lib/
│ ├── storage/ # 存储引擎
│ │ ├── page.ml # 页管理
│ │ ├── btree.ml # B-Tree 索引
│ │ ├── buffer.ml # 缓冲池
│ │ └── wal.ml # 预写日志
│ ├── query/ # 查询处理
│ │ ├── lexer.mll # 词法分析
│ │ ├── parser.mly # 语法分析
│ │ ├── ast.ml # 抽象语法树
│ │ ├── optimizer.ml # 查询优化器
│ │ └── executor.ml # 查询执行器
│ ├── transaction/ # 事务处理
│ │ ├── mvcc.ml # 多版本并发控制
│ │ └── lock.ml # 锁管理
│ └── common/ # 公共模块
│ ├── types.ml # 类型定义
│ ├── error.ml # 错误处理
│ └── config.ml # 配置
├── bin/ # 命令行接口
│ └── main.ml
├── test/ # 测试
│ ├── test_storage.ml
│ ├── test_query.ml
│ └── test_transaction.ml
├── dune-project
└── minidb.opam
| 层级 | 组件 | 职责 |
|---|---|---|
| 存储层 | Page/B-Tree/Buffer | 数据持久化与索引 |
| 查询层 | Parser/Optimizer/Executor | SQL 处理 |
| 事务层 | MVCC/Lock | 并发控制 |
| 接口层 | CLI/REPL | 用户交互 |
存储引擎
页管理
(* storage/page.ml *)
(** Size of every page in bytes (4 KiB). *)
let page_size = 4096

(** An in-memory page image.
    - [id]: page number.
    - [data]: the page contents; [page_size] bytes long when built by [create].
    - [dirty]: whether the contents have been modified since the page was
      created or last marked clean. *)
type page = {
  id: int;
  data: Bytes.t;
  mutable dirty: bool;
}

(** [create id] returns a clean, zero-filled page numbered [id].

    The buffer is allocated with [Bytes.make] rather than [Bytes.create]:
    [Bytes.create] leaves the contents unspecified, so a fresh page would
    otherwise expose arbitrary bytes to header reads and to the data file. *)
let create id = {
  id;
  data = Bytes.make page_size '\000';
  dirty = false;
}
(* 页内偏移读写 *)
(** [read_int page offset] decodes the little-endian signed 32-bit integer
    stored at byte [offset] of [page.data] and returns it as an [int].
    @raise Invalid_argument if the four bytes do not fit inside the buffer. *)
let read_int page offset =
  let raw = Bytes.get_int32_le page.data offset in
  Int32.to_int raw
(** [write_int page offset value] stores [value] as a little-endian 32-bit
    integer at byte [offset] of [page.data] and marks the page dirty.
    The value goes through [Int32.of_int], so only its low 32 bits are kept.
    @raise Invalid_argument if the four bytes do not fit inside the buffer. *)
let write_int page offset value =
  let encoded = Int32.of_int value in
  Bytes.set_int32_le page.data offset encoded;
  page.dirty <- true
(** [read_bytes page offset length] returns a fresh copy of the [length]
    bytes of [page.data] starting at [offset].
    @raise Invalid_argument if the range is not inside the buffer. *)
let read_bytes page offset length =
  let { data; _ } = page in
  Bytes.sub data offset length
(** [write_bytes page offset data] copies all of [data] into [page.data]
    starting at [offset], then marks the page dirty.
    @raise Invalid_argument if [data] does not fit at [offset]. *)
let write_bytes page offset data =
  let len = Bytes.length data in
  Bytes.blit data 0 page.data offset len;
  page.dirty <- true
(** [read_string page offset length] returns the [length] bytes of
    [page.data] starting at [offset] as a string.
    @raise Invalid_argument if the range is not inside the buffer. *)
let read_string page offset length =
  let { data; _ } = page in
  Bytes.sub_string data offset length
(** [write_string page offset s] copies every byte of [s] into [page.data]
    starting at [offset], then marks the page dirty.
    @raise Invalid_argument if [s] does not fit at [offset]. *)
let write_string page offset s =
  let len = String.length s in
  Bytes.blit_string s 0 page.data offset len;
  page.dirty <- true
(* Page header structure *)
(* Decoded form of the four integer fields kept at the start of a page.
   NOTE(review): the B-Tree insert/find code in this file matches page_type 1
   as a leaf and 2 as an internal node, which does not agree with the legend
   on [page_type] below - confirm which encoding is intended. *)
type page_header = {
page_type: int; (* 0 = data page, 1 = index page, 2 = overflow page *)
num_cells: int; (* number of cells *)
free_offset: int; (* start of the free area *)
cell_offset: int; (* start of the cell area *)
}
(** [read_header page] decodes the page header: four 32-bit integers read
    with [read_int] at byte offsets 0, 4, 8 and 12. *)
let read_header page =
  let field offset = read_int page offset in
  let page_type = field 0 in
  let num_cells = field 4 in
  let free_offset = field 8 in
  let cell_offset = field 12 in
  { page_type; num_cells; free_offset; cell_offset }
(** [write_header page header] encodes [header] at byte offsets 0, 4, 8 and
    12 of the page through [write_int], which also marks the page dirty. *)
let write_header page header =
  let { page_type; num_cells; free_offset; cell_offset } = header in
  List.iter
    (fun (offset, value) -> write_int page offset value)
    [ (0, page_type); (4, num_cells); (8, free_offset); (12, cell_offset) ]
💡 提示:固定页大小(4KB)是数据库设计的常见选择,与操作系统页大小对齐,减少碎片。
B-Tree 索引
(* storage/btree.ml *)
open Page
(* Logical shape of a B-Tree node.
   NOTE(review): none of the functions visible in this file build or match
   on this type; they work on raw pages instead. *)
type btree_node =
| Leaf of {
keys: string list;
values: bytes list;
next: int; (* leaf linked list; presumably the page id of the next leaf - confirm *)
}
| Internal of {
keys: string list;
children: int list;
}
(* Handle on a B-Tree stored in a file: the open descriptor, the page id of
   the current root, and the number of pages in use. *)
type btree = {
fd: Unix.file_descr;
mutable root_page: int;
mutable num_pages: int;
}
(** [create filename] creates (or truncates) [filename], writes an empty root
    page as page 0 and returns a tree handle whose root is that page.

    The descriptor is closed again if initialising the root page fails, so a
    failed creation does not leak an open file.
    @raise Unix.Unix_error if the file cannot be opened or written. *)
let create filename =
  let fd =
    Unix.openfile filename [ Unix.O_RDWR; Unix.O_CREAT; Unix.O_TRUNC ] 0o644
  in
  try
    (* This binding is not recursive, so [create] here is still the page
       constructor brought into scope by [open Page]. *)
    let root = create 0 in
    let header =
      { page_type = 1; num_cells = 0; free_offset = page_size; cell_offset = 16 }
    in
    write_header root header;
    (* Persist the root page at the start of the file. *)
    ignore (Unix.lseek fd 0 Unix.SEEK_SET : int);
    ignore (Unix.write fd root.data 0 page_size : int);
    { fd; root_page = 0; num_pages = 1 }
  with exn ->
    Unix.close fd;
    raise exn
(** [binary_search keys target] searches the sorted array [keys] by
    bisection using [String.compare].

    Returns the index of an element equal to [target] when the bisection
    hits one; otherwise the index at which [target] would have to be
    inserted to keep the array sorted ([Array.length keys] when every key
    is smaller). *)
let binary_search keys target =
  let lo = ref 0 in
  let hi = ref (Array.length keys) in
  let hit = ref (-1) in
  while !hit < 0 && !lo < !hi do
    let mid = !lo + (!hi - !lo) / 2 in
    match String.compare keys.(mid) target with
    | 0 -> hit := mid
    | c when c < 0 -> lo := mid + 1
    | _ -> hi := mid
  done;
  if !hit >= 0 then !hit else !lo
(* Insertion *)
(* [insert tree key value] loads the root page and dispatches on the
   [page_type] in its header: 1 goes to [insert_into_leaf], 2 goes through
   [split_if_needed] and adopts the page id it returns as the new root when
   it differs; any other value raises [Failure].
   NOTE(review): [read_page], [split_if_needed], [max_leaf_cells],
   [find_insert_position], [shift_cells_right], [write_cell],
   [allocate_page] and [update_parent] are not defined in the visible
   source, so their contracts are assumptions. Nothing visible here writes
   the modified page back to the file. *)
let rec insert tree key value =
let root = read_page tree.fd tree.root_page in
let header = read_header root in
match header.page_type with
| 1 -> (* root is a leaf *)
insert_into_leaf tree root key value
| 2 -> (* root is an internal node *)
let new_root = split_if_needed tree root key value in
if new_root <> tree.root_page then
tree.root_page <- new_root
| _ -> failwith "未知页类型"
(* [insert_into_leaf tree page key value] adds the cell in place while the
   page holds fewer than [max_leaf_cells] cells, otherwise delegates to
   [split_leaf]. *)
and insert_into_leaf tree page key value =
let header = read_header page in
if header.num_cells < max_leaf_cells then begin
(* room left: insert directly *)
let idx = find_insert_position page key in
shift_cells_right page idx;
write_cell page idx key value;
write_header page { header with num_cells = header.num_cells + 1 }
end else begin
(* page is full: split *)
split_leaf tree page key value
end
(* [split_leaf tree page key value] allocates a sibling page and reports it
   to the parent. The step that moves cells is elided in the source, so
   [mid], [new_page], [key] and [value] are not used yet. *)
and split_leaf tree page key value =
let mid = max_leaf_cells / 2 in
let new_page_id = allocate_page tree in
let new_page = create new_page_id in
(* move half of the cells to the new page *)
(* ... *)
(* update the parent node *)
update_parent tree page.id new_page_id
(** [find tree key] looks [key] up starting from the root page and returns
    the stored value, or [None] when the key is absent or a page of unknown
    type is met.

    The functions are declared with [let rec ... and] so they can call each
    other, and the descent goes through [find_in_page] so that an internal
    node continues in the child page instead of restarting from the root. *)
let rec find tree key =
  find_in_page tree (read_page tree.fd tree.root_page) key

(* Dispatch on the page type stored in the header of [page]. *)
and find_in_page tree page key =
  match (read_header page).page_type with
  | 1 -> find_in_leaf page key
  | 2 -> find_in_internal tree page key
  | _ -> None

(** [find_in_leaf page key] searches the keys of a leaf page. *)
and find_in_leaf page key =
  let header = read_header page in
  (* Convert once: [binary_search] and the indexing below need an array. *)
  let keys = Array.of_list (read_keys page header.num_cells) in
  let idx = binary_search keys key in
  if idx < Array.length keys && String.equal keys.(idx) key then
    Some (read_value page idx)
  else None

(** [find_in_internal tree page key] picks the child to descend into and
    continues the search there.
    NOTE(review): when [key] equals a separator the child at the separator's
    own index is chosen - confirm this matches the convention used when
    nodes are split. *)
and find_in_internal tree page key =
  let header = read_header page in
  let keys = Array.of_list (read_keys page header.num_cells) in
  let children = read_children page (header.num_cells + 1) in
  let idx = binary_search keys key in
  let child_page_id = List.nth children idx in
  find_in_page tree (read_page tree.fd child_page_id) key
| B-Tree 操作 | 时间复杂度 | 说明 |
|---|---|---|
| 查找 | O(log n) | 自根向叶逐层下降,节点内二分搜索 |
| 插入 | O(log n) | 可能触发分裂 |
| 删除 | O(log n) | 可能触发合并 |
| 范围扫描 | O(log n + k) | 叶子链表 |
⚠️ 注意:B-Tree 分裂和合并是复杂的操作,需要仔细处理各种边界情况(空树、单节点、满节点等)。
缓冲池
(* storage/buffer.ml *)
(* In-memory cache of pages backed by the file [fd]. *)
type buffer_pool = {
pool_size: int; (* eviction starts once this many pages are cached *)
pages: (int, page) Hashtbl.t; (* cached pages, keyed by page id *)
mutable lru: int list; (* page ids, most recently used first *)
fd: Unix.file_descr;
}
(** [create_pool pool_size fd] returns an empty pool over the file [fd]
    whose table is pre-sized for [pool_size] pages. *)
let create_pool pool_size fd =
  let pages = Hashtbl.create pool_size in
  { pool_size; pages; lru = []; fd }
(** [get_page pool page_id] returns the cached page [page_id], loading it
    with [read_from_disk] on a miss. The page becomes the most recently used
    one. When the pool is full, the least recently used page is evicted
    first and flushed with [flush_page] if it is dirty.

    The victim is taken from the reversed LRU list ([Stdlib.List] has no
    [last]), and an empty LRU list simply skips eviction instead of
    raising. *)
let get_page pool page_id =
  match Hashtbl.find_opt pool.pages page_id with
  | Some page ->
    (* Hit: move the id to the front of the LRU list. *)
    pool.lru <- page_id :: List.filter (fun id -> id <> page_id) pool.lru;
    page
  | None ->
    if Hashtbl.length pool.pages >= pool.pool_size then begin
      match List.rev pool.lru with
      | [] -> ()
      | victim_id :: newer_rev ->
        (match Hashtbl.find_opt pool.pages victim_id with
         | Some victim ->
           if victim.dirty then flush_page pool.fd victim;
           Hashtbl.remove pool.pages victim_id
         | None -> ());
        pool.lru <- List.rev newer_rev
    end;
    (* Miss: load the page and register it as most recently used. *)
    let page = read_from_disk pool.fd page_id in
    Hashtbl.replace pool.pages page_id page;
    pool.lru <- page_id :: pool.lru;
    page
(** [flush_all pool] passes every dirty cached page to [flush_page] and
    clears its dirty flag; clean pages are left alone. *)
let flush_all pool =
  let flush_if_dirty page =
    if page.dirty then begin
      flush_page pool.fd page;
      page.dirty <- false
    end
  in
  Hashtbl.iter (fun _id page -> flush_if_dirty page) pool.pages
(** [prefetch pool page_ids] loads, in list order, every listed page that is
    not cached at the moment it is reached. Loading goes through [get_page],
    so it can evict other pages. *)
let prefetch pool page_ids =
  let load_if_absent id =
    if Hashtbl.mem pool.pages id then () else ignore (get_page pool id)
  in
  List.iter load_if_absent page_ids
💡 提示:缓冲池大小通常设为可用内存的 50-80%。太小会导致频繁磁盘 IO,太大会影响操作系统缓存。
预写日志(WAL)
(* storage/wal.ml *)
(* One logged modification of a page region. *)
type wal_entry = {
lsn: int; (* log sequence number *)
txn_id: int; (* transaction id *)
page_id: int; (* page id *)
offset: int; (* offset inside the page *)
before: bytes; (* data before the modification *)
after: bytes; (* data after the modification *)
}
(** State of the write-ahead log.
    - [fd]: descriptor of the log file.
    - [next_lsn]: sequence number given to the next logged entry.
    - [buffer]: entries not yet written to the file, newest first. It holds
      [wal_entry] values (not raw bytes) because entries are only serialised
      when the buffer is flushed.
    - [buffer_size]: number of buffered entries that triggers a flush. *)
type wal = {
  fd: Unix.file_descr;
  mutable next_lsn: int;
  mutable buffer: wal_entry list;
  buffer_size: int;
}
(** [create_wal filename] opens [filename] for appending (creating it if
    needed) and returns a log with an empty buffer, a flush threshold of
    1024 entries and sequence numbers starting at 0.
    NOTE(review): numbering restarts at 0 even when the file already holds
    entries - confirm that is intended.
    @raise Unix.Unix_error if the file cannot be opened. *)
let create_wal filename =
  let flags = [ Unix.O_RDWR; Unix.O_CREAT; Unix.O_APPEND ] in
  let fd = Unix.openfile filename flags 0o644 in
  { fd; next_lsn = 0; buffer = []; buffer_size = 1024 }
(** [flush_wal wal] writes the buffered entries to the log file in the order
    they were logged, empties the buffer and calls [Unix.fsync] so the
    entries are durable when it returns.

    Defined before [write_log], which calls it. Writes go to [wal.fd].
    @raise Unix.Unix_error if writing or syncing fails. *)
let flush_wal wal =
  (* The buffer is newest-first; restore logging order. *)
  let entries = List.rev wal.buffer in
  wal.buffer <- [];
  List.iter
    (fun entry ->
      let data = serialize_entry entry in
      ignore (Unix.write_substring wal.fd data 0 (String.length data) : int))
    entries;
  Unix.fsync wal.fd

(** [write_log wal entry] stamps [entry] with the next sequence number,
    buffers it, and flushes the buffer once it holds [wal.buffer_size]
    entries. The [lsn] carried by the argument is ignored. *)
let write_log wal entry =
  let entry = { entry with lsn = wal.next_lsn } in
  wal.next_lsn <- wal.next_lsn + 1;
  wal.buffer <- entry :: wal.buffer;
  if List.length wal.buffer >= wal.buffer_size then flush_wal wal
(** [recover wal] reads every entry from the log file with
    [read_all_entries] and passes each one, in order, to [apply_entry].
    No filtering by transaction outcome happens here. *)
let recover wal =
  wal.fd |> read_all_entries |> List.iter (fun entry -> apply_entry entry)
(** [checkpoint wal buffer_pool] makes the data file current and then
    discards the log.

    The log is flushed before the dirty pages: write-ahead logging requires
    the log records describing a change to be durable before the changed
    page can reach the data file, otherwise a crash between the two steps
    leaves page changes that recovery knows nothing about.
    NOTE(review): the log is truncated right after the pages are flushed;
    whether [Buffer.flush_all] also syncs the data file is not visible
    here - confirm before relying on it for durability. *)
let checkpoint wal buffer_pool =
  (* 1. Make every buffered log entry durable. *)
  flush_wal wal;
  (* 2. Write all dirty pages. *)
  Buffer.flush_all buffer_pool;
  (* 3. Drop the log entries covered by this checkpoint. *)
  Unix.ftruncate wal.fd 0;
  wal.next_lsn <- 0
| WAL 操作 | 说明 | 何时执行 |
|---|---|---|
| write_log | 记录修改 | 每次写操作 |
| flush_wal | 持久化日志 | 事务提交时 |
| checkpoint | 截断日志 | 定期/日志过大 |
| recover | 恢复数据 | 数据库启动时 |
查询解析器
词法分析
(* query/lexer.mll *)
{
open Parser
}

(* Character classes *)
let digit = ['0'-'9']
let alpha = ['a'-'z' 'A'-'Z']
let ident = alpha (alpha | digit | '_')*
let whitespace = [' ' '\t' '\n' '\r']
(* Single-quoted string; there is no escape for an embedded quote. *)
let string_literal = '\'' [^ '\'']* '\''
(* Integer or decimal literal such as 42 or 3.14; NUM carries a float. *)
let number = digit+ ('.' digit+)?

(* [token lexbuf] returns the next SQL token. Whitespace and "--" line
   comments are skipped; keywords are matched case-insensitively; any other
   identifier becomes IDENT with its original spelling. Raises [Failure] on
   a character that starts no token. *)
rule token = parse
| whitespace+ { token lexbuf }
| "--" [^ '\n']* { token lexbuf }
| number as num { NUM (float_of_string num) }
| string_literal as s { STRING (String.sub s 1 (String.length s - 2)) }
| ident as id {
    match String.uppercase_ascii id with
    | "SELECT" -> SELECT
    | "FROM" -> FROM
    | "WHERE" -> WHERE
    | "AND" -> AND
    | "OR" -> OR
    | "NOT" -> NOT
    | "INSERT" -> INSERT
    | "INTO" -> INTO
    | "VALUES" -> VALUES
    | "UPDATE" -> UPDATE
    | "SET" -> SET
    | "DELETE" -> DELETE
    | "CREATE" -> CREATE
    | "TABLE" -> TABLE
    | "INDEX" -> INDEX
    | "BEGIN" -> BEGIN
    | "COMMIT" -> COMMIT
    | "ROLLBACK" -> ROLLBACK
    | "TRUE" -> BOOL true
    | "FALSE" -> BOOL false
    | "NULL" -> NULL
    | "JOIN" -> JOIN
    | "ON" -> ON
    | "GROUP" -> GROUP
    | "BY" -> BY
    (* HAVING, PRIMARY and KEY are used by the grammar's having_clause and
       constraint_ rules, so they must be keywords rather than IDENT. *)
    | "HAVING" -> HAVING
    | "ORDER" -> ORDER
    | "ASC" -> ASC
    | "DESC" -> DESC
    | "LIMIT" -> LIMIT
    | "OFFSET" -> OFFSET
    | "AS" -> AS
    | "PRIMARY" -> PRIMARY
    | "KEY" -> KEY
    | _ -> IDENT id
  }
| '+' { PLUS }
| '-' { MINUS }
| '*' { STAR }
| '/' { SLASH }
| '=' { EQ }
| '<' { LT }
| '>' { GT }
| "<=" { LE }
| ">=" { GE }
| "!=" | "<>" { NEQ }
| '(' { LPAREN }
| ')' { RPAREN }
| ',' { COMMA }
| ';' { SEMICOLON }
| '.' { DOT }
| eof { EOF }
| _ as c { failwith (Printf.sprintf "意外字符: %c" c) }
AST 定义
(* query/ast.ml *)
(* Reference to a column, optionally qualified by a table and renamed. *)
type column = {
name: string;
table: string option;
alias: string option;
}
(* Literal values. Every number is carried as a float. *)
type literal =
| LFloat of float
| LString of string
| LBool of bool
| LNull
type binary_op = Add | Sub | Mul | Div | Eq | Neq | Lt | Gt | Le | Ge | And | Or
type unary_op = Not | Neg
(* Scalar expressions. *)
type expr =
| EColumn of column
| ELiteral of literal
| EBinaryOp of binary_op * expr * expr
| EUnaryOp of unary_op * expr
| EFunc of string * expr list
| EAgg of string * expr (* COUNT, SUM, AVG, etc. *)
(* One entry of a SELECT list. *)
type select_item =
| Star
| Expr of expr * string option (* expression AS alias *)
type join_type = Inner | Left | Right | Full
(* A source in FROM: a named table or a join of two sources. *)
type table_ref =
| TableName of string * string option (* name AS alias *)
| Join of table_ref * table_ref * join_type * expr option
(* One ORDER BY key with its direction. *)
type order_item = {
expr: expr;
direction: [`Asc | `Desc];
}
(* Top-level statements. *)
type statement =
| Select of {
columns: select_item list;
from: table_ref list;
where: expr option;
group_by: expr list;
having: expr option;
order_by: order_item list;
limit: int option;
offset: int option;
}
| Insert of {
table: string;
columns: string list;
values: expr list list; (* one inner list per inserted row *)
}
| Update of {
table: string;
set: (string * expr) list; (* (column, new value) pairs *)
where: expr option;
}
| Delete of {
table: string;
where: expr option;
}
| CreateTable of {
name: string;
columns: column_def list;
}
| CreateIndex of {
name: string;
table: string;
columns: string list;
}
| Begin
| Commit
| Rollback
(* Column declaration inside CREATE TABLE. *)
and column_def = {
col_name: string;
col_type: string;
not_null: bool;
primary_key: bool;
}
💡 提示:AST 设计要尽量贴近 SQL 语义,这样后续的优化和执行都会更自然。
语法分析
(* query/parser.mly *)
%{
open Ast
%}

(* Tokens carrying a value *)
%token <float> NUM
%token <string> STRING
%token <string> IDENT
%token <bool> BOOL
(* Keywords. HAVING, PRIMARY and KEY are declared because the having_clause
   and constraint_ rules use them. *)
%token NULL
%token SELECT FROM WHERE AND OR NOT
%token INSERT INTO VALUES UPDATE SET DELETE
%token CREATE TABLE INDEX BEGIN COMMIT ROLLBACK
%token JOIN ON GROUP BY HAVING ORDER ASC DESC LIMIT OFFSET AS
%token PRIMARY KEY
(* Operators and punctuation *)
%token PLUS MINUS STAR SLASH
%token EQ LT GT LE GE NEQ
%token LPAREN RPAREN COMMA SEMICOLON DOT
%token EOF

(* Precedence, lowest first *)
%left OR
%left AND
%right NOT
%left EQ NEQ
%left LT GT LE GE
%left PLUS MINUS
%left STAR SLASH

(* The start type is module-qualified: it is copied into the generated
   interface, where the header's [open Ast] is not in effect. *)
%start <Ast.statement> statement
%%
(* Entry point: exactly one statement, an optional ';', then end of input. *)
statement:
| s = select_stmt; SEMICOLON?; EOF { s }
| s = insert_stmt; SEMICOLON?; EOF { s }
| s = update_stmt; SEMICOLON?; EOF { s }
| s = delete_stmt; SEMICOLON?; EOF { s }
| s = create_stmt; SEMICOLON?; EOF { s }
| BEGIN; SEMICOLON?; EOF { Begin }
| COMMIT; SEMICOLON?; EOF { Commit }
| ROLLBACK; SEMICOLON?; EOF { Rollback }
(* SELECT with its optional clauses in fixed order; a missing GROUP BY or
   ORDER BY becomes the empty list. *)
select_stmt:
| SELECT; columns = select_list;
FROM; from = table_list;
where = where_clause?;
group = group_clause?;
having = having_clause?;
order = order_clause?;
limit = limit_clause?;
offset = offset_clause?
{ Select { columns; from; where; group_by = Option.value group ~default:[];
having; order_by = Option.value order ~default:[];
limit; offset } }
(* Either a lone '*' or a comma-separated list of items. *)
select_list:
| STAR { [Star] }
| items = separated_list(COMMA, select_item) { items }
select_item:
| e = expr; AS; alias = IDENT { Expr (e, Some alias) }
| e = expr { Expr (e, None) }
table_list:
| tables = separated_list(COMMA, table_ref) { tables }
(* A table with an optional alias, or a JOIN ... ON, which always builds an
   Inner join. *)
table_ref:
| name = IDENT; AS; alias = IDENT { TableName (name, Some alias) }
| name = IDENT { TableName (name, None) }
| t1 = table_ref; JOIN; t2 = table_ref; ON; cond = expr
{ Join (t1, t2, Inner, Some cond) }
where_clause:
| WHERE; e = expr { e }
group_clause:
| GROUP; BY; exprs = separated_list(COMMA, expr) { exprs }
having_clause:
| HAVING; e = expr { e }
order_clause:
| ORDER; BY; items = separated_list(COMMA, order_item) { items }
(* The direction defaults to ascending when omitted. *)
order_item:
| e = expr; ASC { { expr = e; direction = `Asc } }
| e = expr; DESC { { expr = e; direction = `Desc } }
| e = expr { { expr = e; direction = `Asc } }
(* NUM is a float; int_of_float drops any fractional part. *)
limit_clause:
| LIMIT; n = NUM { int_of_float n }
offset_clause:
| OFFSET; n = NUM { int_of_float n }
(* Expressions. Binary operators are resolved through the %left / %right
   declarations; [binop] is inlined so each operator token stays visible to
   the precedence mechanism. *)
expr:
| e = simple_expr { e }
| left = expr; op = binop; right = expr { EBinaryOp (op, left, right) }
| NOT; e = expr { EUnaryOp (Not, e) }

(* Atoms. Unary minus lives here, applied to an atom, so that it binds
   tighter than every binary operator: "-a + b" is (-a) + b. *)
simple_expr:
| n = NUM { ELiteral (LFloat n) }
| s = STRING { ELiteral (LString s) }
| b = BOOL { ELiteral (LBool b) }
| NULL { ELiteral LNull }
| MINUS; e = simple_expr { EUnaryOp (Neg, e) }
| id = IDENT { EColumn { name = id; table = None; alias = None } }
(* Qualified column "table.col": the table name is the IDENT before the
   dot; DOT itself carries no value. *)
| table = IDENT; DOT; col = IDENT
  { EColumn { name = col; table = Some table; alias = None } }
| func = IDENT; LPAREN; args = separated_list(COMMA, expr); RPAREN
  { EFunc (func, args) }
| LPAREN; e = expr; RPAREN { e }

%inline binop:
| EQ { Eq } | NEQ { Neq }
| LT { Lt } | GT { Gt }
| LE { Le } | GE { Ge }
| PLUS { Add } | MINUS { Sub }
| STAR { Mul } | SLASH { Div }
| AND { And } | OR { Or }
(* INSERT INTO t (c1, ...) VALUES (...), (...): one value_list per row. *)
insert_stmt:
| INSERT; INTO; table = IDENT;
LPAREN; columns = separated_list(COMMA, IDENT); RPAREN;
VALUES; values = separated_list(COMMA, value_list)
{ Insert { table; columns; values } }
value_list:
| LPAREN; values = separated_list(COMMA, expr); RPAREN { values }
(* UPDATE t SET c = e, ... [WHERE ...] *)
update_stmt:
| UPDATE; table = IDENT;
SET; assignments = separated_list(COMMA, assignment);
where = where_clause?
{ Update { table; set = assignments; where } }
assignment:
| col = IDENT; EQ; e = expr { (col, e) }
(* DELETE FROM t [WHERE ...] *)
delete_stmt:
| DELETE; FROM; table = IDENT;
where = where_clause?
{ Delete { table; where } }
(* CREATE TABLE name (column definitions) or CREATE INDEX name ON table (columns) *)
create_stmt:
| CREATE; TABLE; name = IDENT;
LPAREN; columns = separated_list(COMMA, column_def); RPAREN
{ CreateTable { name; columns } }
| CREATE; INDEX; name = IDENT;
ON; table = IDENT;
LPAREN; columns = separated_list(COMMA, IDENT); RPAREN
{ CreateIndex { name; table; columns } }
(* Column name, type name, then any number of constraints; the two flags
   record whether the matching constraint appeared. *)
column_def:
| name = IDENT; typ = IDENT; constraints = list(constraint_)
{ { col_name = name; col_type = typ;
not_null = List.mem `NotNull constraints;
primary_key = List.mem `PrimaryKey constraints } }
constraint_:
| NOT; NULL { `NotNull }
| PRIMARY; KEY { `PrimaryKey }
查询优化器
(* query/optimizer.ml *)
open Ast
(** [push_down_predicates stmt] moves the parts of a SELECT's WHERE
    predicate that can be evaluated closer to the tables into the FROM
    sources, and keeps the remainder as the statement's WHERE. Statements
    other than SELECT, and SELECTs without WHERE, are returned unchanged.

    The AST is immutable, so the sources returned by
    [apply_pushed_predicates] must replace [from]; discarding that result
    would silently drop the pushed predicates from the query.
    NOTE(review): assumes [partition_predicates pred from] returns
    [(pushed, remaining)] with [remaining : expr option], and that
    [apply_pushed_predicates from pushed] returns the rewritten source
    list - neither helper is visible here. *)
let push_down_predicates stmt =
  match stmt with
  | Select ({ from; where = Some pred; _ } as select) ->
    let pushed, remaining = partition_predicates pred from in
    let from = apply_pushed_predicates from pushed in
    Select { select with from; where = remaining }
  | Select { where = None; _ }
  | Insert _ | Update _ | Delete _
  | CreateTable _ | CreateIndex _
  | Begin | Commit | Rollback -> stmt
(** [fold_constants e] evaluates the constant sub-expressions of [e].

    Folding is bottom-up: operands are folded first and the operator is then
    applied to the folded operands, so [(1 + 2) + 3] becomes [6]. Operands
    of unary operators, function arguments and aggregate arguments are
    folded as well.

    Left untouched on purpose:
    - division by a literal zero, so the error surfaces at execution time;
    - [=] when either side is the NULL literal, because SQL comparison with
      NULL does not yield TRUE and the decision is left to the executor. *)
let rec fold_constants expr =
  (* Apply [op] to two operands that have already been folded. *)
  let fold_binary op left right =
    match op, left, right with
    | Add, ELiteral (LFloat a), ELiteral (LFloat b) -> ELiteral (LFloat (a +. b))
    | Sub, ELiteral (LFloat a), ELiteral (LFloat b) -> ELiteral (LFloat (a -. b))
    | Mul, ELiteral (LFloat a), ELiteral (LFloat b) -> ELiteral (LFloat (a *. b))
    | Div, ELiteral (LFloat a), ELiteral (LFloat b) when b <> 0.0 ->
      ELiteral (LFloat (a /. b))
    | Eq, ELiteral LNull, _ | Eq, _, ELiteral LNull -> EBinaryOp (op, left, right)
    | Eq, ELiteral a, ELiteral b -> ELiteral (LBool (a = b))
    | And, ELiteral (LBool a), ELiteral (LBool b) -> ELiteral (LBool (a && b))
    | Or, ELiteral (LBool a), ELiteral (LBool b) -> ELiteral (LBool (a || b))
    | _ -> EBinaryOp (op, left, right)
  in
  match expr with
  | EBinaryOp (op, left, right) ->
    fold_binary op (fold_constants left) (fold_constants right)
  | EUnaryOp (op, operand) ->
    (match op, fold_constants operand with
     | Not, ELiteral (LBool b) -> ELiteral (LBool (not b))
     | Neg, ELiteral (LFloat n) -> ELiteral (LFloat (-.n))
     | _, folded -> EUnaryOp (op, folded))
  | EFunc (name, args) -> EFunc (name, List.map fold_constants args)
  | EAgg (name, arg) -> EAgg (name, fold_constants arg)
  | EColumn _ | ELiteral _ -> expr
(* Choosing the best execution plan *)
(* Physical plans the optimizer can pick from.
   NOTE(review): the payloads are presumably FullScan table;
   IndexScan (table, index, predicate); NestedLoopJoin (left, right, condition);
   HashJoin (left, right, left key, right key) - the join payload meanings
   are not established by the visible code. *)
type execution_plan =
| FullScan of string
| IndexScan of string * string * expr
| NestedLoopJoin of execution_plan * execution_plan * expr
| HashJoin of execution_plan * execution_plan * string * string
(** [estimate_cost plan] returns an integer cost estimate for [plan]:
    - full scan: the table's row count;
    - index scan: one tenth of the row count (integer division);
    - nested-loop join: product of the two input costs;
    - hash join: sum of the two input costs.

    Declared [rec] because the join cases cost their inputs recursively. *)
let rec estimate_cost = function
  | FullScan table ->
    let stats = get_table_stats table in
    stats.num_rows
  | IndexScan (table, _index, _pred) ->
    let stats = get_table_stats table in
    stats.num_rows / 10
  | NestedLoopJoin (left, right, _) -> estimate_cost left * estimate_cost right
  | HashJoin (left, right, _, _) -> estimate_cost left + estimate_cost right
(** [choose_best_plan table where] compares a full scan of [table] with an
    index scan when [find_usable_index] reports a usable index, and returns
    the index scan only if its estimated cost is strictly lower. Without a
    usable index the result is a full scan. *)
let choose_best_plan table where =
  let full_scan = FullScan table in
  let alternative =
    match find_usable_index table where with
    | Some (index, pred) -> IndexScan (table, index, pred)
    | None -> FullScan table
  in
  if estimate_cost alternative < estimate_cost full_scan then alternative
  else full_scan
💡 提示:查询优化器的核心是估算代价。收集准确的统计信息(行数、列基数、直方图)对优化至关重要。
事务处理
(* transaction/mvcc.ml *)
(** Transaction identifier. *)
type txn_id = int

(** One version in a key's version chain.
    - [data]: the value stored by this version.
    - [created_by]: transaction that installed the version.
    - [deleted_by]: transaction that superseded it, if any. The field is
      mutable because it is filled in later, when a newer version of the
      same key is installed.
    - [next]: the next version in the chain. *)
type version = {
  data: bytes;
  created_by: txn_id;
  mutable deleted_by: txn_id option;
  mutable next: version option;
}
(* Transaction status *)
type txn_status = Active | Committed | Aborted
(* Per-transaction state. *)
type transaction = {
id: txn_id;
snapshot: txn_id list; (* list of active transactions *)
mutable status: txn_status;
mutable write_set: (string * bytes) list; (* write set: buffered (key, value) pairs *)
}
(* Global transaction manager *)
type mvcc = {
mutable next_txn_id: txn_id; (* id handed to the next transaction *)
mutable active_txns: (txn_id, transaction) Hashtbl.t;
mutable committed_txns: txn_id list;
}
(** [create_mvcc ()] returns a manager with no active or committed
    transactions; the first transaction id handed out will be 1. *)
let create_mvcc () =
  let active_txns = Hashtbl.create 16 in
  { next_txn_id = 1; active_txns; committed_txns = [] }
(** [begin_txn mvcc] starts a transaction: it takes the next id, records the
    ids of the transactions registered as active at that moment as its
    snapshot, registers the new transaction as active and returns it with
    an empty write set. *)
let begin_txn mvcc =
  let id = mvcc.next_txn_id in
  mvcc.next_txn_id <- id + 1;
  let collect_id tid _txn ids = tid :: ids in
  let snapshot = Hashtbl.fold collect_id mvcc.active_txns [] in
  let txn = { id; snapshot; status = Active; write_set = [] } in
  Hashtbl.add mvcc.active_txns id txn;
  txn
(** [is_visible txn version] tells whether [version] belongs to the snapshot
    read by [txn].

    The effects of a transaction [t] are visible to [txn] when [t] is [txn]
    itself, or when [t] began before [txn] (smaller id) and was no longer
    active when [txn] began (not in [txn.snapshot]). A version is visible
    when its creator's effects are visible and it has not been superseded
    by a transaction whose effects are visible too.

    This excludes versions installed by transactions that began after
    [txn], and hides versions superseded by a transaction that had already
    finished when [txn] began. *)
let is_visible txn version =
  let effects_visible other =
    other = txn.id
    || (other < txn.id && not (List.mem other txn.snapshot))
  in
  effects_visible version.created_by
  && (match version.deleted_by with
      | None -> true
      | Some deleter -> not (effects_visible deleter))
(** [read mvcc txn key] returns the value of [key] as seen by [txn], or
    [None] when the key does not exist for it.

    Writes buffered by [txn] itself take precedence: the write set is
    newest-first, so the first binding for [key] is the transaction's latest
    write. Otherwise the first version returned by [find_versions] that is
    visible to [txn] is used.
    NOTE(review): assumes [find_versions] lists versions newest first - it
    is not visible here. *)
let read mvcc txn key =
  match List.assoc_opt key txn.write_set with
  | Some pending -> Some pending
  | None ->
    find_versions key
    |> List.find_opt (is_visible txn)
    |> Option.map (fun v -> v.data)
(** [write mvcc txn key value] buffers the write at the head of the
    transaction's write set; nothing is made visible to other transactions
    here. *)
let write mvcc txn key value =
  let pending = (key, value) in
  txn.write_set <- pending :: txn.write_set
(** [commit mvcc txn] tries to commit [txn].

    If another active transaction has buffered a write to one of the keys
    written by [txn], [txn] is aborted and [Error] is returned. An aborted
    transaction is cleaned up like a rollback: its write set is dropped and
    it is removed from the active table, so it does not linger in the
    snapshots of later transactions.

    Otherwise the buffered writes are installed as new versions. The write
    set is newest-first, so it is reversed and applied oldest-first: when a
    key was written several times, the last write ends up as the newest
    version. The transaction is then marked committed and [Ok ()] is
    returned. *)
let commit mvcc txn =
  (* Does another active transaction have [key] in its write set? *)
  let written_by_other key =
    Hashtbl.fold
      (fun _ other acc ->
        acc
        || (other.id <> txn.id
            && other.status = Active
            && List.exists (fun (k, _) -> k = key) other.write_set))
      mvcc.active_txns false
  in
  let has_conflict =
    List.exists (fun (key, _) -> written_by_other key) txn.write_set
  in
  if has_conflict then begin
    txn.status <- Aborted;
    txn.write_set <- [];
    Hashtbl.remove mvcc.active_txns txn.id;
    Error "写-写冲突,事务中止"
  end else begin
    List.iter
      (fun (key, value) ->
        let old_version = find_latest_version key in
        let new_version =
          { data = value; created_by = txn.id; deleted_by = None; next = old_version }
        in
        (* The previous version is superseded by this transaction. *)
        (match old_version with
         | Some v -> v.deleted_by <- Some txn.id
         | None -> ());
        add_version key new_version)
      (List.rev txn.write_set);
    txn.status <- Committed;
    Hashtbl.remove mvcc.active_txns txn.id;
    mvcc.committed_txns <- txn.id :: mvcc.committed_txns;
    Ok ()
  end
(** [rollback mvcc txn] abandons [txn]: it is removed from the active
    table, its buffered writes are dropped and its status becomes
    [Aborted]. *)
let rollback mvcc txn =
  Hashtbl.remove mvcc.active_txns txn.id;
  txn.write_set <- [];
  txn.status <- Aborted
| MVCC 操作 | 说明 |
|---|---|
| begin_txn | 开始事务,获取快照 |
| read | 读取对事务可见的版本 |
| write | 缓冲写入 |
| commit | 检查冲突并提交 |
| rollback | 回滚事务 |
并发控制
(* transaction/lock.ml *)
(* Lock modes. *)
type lock_type = Shared | Exclusive
(* Lock state of one key. *)
type lock_entry = {
mutable holders: (int * lock_type) list; (* (txn_id, lock_type) *)
mutable waiters: (int * lock_type * Condition.t) list; (* blocked requests with the condition each one waits on *)
}
(* Table of per-key lock entries; [acquire_lock] and [release_lock] take
   [mutex] before touching it. *)
type lock_manager = {
locks: (string, lock_entry) Hashtbl.t;
mutex: Mutex.t;
}
(** [create_lock_manager ()] returns a manager with no lock entries and a
    fresh mutex. *)
let create_lock_manager () =
  let locks = Hashtbl.create 64 in
  let mutex = Mutex.create () in
  { locks; mutex }
(** [acquire_lock lm txn_id key lock_type] blocks until transaction [txn_id]
    holds a [lock_type] lock on [key], then returns [true].

    A shared lock is granted when no other transaction holds an exclusive
    lock on the key; an exclusive lock when no other transaction holds any
    lock on it. Locks already held by [txn_id] itself never block it, so a
    transaction cannot deadlock against its own lock.

    The grant condition is re-checked in a loop after every wake-up and the
    caller is recorded as a holder only once the condition is true: being
    signalled means "something changed", not "the lock is yours".
    There is no deadlock detection or timeout. *)
let acquire_lock lm txn_id key lock_type =
  Mutex.lock lm.mutex;
  let entry =
    match Hashtbl.find_opt lm.locks key with
    | Some e -> e
    | None ->
      let e = { holders = []; waiters = [] } in
      Hashtbl.add lm.locks key e;
      e
  in
  let held_by_others () =
    List.filter (fun (tid, _) -> tid <> txn_id) entry.holders
  in
  let can_acquire () =
    match lock_type with
    | Shared ->
      not (List.exists (fun (_, lt) -> lt = Exclusive) (held_by_others ()))
    | Exclusive -> held_by_others () = []
  in
  let cond = Condition.create () in
  while not (can_acquire ()) do
    (* Register again on every iteration: a release clears the waiter list.
       Dropping a stale registration first keeps the list free of duplicates
       after a spurious wake-up. *)
    entry.waiters <-
      (txn_id, lock_type, cond)
      :: List.filter (fun (_, _, c) -> c != cond) entry.waiters;
    Condition.wait cond lm.mutex
  done;
  entry.holders <- (txn_id, lock_type) :: entry.holders;
  Mutex.unlock lm.mutex;
  true
(** [release_lock lm txn_id key] drops every lock [txn_id] holds on [key],
    signals all requests waiting on that key and clears the waiter list.
    Does nothing when the key has no lock entry. *)
let release_lock lm txn_id key =
  let release entry =
    let still_held (tid, _) = tid <> txn_id in
    entry.holders <- List.filter still_held entry.holders;
    (* Wake every waiter. *)
    List.iter (fun (_, _, cond) -> Condition.signal cond) entry.waiters;
    entry.waiters <- []
  in
  Mutex.lock lm.mutex;
  Option.iter release (Hashtbl.find_opt lm.locks key);
  Mutex.unlock lm.mutex
(** [release_all_locks lm txn_id] releases every lock held by [txn_id].

    The keys are collected while holding the manager's mutex, because the
    table can be extended concurrently by [acquire_lock]; the mutex is
    released before calling [release_lock], which takes it itself. *)
let release_all_locks lm txn_id =
  Mutex.lock lm.mutex;
  let held_keys =
    Hashtbl.fold
      (fun key entry acc ->
        if List.exists (fun (tid, _) -> tid = txn_id) entry.holders then
          key :: acc
        else acc)
      lm.locks []
  in
  Mutex.unlock lm.mutex;
  List.iter (fun key -> release_lock lm txn_id key) held_keys
⚠️ 注意:锁管理必须处理死锁。简单的策略是为锁等待设置超时;更完善的策略是构建等待图(wait-for graph)并检测其中的环。
命令行接口
(* bin/main.ml *)
open Cmdliner
open Minidb
(** [repl db] runs the interactive prompt: it prints a banner, then reads
    one line at a time and executes it with [execute_sql], printing the
    result or the error. The loop ends on the line "quit" (after trimming)
    or on [End_of_file]; any other exception is printed and the loop
    continues. A farewell line is printed on exit. *)
let repl db =
  Printf.printf "MiniDB v0.1\n";
  Printf.printf "输入 SQL 语句或 'quit' 退出\n\n";
  let rec loop () =
    Printf.printf "minidb> ";
    flush stdout;
    let keep_going =
      try
        let line = input_line stdin in
        if String.trim line = "quit" then false
        else begin
          (match execute_sql db line with
           | Ok result -> print_result result
           | Error msg -> Printf.printf "错误: %s\n" msg);
          true
        end
      with
      | End_of_file -> false
      | exn ->
        Printf.printf "错误: %s\n" (Printexc.to_string exn);
        true
    in
    if keep_going then loop ()
  in
  loop ();
  Printf.printf "再见!\n"
(** [exec_file db filename] reads the whole file and executes its contents
    with [execute_sql], printing the result or the error.

    The file is read through [In_channel.with_open_text], which closes the
    channel even when reading raises.
    @raise Sys_error if the file cannot be opened or read. *)
let exec_file db filename =
  let content = In_channel.with_open_text filename In_channel.input_all in
  match execute_sql db content with
  | Ok result -> print_result result
  | Error msg -> Printf.printf "错误: %s\n" msg
(* Command-line arguments *)

(** Optional [-d] / [--db] argument: the database file path, defaulting to
    "minidb.dat". *)
let db_path_arg =
  let open Arg in
  let spec = info [ "d"; "db" ] ~doc:"数据库文件路径" in
  value (opt string "minidb.dat" spec)

(** Optional first positional argument: a SQL file to run. *)
let file_arg =
  let open Arg in
  let spec = info [] ~doc:"SQL 文件" in
  value (pos 0 (some file) None spec)
(** [run db_path file] opens the database at [db_path], then runs the SQL
    file when one is given and the interactive prompt otherwise. *)
let run db_path file =
  let db = Database.open_db db_path in
  match file with
  | None -> repl db
  | Some script -> exec_file db script
(** Command definition: name, version, manual page and the term that
    applies [run] to the parsed arguments. *)
let cmd =
  let man =
    [
      `S Manpage.s_description;
      `P "MiniDB 是一个教学用的小型关系型数据库。";
      `S Manpage.s_examples;
      `Pre " minidb";
      `Pre " minidb script.sql";
      `Pre " minidb -d mydb.dat";
    ]
  in
  let info =
    Cmd.info "minidb" ~doc:"MiniDB - 小型关系型数据库" ~man ~version:"0.1.0"
  in
  let term = Term.(const run $ db_path_arg $ file_arg) in
  Cmd.v info term

(* Program entry point: evaluate the command and exit with its status. *)
let () = Cmd.eval cmd |> exit
测试策略
(* test/test_storage.ml *)
open Alcotest
open Minidb.Storage
(** An integer written at offset 0 of a fresh page is read back unchanged. *)
let test_page_read_write () =
  let page = Page.create 0 in
  Page.write_int page 0 42;
  Page.read_int page 0 |> check int "读写一致" 42
(** Three inserted keys: two are looked up successfully and a key that was
    never inserted is reported as absent. *)
let test_btree_insert_find () =
  let tree = Btree.create "test.db" in
  List.iter
    (fun (key, value) -> Btree.insert tree key value)
    [ ("key1", "value1"); ("key2", "value2"); ("key3", "value3") ];
  let expect label expected key =
    check (option string) label expected (Btree.find tree key)
  in
  expect "查找 key1" (Some "value1") "key1";
  expect "查找 key2" (Some "value2") "key2";
  expect "查找不存在" None "key99"
(** Inserts 10000 keys in ascending order, then checks that each one maps
    to its own value. *)
let test_btree_many_inserts () =
  let tree = Btree.create "test_large.db" in
  let key_of i = Printf.sprintf "key_%05d" i in
  let value_of i = Printf.sprintf "value_%d" i in
  for i = 0 to 9999 do
    Btree.insert tree (key_of i) (value_of i)
  done;
  for i = 0 to 9999 do
    let key = key_of i in
    check (option string) key (Some (value_of i)) (Btree.find tree key)
  done
(** Touches twice as many pages as the pool holds, writing each page's own
    number at offset 0, then checks that page 0 - evicted in the meantime -
    can still be read back with its value.

    The pool is given an open descriptor, which is what [create_pool]
    stores, rather than a file name; the descriptor is closed when the test
    ends, whether it passes or fails. *)
let test_buffer_pool_lru () =
  let fd = Unix.openfile "test.db" [ Unix.O_RDWR; Unix.O_CREAT ] 0o644 in
  Fun.protect
    ~finally:(fun () -> Unix.close fd)
    (fun () ->
      let pool = Buffer.create_pool 10 fd in
      (* Touch more pages than the pool can hold. *)
      for i = 0 to 19 do
        let page = Buffer.get_page pool i in
        Page.write_int page 0 i
      done;
      (* Page 0 has been evicted by now; it must come back intact. *)
      let page = Buffer.get_page pool 0 in
      check int "LRU 淘汰后仍可读" 0 (Page.read_int page 0))
(* Storage test suite: (group name, test cases) pairs.
   NOTE(review): nothing in the visible source passes [suite] to a test
   runner - confirm it is run elsewhere. *)
let suite = [
"页操作", [
test_case "读写" `Quick test_page_read_write;
];
"B-Tree", [
test_case "插入查找" `Quick test_btree_insert_find;
test_case "大量插入" `Slow test_btree_many_inserts;
];
"缓冲池", [
test_case "LRU" `Quick test_buffer_pool_lru;
];
]
(* test/test_query.ml *)
(** A SELECT with two columns and a WHERE clause parses to a [Select] with
    two select items and a predicate. *)
let test_parse_select () =
  match parse_sql "SELECT name, age FROM users WHERE age > 18" with
  | Error msg -> fail msg
  | Ok (Select { columns; where; _ }) ->
    check int "列数" 2 (List.length columns);
    check bool "有 WHERE" true (Option.is_some where)
  | Ok _ -> fail "期望 SELECT"
(** An INSERT with two columns parses to an [Insert] on the right table
    with two column names. The parsed values are not inspected. *)
let test_parse_insert () =
  match parse_sql "INSERT INTO users (name, age) VALUES ('Alice', 30)" with
  | Ok (Insert { table; columns; _ }) ->
    check string "表名" "users" table;
    check int "列数" 2 (List.length columns)
  | Ok _ | Error _ -> fail "期望 INSERT"
(* test/test_transaction.ml *)
(** A single transaction with one buffered write commits successfully. *)
let test_mvcc_basic () =
  let mvcc = MVCC.create_mvcc () in
  let txn1 = MVCC.begin_txn mvcc in
  MVCC.write mvcc txn1 "key1" "value1";
  MVCC.commit mvcc txn1 |> check (result unit string) "提交成功" (Ok ())
(** Two concurrent transactions write the same key and then commit one
    after the other: at least one commit succeeds and at least one fails. *)
let test_mvcc_conflict () =
  let mvcc = MVCC.create_mvcc () in
  let txn1 = MVCC.begin_txn mvcc in
  let txn2 = MVCC.begin_txn mvcc in
  MVCC.write mvcc txn1 "key1" "value1";
  MVCC.write mvcc txn2 "key1" "value2";
  let r1 = MVCC.commit mvcc txn1 in
  let r2 = MVCC.commit mvcc txn2 in
  let outcomes = [ r1; r2 ] in
  check bool "一个成功" true (List.exists Result.is_ok outcomes);
  check bool "一个失败" true (List.exists Result.is_error outcomes)
部署与打包
(* dune-project *)
(lang dune 3.0)
; parser.mly uses Menhir syntax (named producers, separated_list, "?"),
; so the Menhir extension must be enabled for (menhir ...) stanzas.
(using menhir 2.1)
(name minidb)
(version 0.1.0)
(authors "Your Name")
(license "MIT")
(package
 (name minidb)
 (synopsis "A small relational database")
 (description "MiniDB is an educational relational database implementation in OCaml.")
 (depends
  (ocaml (>= 4.14))
  (dune (>= 3.0))
  ; build-time parser generator for query/parser.mly
  menhir
  (cmdliner (>= 1.1))
  (alcotest :with-test)))
# Build
dune build
# Run the tests
dune test
# Publish to opam
opam publish minidb.0.1.0 .
# Package: build the install layout, then archive the "default" directory found under _build/install
dune build @install
tar czf minidb-0.1.0.tar.gz -C _build/install default
项目总结与展望
| 已实现 | 可扩展 |
|---|---|
| 页管理 | 索引类型(Hash/GIN/GiST) |
| B-Tree 索引 | 复杂查询(子查询、CTE) |
| 缓冲池 | 并行查询执行 |
| WAL | 分布式事务 |
| SQL 解析 | 存储过程 |
| 查询优化器 | 视图 |
| MVCC 事务 | 触发器 |
| 锁管理 | 外键约束 |
| CLI 接口 | 网络协议 |
💡 提示:这个项目涵盖了数据库系统的核心概念。进一步学习可以参考《Database Internals》和《Designing Data-Intensive Applications》。
扩展阅读
上一节:编译器前端实践