OCaml 教程 / 并发编程 Lwt/Async
并发编程 Lwt/Async
OCaml 的并发编程主要有两大方案:Lwt(Lightweight threads)和 Async(Jane Street 出品)。两者都基于协程(cooperative concurrency)模型,而非抢占式线程。
协程概念
协程是一种用户态的协作式并发原语:
| 特性 | 协程(Lwt/Async) | OS 线程 | OCaml 5 Domains |
|---|---|---|---|
| 调度方式 | 协作式(主动让出) | 抢占式 | 抢占式 |
| 切换成本 | 极低(函数调用) | 高(上下文切换) | 中等 |
| 并行能力 | 单核并发 | 受运行时锁限制(同一 domain 内同一时刻只有一个线程执行 OCaml 代码) | 多核并行 |
| 数据竞争 | 无(单线程调度,两个让出点之间的代码不会被打断) | 可能 | 可能 |
| 适用场景 | IO 密集 | 包装阻塞式调用(如阻塞的 C 库) | CPU 密集 |
协程执行模型:
Lwt/Async 调度器(事件循环)
│
├── Promise A [已完成] → 执行回调
├── Promise B [等待IO] → 跳过
├── Promise C [已完成] → 执行回调
└── Promise D [等待IO] → 跳过
│
└── epoll/kqueue/select 等待 IO 事件
Lwt 基础
安装
opam install lwt lwt_ppx  # lwt.unix 是 lwt 包自带的库(dune 中写 (libraries lwt lwt.unix)),不是独立的 opam 包
Promise 与 bind
Lwt 的核心类型是 'a Lwt.t(promise),代表一个未来会产生值的计算:
open Lwt.Syntax
(* A promise that is already resolved with 42. *)
let p : int Lwt.t = Lwt.return 42

(* A promise that resolves with 42 once a one-second Lwt_unix timer fires. *)
let delayed : int Lwt.t =
  Lwt_unix.sleep 1.0 |> Lwt.map (fun () -> 42)

(* Chain steps with [let*]: each step starts only after the previous promise
   resolves, and receives its value. *)
let pipeline : string Lwt.t =
  let* number = Lwt.return 42 in
  let* text = Lwt.return (string_of_int number) in
  Lwt.return ("Result: " ^ text)

(* [Lwt_main.run] drives the event loop until [pipeline] resolves; the
   resulting string is then printed. *)
let () = Lwt_main.run pipeline |> Printf.printf "%s\n"
核心 API
(* return: wrap a plain value in an already-resolved promise. *)
val return : 'a -> 'a Lwt.t
(* bind: once the promise resolves, pass its value to the next step. *)
val bind : 'a Lwt.t -> ('a -> 'b Lwt.t) -> 'b Lwt.t
(* map: apply a plain (non-Lwt) function to the resolved value. *)
val map : ('a -> 'b) -> 'a Lwt.t -> 'b Lwt.t
(* catch: run the thunk and handle any exception it raises or rejects with. *)
val catch : (unit -> 'a Lwt.t) -> (exn -> 'a Lwt.t) -> 'a Lwt.t
(* choose: resolve with the first promise that resolves; the other promises
   keep running (Lwt.pick is the variant that cancels them). *)
val choose : 'a Lwt.t list -> 'a Lwt.t
(* join: wait until every promise in the list has resolved. *)
val join : unit Lwt.t list -> unit Lwt.t
(* both: wait for two promises concurrently and pair their results. This is
   cooperative concurrency on one thread, not multi-core parallelism. *)
val both : 'a Lwt.t -> 'b Lwt.t -> ('a * 'b) Lwt.t
Lwt 绑定语法(Lwt.Syntax 与 lwt_ppx)
(* [let*] comes from [Lwt.Syntax]: it is a plain OCaml binding operator
   (an alias for Lwt.bind) and needs no ppx. With lwt_ppx the equivalent
   form is [let%lwt]. *)
let computation =
  let* x = fetch_data () in
  let* y = process x in
  Lwt.return (transform y)

(* Pattern-match on the value a promise resolves to. OCaml has no [match*]
   binding operator, so this uses lwt_ppx's [match%lwt]. Without the ppx,
   write [let* r = result in match r with ...] instead. *)
let handle result =
  match%lwt result with
  | Ok value -> Lwt.return value
  | Error msg -> Lwt.fail_with msg
Lwt_io:并发 IO
open Lwt.Syntax
(* Read the whole file at [path] into a string. [Lwt_io.with_file] opens the
   channel, runs the callback, and closes the channel in a finalizer, so the
   file is closed even when reading fails. *)
