强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

OCaml 教程 / 多核并行 Domains(OCaml 5)

多核并行 Domains(OCaml 5)

OCaml 5 引入了真正的多核并行支持。Domain 模块允许创建独立的执行域(domain),每个域运行在独立的操作系统线程上,拥有独立的 minor heap,共享 major heap。这是 OCaml 历史性的突破。


Domain 基础

概念模型

OCaml 5 运行时
├── Domain 0 (主域)
│   ├── Minor Heap (独立)
│   ├── 大部分 OCaml 运行时
│   └── OS Thread
├── Domain 1
│   ├── Minor Heap (独立)
│   └── OS Thread
└── Domain 2
    ├── Minor Heap (独立)
    └── OS Thread
共享:
    ├── Major Heap
    ├── GC (协作式)
    └── 全局数据结构

基本使用

(* 创建并启动一个新 domain *)
let () =
  let handler = Domain.spawn (fun () ->
    Printf.printf "Hello from domain %d!\n" (Domain.self () :> int);
    42
  ) in
  let result = Domain.join handler in
  Printf.printf "Result: %d\n" result

Domain 模块 API

函数类型说明
Domain.spawn('a -> 'b) -> 'b Domain.t创建新 domain 执行函数
Domain.join'a Domain.t -> 'a等待 domain 完成并获取结果
Domain.selfunit -> Domain.id获取当前 domain ID
Domain.cpu_relaxunit -> unitCPU 让步(自旋等待时使用)

⚠️ 注意Domain.spawn 会创建新的 OS 线程,有显著开销。不要为轻量任务频繁创建 domain。使用域池或任务调度器。


Atomic:原子操作

Atomic 提供无锁的原子读写操作,用于 domain 间共享可变状态:

(* 创建原子值 *)
let counter = Atomic.make 0

(* 原子操作 *)
let () =
  Atomic.incr counter;                    (* 原子递增 *)
  Atomic.decr counter;                    (* 原子递减 *)
  let old = Atomic.fetch_and_add counter 5 in  (* 原子加法,返回旧值 *)
  Printf.printf "Old: %d, New: %d\n" old (Atomic.get counter);
  Atomic.set counter 100;                 (* 原子设置 *)
  let v = Atomic.get counter in           (* 原子读取 *)
  Printf.printf "Current: %d\n" v

(* CAS(Compare-And-Swap)操作 *)
let rec increment counter =
  let old = Atomic.get counter in
  let new_val = old + 1 in
  if Atomic.compare_and_set counter old new_val then
    new_val  (* 成功 *)
  else
    increment counter  (* 失败,重试 *)

(* 带 CAS 的无锁栈 *)
type 'a stack = 'a list Atomic.t

let create_stack () : 'a stack = Atomic.make []

let push stack item =
  let rec loop () =
    let old = Atomic.get stack in
    if Atomic.compare_and_set stack old (item :: old) then ()
    else loop ()
  in
  loop ()

let pop stack =
  let rec loop () =
    match Atomic.get stack with
    | [] -> None
    | hd :: tl as old ->
      if Atomic.compare_and_set stack old tl then Some hd
      else loop ()
  in
  loop ()

Atomic 操作速查表

操作说明
Atomic.make v创建原子值
Atomic.get a原子读取
Atomic.set a v原子写入
Atomic.exchange a v原子交换,返回旧值
Atomic.compare_and_set a old newCAS 操作
Atomic.fetch_and_add a n原子加法,返回旧值
Atomic.incr a原子递增
Atomic.decr a原子递减

⚠️ 注意compare_and_set 只能保护单个值的原子性。如果需要保护多个相关值的原子修改,使用 Mutex


Mutex:互斥锁

Mutex 保护临界区,确保同一时刻只有一个 domain 执行临界代码:

(* 使用 Mutex 保护共享数据 *)
let shared_counter = ref 0
let mutex = Mutex.create ()

let increment () =
  Mutex.lock mutex;
  shared_counter := !shared_counter + 1;
  Mutex.unlock mutex

let get_counter () =
  Mutex.lock mutex;
  let v = !shared_counter in
  Mutex.unlock mutex;
  v

(* 更安全的使用方式:with_lock *)
let safe_increment () =
  Mutex.protect mutex (fun () ->
    shared_counter := !shared_counter + 1
  )

