强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧。
文档目录

OCaml 教程 / 并发编程 Lwt/Async

并发编程 Lwt/Async

OCaml 的并发编程主要有两大方案:Lwt(Lightweight threads)和 Async(Jane Street 出品)。两者都基于协程(cooperative concurrency)模型,而非抢占式线程。


协程概念

协程是一种用户态的协作式并发原语:

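| 特性 | 协程(Lwt/Async) | OS 线程(Thread 模块) | OCaml 5 Domains |
|---|---|---|---|
| 调度方式 | 协作式(主动让出) | 抢占式 | 抢占式 |
| 切换成本 | 极低(函数调用) | 高(上下文切换) | 中等 |
| 并行能力 | 单核并发 | 无真正并行:同一 domain 内同一时刻只有一个线程执行 OCaml 代码 | 多核并行 |
| 数据竞争 | 无(单线程),但让出点前后仍可能出现逻辑竞态 | 可能 | 可能 |
| 适用场景 | IO 密集 | 阻塞 IO、阻塞式 C 调用 | CPU 密集 |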
协程执行模型:

Lwt/Async 调度器(事件循环)
    │
    ├── Promise A [已完成] → 执行回调
    ├── Promise B [等待IO] → 跳过
    ├── Promise C [已完成] → 执行回调
    └── Promise D [等待IO] → 跳过
    │
    └── epoll/kqueue/select 等待 IO 事件

Lwt 基础

安装

# lwt.unix is a library shipped inside the lwt package (use it as `lwt.unix`
# in dune), not a separate opam package; installing lwt is enough.
opam install lwt lwt_ppx

Promise 与 bind

Lwt 的核心类型是 'a Lwt.t(promise),代表一个未来会产生值的计算:

open Lwt.Syntax

(* An already-resolved promise. *)
let p : int Lwt.t = Lwt.return 42

(* A promise that resolves to 42 once a one-second sleep has finished;
   [let+] maps a plain function over the sleep's result. *)
let delayed : int Lwt.t =
  let+ () = Lwt_unix.sleep 1.0 in
  42

(* Dependent steps: [let*] waits for the previous promise before running the
   next one. The last step is a pure function of the value, so [let+]
   (i.e. [Lwt.map]) is enough. *)
let pipeline : string Lwt.t =
  let* n = Lwt.return 42 in
  let+ s = Lwt.return (string_of_int n) in
  "Result: " ^ s

(* Drive the event loop until [pipeline] resolves, then print its value. *)
let () =
  Printf.printf "%s\n" (Lwt_main.run pipeline)

核心 API

(* Wrap a plain value in an already-resolved promise. *)
val return : 'a -> 'a Lwt.t

(* Sequencing: once the promise resolves, pass its value to the next step. *)
val bind : 'a Lwt.t -> ('a -> 'b Lwt.t) -> 'b Lwt.t

(* Apply a pure (non-Lwt) function to the promise's eventual value. *)
val map : ('a -> 'b) -> 'a Lwt.t -> 'b Lwt.t

(* Error handling: run the thunk; if it raises or its promise is rejected,
   the handler receives the exception. *)
val catch : (unit -> 'a Lwt.t) -> (exn -> 'a Lwt.t) -> 'a Lwt.t

(* Race: resolves like the first promise to resolve. The others are NOT
   cancelled (use [Lwt.pick] when they should be). *)
val choose : 'a Lwt.t list -> 'a Lwt.t

(* Wait until every promise in the list has resolved. *)
val join : unit Lwt.t list -> unit Lwt.t

(* Wait for two promises concurrently and pair their results. This is
   single-threaded concurrency, not multicore parallelism. *)
val both : 'a Lwt.t -> 'b Lwt.t -> ('a * 'b) Lwt.t

Lwt 绑定语法:let*(Lwt.Syntax,无需 ppx)与 match%lwt(lwt_ppx)

(* [let*] is a binding operator from [Lwt.Syntax] (plain OCaml >= 4.08, no
   ppx required); it is sugar for [Lwt.bind]. *)
let computation =
  let open Lwt.Syntax in
  let* x = fetch_data () in
  let* y = process x in
  Lwt.return (transform y)

(* Matching on a promise's value needs lwt_ppx: [match%lwt] waits for
   [result] to resolve and then matches on the value. OCaml has no
   [match*] operator, so [match* result with ...] does not parse. *)
let handle result =
  match%lwt result with
  | Ok value -> Lwt.return value
  | Error msg -> Lwt.fail_with msg

Lwt_io:并发 IO

open Lwt.Syntax

(* Read a whole file into a string. [Lwt_io.with_file] opens the channel,
   runs the reader, and closes the channel even if reading fails. *)
let read_file path =
  Lwt_io.with_file ~mode:Lwt_io.Input path (fun input -> Lwt_io.read input)

(* Write [content] to [path]. [Lwt_io.with_file] closes the channel even if
   writing fails. *)
let write_file path content =
  Lwt_io.with_file ~mode:Lwt_io.Output path (fun output ->
    Lwt_io.write output content)

(* Standard I/O: read one line from stdin and print it back with a prefix. *)
let echo_server () =
  Lwt.bind (Lwt_io.read_line Lwt_io.stdin) (fun input_line ->
    Lwt_io.printl ("You said: " ^ input_line))

(* TCP server, per-connection handler: read one line from the client, send
   back "Echo: <line>", then close the connection. *)
let handle_client (ic, oc) =
  (* [read_line_opt] yields [None] when the client hangs up before sending
     anything, instead of rejecting with End_of_file. *)
  let* line = Lwt_io.read_line_opt ic in
  let* () =
    match line with
    | None -> Lwt.return_unit
    | Some line ->
      let* () = Lwt_io.write_line oc ("Echo: " ^ line) in
      (* [write_line] only fills [oc]'s buffer. Both channels sit on the
         same socket, so flush before closing [ic]; otherwise the socket can
         be closed before the reply is sent. *)
      Lwt_io.flush oc
  in
  Lwt_io.close ic

(* Start the echo server on 127.0.0.1:[port] (default 8080).

   [Lwt_io.establish_server_with_client_address] calls its handler with the
   client's address as an extra first argument, so that argument is dropped
   before delegating to [handle_client]. The returned promise resolves once
   the server is listening and the banner is printed; keep the event loop
   running afterwards (e.g. [Lwt_main.run] on a never-resolving promise) to
   keep serving. *)
let run_server ?(port = 8080) () =
  let addr = Unix.(ADDR_INET (inet_addr_loopback, port)) in
  let* _server =
    Lwt_io.establish_server_with_client_address addr
      (fun _client_addr channels -> handle_client channels)
  in
  Lwt_io.printlf "Server running on port %d" port

Lwt_list:并发列表操作

open Lwt.Syntax

(* Sequential traversal: an item is started only after the previous item's
   promise has resolved. *)
let process_sequential items =
  let process_one item =
    let* result = do_something item in
    Lwt_io.printf "Processed: %s\n" result
  in
  Lwt_list.iter_s process_one items

(* Concurrent traversal: every item is started right away, and the result
   resolves once all of them have finished. *)
let process_concurrent items =
  let report item =
    let* outcome = do_something item in
    Lwt_io.printf "Processed: %s\n" outcome
  in
  Lwt_list.iter_p report items

(* Concurrent map: start one [http_get] per URL; the bodies are returned in
   the same order as [urls]. *)
let fetch_all urls =
  urls |> Lwt_list.map_p (fun u -> http_get u)

(* Bounded concurrency: every URL gets a promise immediately, but each
   [http_get] runs only while holding one of [n] pool slots. *)
let fetch_limited n urls =
  let slots = Lwt_pool.create n (fun () -> Lwt.return_unit) in
  let fetch_one url = Lwt_pool.use slots (fun () -> http_get url) in
  Lwt_list.map_p fetch_one urls

⚠️ 注意:Lwt_list.map_p 会同时启动所有任务。如果列表很大,可能导致文件描述符耗尽;可以像上面的 fetch_limited 那样用 Lwt_pool 限制同时运行的任务数。


Async 库

Async 是 Jane Street 的并发库,与 Core 集成紧密。

安装

opam install async core_unix

基本使用

open Core
open Async

(* [Deferred.t] is Async's counterpart of [Lwt.t]; [let%bind] comes from
   ppx_let. *)
let computation : string Deferred.t =
  let%bind x = return 42 in
  let%bind y = return (string_of_int x) in
  return ("Result: " ^ y)

(* Run the Async program: schedule the work, then hand control to the
   scheduler. [Scheduler.go] has type [never_returns] (not [unit]), so it is
   wrapped in [never_returns]. The program exits through [shutdown] once the
   result has been printed; otherwise it would run forever. *)
let () =
  don't_wait_for
    (let%map s = computation in
     print_endline s;
     shutdown 0);
  never_returns (Scheduler.go ())

Reader/Writer(IO)

open Core
open Async

(* Read the whole file at [path]; [Reader.with_file] opens and closes the
   reader around [slurp]. *)
let read_file path =
  let slurp reader = Reader.contents reader in
  Reader.with_file path ~f:slurp

(* Write [content] to [path]; [Writer.with_file] opens and closes the writer
   around [emit]. *)
let write_file path content =
  let emit writer =
    Writer.write writer content;
    Deferred.unit
  in
  Writer.with_file path ~f:emit

(* TCP echo server on port 8080: for each connection, answer the first line
   with "Echo: <line>" and close the writer. Handler exceptions are
   re-raised (`Raise). The result is determined when the server closes. *)
let run_server () =
  let handle_connection _addr reader writer =
    let%bind line = Reader.read_line reader in
    begin match line with
    | `Ok l -> Writer.write_line writer ("Echo: " ^ l)
    | `Eof -> ()
    end;
    Writer.close writer
  in
  let%bind server =
    Tcp.Server.create ~on_handler_error:`Raise
      (Tcp.Where_to_listen.of_port 8080)
      handle_connection
  in
  Tcp.Server.close_finished server

Lwt vs Async 对比

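| 特性 | Lwt | Async |
|---|---|---|
| 维护者 | 社区(ocsigen) | Jane Street |
| 核心类型 | 'a Lwt.t | 'a Deferred.t |
| 依赖 | 轻量,独立 | 依赖 Core |
| 错误处理 | Lwt.catch | Monitor.try_with |
| 取消 | Lwt.cancel / Lwt.pick | 无通用取消机制,通常显式传入中断信号(如 ?interrupt:unit Deferred.t 参数) |
| 与 OCaml 5 | 可运行,调度器运行在单个 domain 上 | 可运行,调度器运行在单个 domain 上 |
| 学习曲线 | 较平缓 | 较陡 |
| 生态 | 广泛(cohttp, ocaml-tls) | 以 Jane Street 系开源库为主 |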

选择建议

| 场景 | 推荐 |
|---|---|
| 新项目,无历史依赖 | Lwt(更轻量) |
| 已使用 Core/Base | Async(集成更好) |
| OCaml 5 多核 | Eio 或直接使用 Domains(Lwt 与 Async 的调度器都只运行在单个 domain 上) |
| 性能极致 | 两者差距不大,取决于 benchmark |

并发模式

Pipeline 模式

(* Producer-consumer pipeline over an [Lwt_io.pipe]: the producer marshals
   the ints 1..5 into the write end, and the consumer prints each value until
   the read end reports end of input. *)
let pipeline () =
  let open Lwt.Syntax in
  let read_ch, write_ch = Lwt_io.pipe () in
  let producer =
    (* Closing the write end is what makes [read_value] raise End_of_file;
       without it the consumer, and therefore [pipeline], never finishes. *)
    Lwt.finalize
      (fun () ->
        Lwt_list.iter_s (fun i -> Lwt_io.write_value write_ch i) [1; 2; 3; 4; 5])
      (fun () -> Lwt_io.close write_ch)
  in
  let consumer =
    let rec loop () =
      match%lwt Lwt_io.read_value read_ch with
      | (i : int) ->
        let* () = Lwt_io.printf "Got: %d\n" i in
        loop ()
      | exception End_of_file -> Lwt.return_unit
    in
    loop ()
  in
  let* () = Lwt.join [producer; consumer] in
  Lwt_io.close read_ch

Pool 模式

(* Worker pool with bounded concurrency: returns a runner that executes
   [worker_fn task] only while holding one of [n] pool slots. *)
let create_pool n worker_fn =
  let slots = Lwt_pool.create n (fun () -> Lwt.return_unit) in
  let run_task task = Lwt_pool.use slots (fun () -> worker_fn task) in
  run_task

(* Start every task concurrently through a 4-slot pool, so at most four
   [do_io_task] calls are in flight at once. *)
let process_with_pool tasks =
  let run =
    create_pool 4 (fun job ->
      (* performs the IO work *)
      do_io_task job)
  in
  Lwt_list.iter_p run tasks

Fanout 模式

(* Fan-out: query several services concurrently, push each labelled result
   into a stream as it arrives, and return the stream's contents once all
   requests have finished. *)
let fanout () =
  let results, push = Lwt_stream.create () in
  let start (label, fetch) =
    Lwt.map (fun r -> push (Some (label, r))) (fetch ())
  in
  let* () =
    Lwt.join
      (List.map start
         [ ("A", fetch_service_a); ("B", fetch_service_b); ("C", fetch_service_c) ])
  in
  push None;  (* end the stream so [to_list] can complete *)
  Lwt_stream.to_list results

HTTP 客户端实战

使用 Cohttp + Lwt

open Lwt.Syntax
open Cohttp_lwt_unix

(* GET [url] and return the response body as a string; the response status
   is not inspected. *)
let http_get url =
  let uri = Uri.of_string url in
  let* (_response, body) = Client.get uri in
  Cohttp_lwt.Body.to_string body

(* POST [data] to [url] with a JSON Content-Type header and return the
   response body as a string; the response status is not inspected. *)
let http_post url data =
  let body = Cohttp_lwt.Body.of_string data in
  let headers =
    Cohttp.Header.of_list [ ("Content-Type", "application/json") ]
  in
  let* (_response, response_body) =
    Client.post ~headers ~body (Uri.of_string url)
  in
  Cohttp_lwt.Body.to_string response_body

(* Batch request: fetch every URL concurrently; bodies come back in the same
   order as [urls]. *)
let fetch_all urls =
  Lwt_list.map_p (fun target -> http_get target) urls

(* Fetch the sample URLs concurrently and report the size of each body. *)
let () =
  let report index body =
    Printf.printf "Response %d: %d bytes\n" index (String.length body)
  in
  [ "https://api.example.com/users";
    "https://api.example.com/posts";
    "https://api.example.com/comments" ]
  |> fetch_all
  |> Lwt_main.run
  |> List.iteri report

错误处理

Lwt 错误处理

open Lwt.Syntax

(* try/catch style: turn a successful fetch into [Ok body], and any
   exception (raised directly or via rejection) into [Error message]. *)
let safe_fetch url =
  Lwt.try_bind
    (fun () -> http_get url)
    (fun body -> Lwt.return (Ok body))
    (fun exn -> Lwt.return (Error (Printexc.to_string exn)))

(* Result-in-a-promise style: each step returns an [('a, 'e) result Lwt.t]
   and the chain stops at the first [Error]. *)
let chain_with_errors () =
  Lwt_result.bind (fetch_data ()) (fun data ->
    Lwt_result.map transform (process data))

(* Timeout: [with_timeout seconds promise] resolves with [promise]'s value,
   or fails with [Failure "Timeout"] if [seconds] elapse first.

   [Lwt.pick] (unlike [Lwt.choose]) cancels the losing branch. The sleep
   timer is cancelled when [promise] wins, and [promise] is cancelled (if it
   is cancelable) when the timeout wins. [Lwt.map] is used instead of [>|=]
   so the snippet needs only [Lwt.Syntax]. *)
let with_timeout seconds promise =
  let* result =
    Lwt.pick [
      Lwt.map (fun x -> Some x) promise;
      Lwt.map (fun () -> None) (Lwt_unix.sleep seconds);
    ]
  in
  match result with
  | Some x -> Lwt.return x
  | None -> Lwt.fail_with "Timeout"

Async 错误处理

open Core
open Async

(* Run [http_get url] under a fresh monitor and turn any exception it raises
   into [Error message]. *)
let safe_fetch url =
  Monitor.try_with (fun () -> http_get url)
  >>| Result.map_error ~f:Exn.to_string

(* [with_timeout span deferred] becomes determined with [deferred]'s value,
   or raises [Failure "Timeout"] (to the enclosing monitor) if [span]
   elapses first. [Clock.with_timeout] runs the same race as a hand-written
   [choose]/[after], but it also aborts its timer when [deferred] wins. *)
let with_timeout span deferred =
  Clock.with_timeout span deferred >>| function
  | `Result x -> x
  | `Timeout -> failwith "Timeout"

性能分析

Lwt 性能技巧

  1. 避免不必要的 bind
(* ❌ Anti-pattern: wraps a purely synchronous sum in Lwt, allocating a
   promise (and a [let*] bind) for every list element. The recursive call
   is also not in tail position, because the addition happens after it. *)
let rec sum = function
  | [] -> Lwt.return 0
  | x :: xs ->
    let* rest = sum xs in
    Lwt.return (x + rest)

(* ✅ Pure computation needs no Lwt: fold synchronously and wrap the final
   result in a single promise. *)
let sum xs = xs |> List.fold_left (fun acc x -> acc + x) 0 |> Lwt.return
  2. 使用 Lwt_stream 处理大量数据
(* Streaming: consume the file line by line instead of loading it all into
   memory; each line is fully handled before the next one is pulled. *)
let process_large_file path =
  Lwt_io.lines_of_file path
  |> Lwt_stream.iter_s (fun l -> process_line l)
  3. 开启 Lwt 调试日志(注:Lwt_log 只调整日志级别,本身不会检测 promise 泄漏)
(* Enable Debug-level output for every Lwt_log section (pattern "*").
   NOTE(review): Lwt_log is distributed as the separate [lwt_log] opam
   package and is deprecated upstream in favour of the Logs library. This
   line only raises log verbosity and does not itself detect leaked
   promises; confirm against the Lwt version in use. *)
let () = Lwt_log.add_rule "*" Lwt_log.Debug

业务场景


扩展阅读