let read_file path =
  Lwt_io.with_file ~mode:Lwt_io.Input path (fun ic -> Lwt_io.read ic)

(* Write [content] to the file at [path], closing the channel afterwards
   whether or not the write succeeds. *)
let write_file path content =
  Lwt_io.with_file ~mode:Lwt_io.Output path (fun oc -> Lwt_io.write oc content)
(* Standard input/output: read one line from stdin and print it back with a
   prefix. Despite its name this handles a single line on the console, not a
   network connection. *)
let echo_server () =
  Lwt.bind (Lwt_io.read_line Lwt_io.stdin) (fun line ->
    Lwt_io.printl ("You said: " ^ line))
(* TCP echo server.

   Per-connection handler for [Lwt_io.establish_server_with_client_address]:
   the server calls it with the client address (unused here) and the
   connection's (input, output) channels. It reads one line and echoes it
   back. The output is flushed explicitly. The server itself closes both
   channels once this promise resolves; closing only [ic] here, as an
   earlier version did, may tear down the shared socket before [oc]'s
   buffer is written. *)
let handle_client _client_addr (ic, oc) =
  let* line = Lwt_io.read_line ic in
  let* () = Lwt_io.write_line oc ("Echo: " ^ line) in
  Lwt_io.flush oc

(* Start the echo server on 127.0.0.1:8080 and keep it running.
   [establish_server_with_client_address] resolves as soon as the socket is
   listening, so we then wait on a promise that never resolves; otherwise
   [Lwt_main.run (run_server ())] would return and the process would exit. *)
let run_server () =
  let addr = Unix.(ADDR_INET (inet_addr_loopback, 8080)) in
  let* _server =
    Lwt_io.establish_server_with_client_address addr handle_client
  in
  let* () = Lwt_io.printl "Server running on port 8080" in
  fst (Lwt.wait ())
Lwt_list:并发列表操作
open Lwt.Syntax
(* Sequential traversal: each item is processed and printed before the next
   one is started. *)
let process_sequential items =
  items
  |> Lwt_list.iter_s (fun item ->
         let* outcome = do_something item in
         Lwt_io.printf "Processed: %s\n" outcome)

(* Concurrent traversal: all items are started at once, then we wait for
   every one of them to finish. *)
let process_concurrent items =
  items
  |> Lwt_list.iter_p (fun item ->
         let* outcome = do_something item in
         Lwt_io.printf "Processed: %s\n" outcome)
(* Concurrent map: fetch every URL at once; the result list keeps the order
   of [urls]. *)
let fetch_all urls = Lwt_list.map_p http_get urls

(* Limit concurrency: like [fetch_all], but at most [n] requests are in
   flight. The pool holds [n] unit tokens, and each request runs inside
   [Lwt_pool.use], which waits for a free token first. *)
let fetch_limited n urls =
  let tokens = Lwt_pool.create n (fun () -> Lwt.return_unit) in
  let fetch_one url = Lwt_pool.use tokens (fun () -> http_get url) in
  Lwt_list.map_p fetch_one urls
⚠️ 注意:Lwt_list.map_p 会同时启动所有任务。如果列表很大,可能导致文件描述符耗尽。使用 Lwt_pool 限制并发度。
Async 库
Async 是 Jane Street 的并发库,与 Core 集成紧密。
安装
opam install async core_unix
基本使用
open Core
open Async
(* [Deferred.t] plays the role of [Lwt.t]; [let%bind] comes from ppx_let. *)
let computation : string Deferred.t =
  let%bind x = return 42 in
  let%bind y = return (string_of_int x) in
  return ("Result: " ^ y)

(* Run the Async program. [Scheduler.go ()] has type [never_returns], so it
   must be wrapped in [never_returns] for [let () = ...] to type-check.
   [shutdown 0] stops the scheduler once the result has been printed;
   without it the process would keep running forever. *)
let () =
  don't_wait_for
    (let%map s = computation in
     print_endline s;
     shutdown 0);
  never_returns (Scheduler.go ())