(* 并行递增示例 *)
let () =
  let domains = Array.init 4 (fun _ ->
    Domain.spawn (fun () ->
      for _ = 1 to 10000 do
        safe_increment ()
      done
    )
  ) in
  Array.iter Domain.join domains;
  Printf.printf "Final count: %d\n" (get_counter ())
  (* 输出: Final count: 40000 *)

Mutex API

函数说明
Mutex.create创建互斥锁
Mutex.lock获取锁(阻塞)
Mutex.unlock释放锁
Mutex.protect m f获取锁 → 执行 f → 释放锁(异常安全)
Mutex.try_lock尝试获取锁(非阻塞)

💡 提示:总是使用 Mutex.protect 而非手动 lock/unlock,确保异常安全。


Condition:条件变量

条件变量用于 domain 间的等待/通知机制:

(* 生产者-消费者队列 *)
type 'a bounded_queue = {
  mutex : Mutex.t;
  not_empty : Condition.t;
  not_full : Condition.t;
  queue : 'a Queue.t;
  capacity : int;
}

let create_queue capacity = {
  mutex = Mutex.create ();
  not_empty = Condition.create ();
  not_full = Condition.create ();
  queue = Queue.create ();
  capacity;
}

let put q item =
  Mutex.protect q.mutex (fun () ->
    while Queue.length q.queue >= q.capacity do
      Condition.wait q.not_full q.mutex
    done;
    Queue.push item q.queue;
    Condition.signal q.not_empty
  )

let take q =
  Mutex.protect q.mutex (fun () ->
    while Queue.is_empty q.queue do
      Condition.wait q.not_empty q.mutex
    done;
    let item = Queue.pop q.queue in
    Condition.signal q.not_full;
    item
  )

(* 使用示例 *)
let () =
  let q = create_queue 10 in
  let producer = Domain.spawn (fun () ->
    for i = 1 to 20 do
      put q i;
      Printf.printf "Produced: %d\n" i
    done
  ) in
  let consumer = Domain.spawn (fun () ->
    for _ = 1 to 20 do
      let v = take q in
      Printf.printf "Consumed: %d\n" v
    done
  ) in
  Domain.join producer;
  Domain.join consumer

Condition API

函数说明
Condition.create创建条件变量
Condition.wait c m释放锁 m,等待信号,重新获取锁
Condition.signal c唤醒一个等待者
Condition.broadcast c唤醒所有等待者

⚠️ 注意Condition.wait 可能被虚假唤醒(spurious wakeup)。务必在 while 循环中检查条件,不要用 if


并行 GC

分代 GC 与 Domain

Domain 0              Domain 1              Domain 2
┌───────────┐        ┌───────────┐        ┌───────────┐
│ Minor GC  │        │ Minor GC  │        │ Minor GC  │
│ (独立)    │        │ (独立)    │        │ (独立)    │
└─────┬─────┘        └─────┬─────┘        └─────┬─────┘
      │                    │                    │
      ▼                    ▼                    ▼
┌──────────────────────────────────────────────────┐
│              Major GC (协作式)                     │
│   所有 domain 协助标记和清理                       │
└──────────────────────────────────────────────────┘

Minor GC:每个 domain 独立,不需要同步(无竞争)。

Major GC:协作式,所有 domain 参与标记和清理。使用 steal-based work-stealing 提高效率。

GC 调优

(* 为每个 domain 设置 GC 参数 *)
let setup_gc_for_domain () =
  Gc.set { (Gc.get ()) with
    minor_heap_size = 2 * 1024 * 1024;  (* 2M words *)
    space_overhead = 200;                 (* 允许更多空间 *)
  }

(* 创建带自定义 GC 的 domain *)
let d = Domain.spawn (fun () ->
  setup_gc_for_domain ();
  compute_heavy_task ()
)

并行性能分析

性能测量

let measure f =
  let start = Unix.gettimeofday () in
  let result = f () in
  let elapsed = Unix.gettimeofday () -. start in
  (result, elapsed)

let sum_parallel arr n_domains =
  let len = Array.length arr in
  let chunk_size = len / n_domains in
  let partials = Array.init n_domains (fun i ->
    Domain.spawn (fun () ->
      let start = i * chunk_size in
      let end_ = if i = n_domains - 1 then len else start + chunk_size in
      let sum = ref 0 in
      for j = start to end_ - 1 do
        sum := !sum + arr.(j)
      done; !sum)
  ) in
  Array.fold_left (fun acc d -> acc + Domain.join d) 0 partials

