OCaml 教程 / 多核并行 Domains(OCaml 5)
多核并行 Domains(OCaml 5)
OCaml 5 引入了真正的多核并行支持。Domain 模块允许创建独立的执行域(domain),每个域运行在独立的操作系统线程上,拥有独立的 minor heap,共享 major heap。这是 OCaml 历史性的突破。
Domain 基础
概念模型
OCaml 5 运行时
├── Domain 0 (主域)
│ ├── Minor Heap (独立)
│ ├── 大部分 OCaml 运行时
│ └── OS Thread
├── Domain 1
│ ├── Minor Heap (独立)
│ └── OS Thread
└── Domain 2
├── Minor Heap (独立)
└── OS Thread
共享:
├── Major Heap
├── GC (协作式)
└── 全局数据结构
基本使用
(* 创建并启动一个新 domain *)
let () =
let handler = Domain.spawn (fun () ->
Printf.printf "Hello from domain %d!\n" (Domain.self () :> int);
42
) in
let result = Domain.join handler in
Printf.printf "Result: %d\n" result
Domain 模块 API
| 函数 | 类型 | 说明 |
|---|---|---|
Domain.spawn | ('a -> 'b) -> 'b Domain.t | 创建新 domain 执行函数 |
Domain.join | 'a Domain.t -> 'a | 等待 domain 完成并获取结果 |
Domain.self | unit -> Domain.id | 获取当前 domain ID |
Domain.cpu_relax | unit -> unit | CPU 让步(自旋等待时使用) |
⚠️ 注意:Domain.spawn 会创建新的 OS 线程,有显著开销。不要为轻量任务频繁创建 domain。使用域池或任务调度器。
Atomic:原子操作
Atomic 提供无锁的原子读写操作,用于 domain 间共享可变状态:
(* 创建原子值 *)
let counter = Atomic.make 0
(* 原子操作 *)
let () =
Atomic.incr counter; (* 原子递增 *)
Atomic.decr counter; (* 原子递减 *)
let old = Atomic.fetch_and_add counter 5 in (* 原子加法,返回旧值 *)
Printf.printf "Old: %d, New: %d\n" old (Atomic.get counter);
Atomic.set counter 100; (* 原子设置 *)
let v = Atomic.get counter in (* 原子读取 *)
Printf.printf "Current: %d\n" v
(* CAS(Compare-And-Swap)操作 *)
let rec increment counter =
let old = Atomic.get counter in
let new_val = old + 1 in
if Atomic.compare_and_set counter old new_val then
new_val (* 成功 *)
else
increment counter (* 失败,重试 *)
(* 带 CAS 的无锁栈 *)
type 'a stack = 'a list Atomic.t
let create_stack () : 'a stack = Atomic.make []
let push stack item =
let rec loop () =
let old = Atomic.get stack in
if Atomic.compare_and_set stack old (item :: old) then ()
else loop ()
in
loop ()
let pop stack =
let rec loop () =
match Atomic.get stack with
| [] -> None
| hd :: tl as old ->
if Atomic.compare_and_set stack old tl then Some hd
else loop ()
in
loop ()
Atomic 操作速查表
| 操作 | 说明 |
|---|---|
Atomic.make v | 创建原子值 |
Atomic.get a | 原子读取 |
Atomic.set a v | 原子写入 |
Atomic.exchange a v | 原子交换,返回旧值 |
Atomic.compare_and_set a old new | CAS 操作 |
Atomic.fetch_and_add a n | 原子加法,返回旧值 |
Atomic.incr a | 原子递增 |
Atomic.decr a | 原子递减 |
⚠️ 注意:compare_and_set 只能保护单个值的原子性。如果需要保护多个相关值的原子修改,使用 Mutex。
Mutex:互斥锁
Mutex 保护临界区,确保同一时刻只有一个 domain 执行临界代码:
(* 使用 Mutex 保护共享数据 *)
let shared_counter = ref 0
let mutex = Mutex.create ()
let increment () =
Mutex.lock mutex;
shared_counter := !shared_counter + 1;
Mutex.unlock mutex
let get_counter () =
Mutex.lock mutex;
let v = !shared_counter in
Mutex.unlock mutex;
v
(* 更安全的使用方式:with_lock *)
let safe_increment () =
Mutex.protect mutex (fun () ->
shared_counter := !shared_counter + 1
)
(* 并行递增示例 *)
let () =
let domains = Array.init 4 (fun _ ->
Domain.spawn (fun () ->
for _ = 1 to 10000 do
safe_increment ()
done
)
) in
Array.iter Domain.join domains;
Printf.printf "Final count: %d\n" (get_counter ())
(* 输出: Final count: 40000 *)
Mutex API
| 函数 | 说明 |
|---|---|
Mutex.create | 创建互斥锁 |
Mutex.lock | 获取锁(阻塞) |
Mutex.unlock | 释放锁 |
Mutex.protect m f | 获取锁 → 执行 f → 释放锁(异常安全) |
Mutex.try_lock | 尝试获取锁(非阻塞) |
💡 提示:总是使用 Mutex.protect 而非手动 lock/unlock,确保异常安全。
Condition:条件变量
条件变量用于 domain 间的等待/通知机制:
(* 生产者-消费者队列 *)
type 'a bounded_queue = {
mutex : Mutex.t;
not_empty : Condition.t;
not_full : Condition.t;
queue : 'a Queue.t;
capacity : int;
}
let create_queue capacity = {
mutex = Mutex.create ();
not_empty = Condition.create ();
not_full = Condition.create ();
queue = Queue.create ();
capacity;
}
let put q item =
Mutex.protect q.mutex (fun () ->
while Queue.length q.queue >= q.capacity do
Condition.wait q.not_full q.mutex
done;
Queue.push item q.queue;
Condition.signal q.not_empty
)
let take q =
Mutex.protect q.mutex (fun () ->
while Queue.is_empty q.queue do
Condition.wait q.not_empty q.mutex
done;
let item = Queue.pop q.queue in
Condition.signal q.not_full;
item
)
(* 使用示例 *)
let () =
let q = create_queue 10 in
let producer = Domain.spawn (fun () ->
for i = 1 to 20 do
put q i;
Printf.printf "Produced: %d\n" i
done
) in
let consumer = Domain.spawn (fun () ->
for _ = 1 to 20 do
let v = take q in
Printf.printf "Consumed: %d\n" v
done
) in
Domain.join producer;
Domain.join consumer
Condition API
| 函数 | 说明 |
|---|---|
Condition.create | 创建条件变量 |
Condition.wait c m | 释放锁 m,等待信号,重新获取锁 |
Condition.signal c | 唤醒一个等待者 |
Condition.broadcast c | 唤醒所有等待者 |
⚠️ 注意:Condition.wait 可能被虚假唤醒(spurious wakeup)。务必在 while 循环中检查条件,不要用 if。
并行 GC
分代 GC 与 Domain
Domain 0 Domain 1 Domain 2
┌───────────┐ ┌───────────┐ ┌───────────┐
│ Minor GC │ │ Minor GC │ │ Minor GC │
│ (独立) │ │ (独立) │ │ (独立) │
└─────┬─────┘ └─────┬─────┘ └─────┬─────┘
│ │ │
▼ ▼ ▼
┌──────────────────────────────────────────────────┐
│ Major GC (协作式) │
│ 所有 domain 协助标记和清理 │
└──────────────────────────────────────────────────┘
Minor GC:每个 domain 独立,不需要同步(无竞争)。
Major GC:协作式,所有 domain 参与标记和清理。使用 steal-based work-stealing 提高效率。
GC 调优
(* 为每个 domain 设置 GC 参数 *)
let setup_gc_for_domain () =
Gc.set { (Gc.get ()) with
minor_heap_size = 2 * 1024 * 1024; (* 2M words *)
space_overhead = 200; (* 允许更多空间 *)
}
(* 创建带自定义 GC 的 domain *)
let d = Domain.spawn (fun () ->
setup_gc_for_domain ();
compute_heavy_task ()
)
并行性能分析
性能测量
let measure f =
let start = Unix.gettimeofday () in
let result = f () in
let elapsed = Unix.gettimeofday () -. start in
(result, elapsed)
let sum_parallel arr n_domains =
let len = Array.length arr in
let chunk_size = len / n_domains in
let partials = Array.init n_domains (fun i ->
Domain.spawn (fun () ->
let start = i * chunk_size in
let end_ = if i = n_domains - 1 then len else start + chunk_size in
let sum = ref 0 in
for j = start to end_ - 1 do
sum := !sum + arr.(j)
done; !sum)
) in
Array.fold_left (fun acc d -> acc + Domain.join d) 0 partials
性能陷阱
| 陷阱 | 说明 | 解决方案 |
|---|---|---|
| 过多 domain | 创建/切换开销 | 通常 Domain.recommended_domain_count () |
| 共享可变状态 | 锁竞争 | 减少共享、使用 Atomic |
| 小粒度任务 | spawn/join 开销超过计算 | 增大任务粒度 |
| GC 压力 | minor GC 频繁 | 减少短生命周期分配 |
| False sharing | 缓存行竞争 | 使用 padding 隔离 |
并行策略
Fork-Join 模式
(* 简单 fork-join *)
let par_map f arr =
let n = Array.length arr in
let mid = n / 2 in
let left = Array.sub arr 0 mid in
let right = Array.sub arr mid (n - mid) in
let d1 = Domain.spawn (fun () -> Array.map f left) in
let r2 = Array.map f right in
Array.append (Domain.join d1) r2
数据并行
let parallel_for n_domains arr f =
let len = Array.length arr in
let chunk = len / n_domains in
Array.init n_domains (fun i ->
Domain.spawn (fun () ->
let lo = i * chunk in
let hi = if i = n_domains - 1 then len else lo + chunk in
for j = lo to hi - 1 do arr.(j) <- f arr.(j) done))
|> Array.iter Domain.join
Channel 通信
OCaml 5 标准库没有内置 Channel,但可以用 Mutex + Condition 实现:
(* 简单 channel 实现 *)
type 'a channel = { mutex: Mutex.t; cond: Condition.t; queue: 'a Queue.t }
let create_channel () =
{ mutex = Mutex.create (); cond = Condition.create (); queue = Queue.create () }
let send ch msg =
Mutex.protect ch.mutex (fun () ->
Queue.push msg ch.queue; Condition.signal ch.cond)
let receive ch =
Mutex.protect ch.mutex (fun () ->
while Queue.is_empty ch.queue do
Condition.wait ch.cond ch.mutex
done;
Queue.pop ch.queue)
对于更高级的并发原语,推荐使用 Eio 库:
opam install eio
open Eio.Std
let main env =
let pool = Eio.Stdenv.domain_mgr env in
Eio.Domain_manager.run pool (fun () ->
traceln "Running on domain %d" (Domain.self () :> int))
let () = Eio_main.run main
任务并行 vs 数据并行
| 特性 | 任务并行 | 数据并行 |
|---|---|---|
| 划分方式 | 不同任务分配给不同 domain | 相同任务处理不同数据块 |
| 适用场景 | 不同类型的计算 | 相同操作的批量处理 |
| 负载均衡 | 难(任务异构) | 容易(数据均匀划分) |
| 典型例子 | Web 服务器并发处理请求 | 图像处理、矩阵运算 |
| OCaml 实现 | Domain.spawn 不同函数 | Domain.spawn + 分块数组 |
混合策略
let process_pipeline data =
let filtered = par_filter is_valid data in
let d1 = Domain.spawn (fun () -> process_type_a filtered) in
let r2 = process_type_b filtered in
let result_a = Domain.join d1 in
merge result_a r2
与 Lwt/Async 集成
Domain 处理 CPU 密集型任务
Lwt/Async 是单线程协作式并发,CPU 密集型任务会阻塞事件循环。使用 Domain 来卸载:
open Lwt.Syntax
(* 将 CPU 密集型任务放到独立 domain *)
let offload_cpu f =
Lwt_preemptive.detach f ()
(* 或手动使用 Domain *)
let offload_to_domain f =
let promise, resolver = Lwt.wait () in
let _ = Domain.spawn (fun () ->
try
let result = f () in
Lwt.wakeup_later resolver result
with e ->
Lwt.wakeup_later_exn resolver e
) in
promise
(* 使用 *)
let process_image image_data =
let* result = offload_to_domain (fun () ->
heavy_computation image_data
) in
Lwt.return result
⚠️ 注意:Lwt.wakeup_later 从非 Lwt 线程调用是安全的。不要使用 Lwt.wakeup(可能在 Lwt 内部状态不一致时被调用)。
业务场景
场景:并行 Web 服务器
let run_server n_domains port =
let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
Unix.setsockopt sock Unix.SO_REUSEADDR true;
Unix.bind sock Unix.(ADDR_INET (inet_addr_any, port));
Unix.listen sock 128;
Array.init n_domains (fun _ ->
Domain.spawn (fun () ->
while true do
let client = Unix.accept sock in
serve_client client
done)
) |> Array.iter Domain.join