Reader/Writer(IO)
open Core
open Async
(* Read the whole file at [path]; [Reader.with_file] closes the reader once
   the callback's deferred is determined. *)
let read_file path = Reader.with_file path ~f:Reader.contents

(* Write [content] to [path] through a writer scoped by [Writer.with_file]. *)
let write_file path content =
  Writer.with_file path ~f:(fun writer ->
    Writer.write writer content;
    Deferred.unit)
(* TCP echo server on port 8080: each connection gets one line echoed back,
   and the writer is then closed. The returned deferred waits on
   [Tcp.Server.close_finished], keeping the caller alive while the server
   is open. *)
let run_server () =
  let handler _addr reader writer =
    let%bind received = Reader.read_line reader in
    (match received with
     | `Eof -> ()
     | `Ok l -> Writer.write_line writer ("Echo: " ^ l));
    Writer.close writer
  in
  let%bind server =
    Tcp.Server.create ~on_handler_error:`Raise
      (Tcp.Where_to_listen.of_port 8080)
      handler
  in
  Tcp.Server.close_finished server
Lwt vs Async 对比
| 特性 | Lwt | Async |
|---|---|---|
| 维护者 | 社区(ocsigen) | Jane Street |
| 核心类型 | 'a Lwt.t | 'a Deferred.t |
| 依赖 | 轻量,独立 | 依赖 Core |
| 错误处理 | Lwt.catch | Monitor.try_with |
| 取消 | Lwt.cancel / Lwt.pick | 不支持取消 Deferred;通常用 Ivar/Deferred 作为中止信号,配合 choose 使用 |
| 与 OCaml 5 | 支持 | 实验中 |
| 学习曲线 | 较平缓 | 较陡 |
| 生态 | 广泛(cohttp, ocaml-tls) | 以 Jane Street 生态为主(也有 cohttp-async 等社区库) |
选择建议
| 场景 | 推荐 |
|---|---|
| 新项目,无历史依赖 | Lwt(更轻量) |
| 已使用 Core/Base | Async(集成更好) |
| OCaml 5 多核 | Eio 或 Domain/Domainslib(Lwt 与 Async 的调度器都是单线程的,本身无法利用多核) |
| 性能极致 | 两者差距不大,取决于 benchmark |
并发模式
Pipeline 模式
(* Producer/consumer pipeline over an Lwt_io pipe.

   The producer marshals the integers 1..5 into the pipe and then closes the
   write end. The consumer prints each value until reading reports
   End_of_file, then closes the read end. Closing [write_ch] is what lets
   the consumer see End_of_file. Without it the consumer waits forever and
   [Lwt.join] never resolves. *)
let pipeline () =
  let open Lwt.Syntax in
  let read_ch, write_ch = Lwt_io.pipe () in
  let producer =
    let* () =
      Lwt_list.iter_s (fun i -> Lwt_io.write_value write_ch i) [1; 2; 3; 4; 5]
    in
    (* Flushes any buffered values and signals EOF to the reader. *)
    Lwt_io.close write_ch
  in
  let consumer =
    let rec loop () =
      match%lwt Lwt_io.read_value read_ch with
      | (i : int) ->
        let* () = Lwt_io.printf "Got: %d\n" i in
        loop ()
      | exception End_of_file -> Lwt_io.close read_ch
    in
    loop ()
  in
  Lwt.join [producer; consumer]
Pool 模式
(* Worker pool with bounded concurrency: wrap [worker_fn] so that at most [n]
   calls run at the same time. The returned function waits for one of [n]
   unit slots before running its task. *)
let create_pool n worker_fn =
  let slots = Lwt_pool.create n (fun () -> Lwt.return_unit) in
  let run_limited task = Lwt_pool.use slots (fun () -> worker_fn task) in
  run_limited

(* Start all [tasks] concurrently, but let at most 4 perform their I/O
   (via do_io_task) at any one time. *)
let process_with_pool tasks =
  let limited = create_pool 4 do_io_task in
  Lwt_list.iter_p limited tasks
Fanout 模式
(* Fan-out: query several services concurrently and collect tagged results.

   Each task pushes its (tag, result) pair into a stream as soon as its
   fetch resolves. The returned list is therefore in completion order, not
   in the order the tasks are listed. Uses [let+] from Lwt.Syntax instead
   of [>|=], because Lwt.Infix is never opened. *)
let fanout () =
  let open Lwt.Syntax in
  let results, push = Lwt_stream.create () in
  (* Build a task that runs [fetch] and pushes its result under [name]. *)
  let tagged name fetch () =
    let+ r = fetch () in
    push (Some (name, r))
  in
  let tasks = [
    tagged "A" fetch_service_a;
    tagged "B" fetch_service_b;
    tagged "C" fetch_service_c;
  ] in
  let* () = Lwt.join (List.map (fun f -> f ()) tasks) in
  push None; (* close the stream so [to_list] can terminate *)
  Lwt_stream.to_list results
HTTP 客户端实战
使用 Cohttp + Lwt
open Lwt.Syntax
open Cohttp_lwt_unix
(* GET [url] and return the response body as a string. The response status
   is discarded, so error responses yield their body like any other. *)
let http_get url =
  let uri = Uri.of_string url in
  let* (_response, body) = Client.get uri in
  Cohttp_lwt.Body.to_string body

(* POST [data] as a JSON body to [url] and return the response body as a
   string (status likewise discarded). *)
let http_post url data =
  let uri = Uri.of_string url in
  let headers = Cohttp.Header.of_list [ ("Content-Type", "application/json") ] in
  let body = Cohttp_lwt.Body.of_string data in
  let* (_response, reply) = Client.post ~headers ~body uri in
  Cohttp_lwt.Body.to_string reply
(* Batch requests: fetch every URL concurrently; the list of bodies follows
   the order of [urls]. *)
let fetch_all urls = urls |> Lwt_list.map_p http_get

(* Fetch three endpoints and report the size of each response body. *)
let () =
  let urls = [
    "https://api.example.com/users";
    "https://api.example.com/posts";
    "https://api.example.com/comments";
  ] in
  Lwt_main.run (fetch_all urls)
  |> List.iteri (fun idx payload ->
         Printf.printf "Response %d: %d bytes\n" idx (String.length payload))
错误处理
Lwt 错误处理
open Lwt.Syntax
(* try/catch style: fetch [url], turning any exception into [Error msg]
   instead of a rejected promise. *)
let safe_fetch url =
  Lwt.catch
    (fun () -> Lwt.map (fun body -> Ok body) (http_get url))
    (fun exn -> Lwt.return (Error (Printexc.to_string exn)))

(* Result-monad style: with Lwt_result.Syntax, [let*] short-circuits on the
   first [Error], and [let+] maps the final [Ok] value. *)
let chain_with_errors () =
  let open Lwt_result.Syntax in
  let* data = fetch_data () in
  let+ processed = process data in
  transform processed
(* Timeout handling: resolve with [promise]'s value, or reject with
   [Failure "Timeout"] if it has not resolved within [seconds].

   Uses [Lwt.map] rather than [>|=], because Lwt.Infix is never opened.
   Uses [Lwt.pick] rather than [Lwt.choose] so the losing branch is
   cancelled: the sleep timer when [promise] wins, or [promise] itself
   (when it is cancelable) on timeout. *)
let with_timeout seconds promise =
  let* result =
    Lwt.pick [
      Lwt.map (fun x -> Some x) promise;
      Lwt.map (fun () -> None) (Lwt_unix.sleep seconds);
    ]
  in
  match result with
  | Some x -> Lwt.return x
  | None -> Lwt.fail_with "Timeout"
Async 错误处理
open Core
open Async
(* Run [http_get url] under Monitor.try_with so that exceptions become
   [Error msg] values (rendered with Exn.to_string) instead of escaping. *)
let safe_fetch url =
  let%map outcome = Monitor.try_with (fun () -> http_get url) in
  Result.map_error outcome ~f:Exn.to_string
(* Race [deferred] against a timer of length [span]: return its value if it
   is chosen first, otherwise raise [Failure "Timeout"]. *)
let with_timeout span deferred =
  let%map outcome =
    choose
      [ choice deferred (fun x -> `Result x)
      ; choice (after span) (fun () -> `Timeout) ]
  in
  match outcome with
  | `Result x -> x
  | `Timeout -> failwith "Timeout"
性能分析
Lwt 性能技巧
- 避免不必要的 bind:
(* ❌ Anti-pattern: every element allocates and binds a fresh promise even
   though nothing here waits on I/O, and the recursion is not in tail
   position. *)
let rec sum = function
  | [] -> Lwt.return 0
  | hd :: tl ->
    let* partial = sum tl in
    Lwt.return (hd + partial)
(* ✅ Pure computation needs no Lwt: fold synchronously and wrap once. *)
let sum xs = xs |> List.fold_left ( + ) 0 |> Lwt.return
- 使用 Lwt_stream 处理大量数据:
(* Stream the file line by line instead of loading it all into memory.
   Lwt_stream.iter_s handles the lines sequentially: each [process_line]
   call finishes before the next line is handled. *)
let process_large_file path =
  Lwt_io.lines_of_file path |> Lwt_stream.iter_s process_line
- 开启调试日志(辅助排查 Lwt promise 问题;这只会提高日志级别,并不能自动检测 promise 泄漏):
(* Enable Lwt debug logging: add an Lwt_log rule setting the level to Debug
   for the section pattern "*" (presumably every section).
   NOTE(review): this only makes Lwt_log more verbose; it does not by itself
   detect leaked promises. Lwt_log has been deprecated upstream in favour of
   the Logs library — confirm against the Lwt version in use. *)
let () = Lwt_log.add_rule "*" Lwt_log.Debug