性能陷阱

陷阱说明解决方案
过多 domain创建/切换开销通常 Domain.recommended_domain_count ()
共享可变状态锁竞争减少共享、使用 Atomic
小粒度任务spawn/join 开销超过计算增大任务粒度
GC 压力minor GC 频繁减少短生命周期分配
False sharing缓存行竞争使用 padding 隔离

并行策略

Fork-Join 模式

(* 简单 fork-join *)
let par_map f arr =
  let n = Array.length arr in
  let mid = n / 2 in
  let left = Array.sub arr 0 mid in
  let right = Array.sub arr mid (n - mid) in
  let d1 = Domain.spawn (fun () -> Array.map f left) in
  let r2 = Array.map f right in
  Array.append (Domain.join d1) r2

数据并行

let parallel_for n_domains arr f =
  let len = Array.length arr in
  let chunk = len / n_domains in
  Array.init n_domains (fun i ->
    Domain.spawn (fun () ->
      let lo = i * chunk in
      let hi = if i = n_domains - 1 then len else lo + chunk in
      for j = lo to hi - 1 do arr.(j) <- f arr.(j) done))
  |> Array.iter Domain.join

Channel 通信

OCaml 5 标准库没有内置 Channel,但可以用 Mutex + Condition 实现:

(* 简单 channel 实现 *)
type 'a channel = { mutex: Mutex.t; cond: Condition.t; queue: 'a Queue.t }

let create_channel () =
  { mutex = Mutex.create (); cond = Condition.create (); queue = Queue.create () }

let send ch msg =
  Mutex.protect ch.mutex (fun () ->
    Queue.push msg ch.queue; Condition.signal ch.cond)

let receive ch =
  Mutex.protect ch.mutex (fun () ->
    while Queue.is_empty ch.queue do
      Condition.wait ch.cond ch.mutex
    done;
    Queue.pop ch.queue)

对于更高级的并发原语,推荐使用 Eio 库:

opam install eio
open Eio.Std

let main env =
  let pool = Eio.Stdenv.domain_mgr env in
  Eio.Domain_manager.run pool (fun () ->
    traceln "Running on domain %d" (Domain.self () :> int))

let () = Eio_main.run main

任务并行 vs 数据并行

特性任务并行数据并行
划分方式不同任务分配给不同 domain相同任务处理不同数据块
适用场景不同类型的计算相同操作的批量处理
负载均衡难(任务异构)容易(数据均匀划分)
典型例子Web 服务器并发处理请求图像处理、矩阵运算
OCaml 实现Domain.spawn 不同函数Domain.spawn + 分块数组

混合策略

let process_pipeline data =
  let filtered = par_filter is_valid data in
  let d1 = Domain.spawn (fun () -> process_type_a filtered) in
  let r2 = process_type_b filtered in
  let result_a = Domain.join d1 in
  merge result_a r2

与 Lwt/Async 集成

Domain 处理 CPU 密集型任务

Lwt/Async 是单线程协作式并发,CPU 密集型任务会阻塞事件循环。使用 Domain 来卸载:

open Lwt.Syntax

(* 将 CPU 密集型任务放到独立 domain *)
let offload_cpu f =
  Lwt_preemptive.detach f ()

(* 或手动使用 Domain *)
let offload_to_domain f =
  let promise, resolver = Lwt.wait () in
  let _ = Domain.spawn (fun () ->
    try
      let result = f () in
      Lwt.wakeup_later resolver result
    with e ->
      Lwt.wakeup_later_exn resolver e
  ) in
  promise

(* 使用 *)
let process_image image_data =
  let* result = offload_to_domain (fun () ->
    heavy_computation image_data
  ) in
  Lwt.return result

⚠️ 注意Lwt.wakeup_later 从非 Lwt 线程调用是安全的。不要使用 Lwt.wakeup(可能在 Lwt 内部状态不一致时被调用)。


业务场景

场景:并行 Web 服务器

let run_server n_domains port =
  let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
  Unix.setsockopt sock Unix.SO_REUSEADDR true;
  Unix.bind sock Unix.(ADDR_INET (inet_addr_any, port));
  Unix.listen sock 128;
  Array.init n_domains (fun _ ->
    Domain.spawn (fun () ->
      while true do
        let client = Unix.accept sock in
        serve_client client
      done)
  ) |> Array.iter Domain.join

扩展阅读