OCaml 教程 / 分布式系统基础
分布式系统基础
本节介绍如何使用 OCaml 构建分布式系统,涵盖 RPC、消息队列、Actor 模型等核心概念。
RPC 框架(Async.Rpc)
opam install async ppx_jane
(* 定义 RPC 协议 *)
open Core
open Async
(* Wire protocol shared by the RPC client and server.
   [@@deriving bin_io] generates bin_prot serializers for both record types;
   bin_prot encodes record fields positionally, so the field order below is
   part of the wire format and must match on both peers. *)
module Protocol = struct
(* Request payload sent by the client. *)
type query = {
name: string;
age: int;
} [@@deriving bin_io, sexp]
(* Reply payload returned by the server. *)
type response = {
greeting: string;
processed_at: string;
} [@@deriving bin_io, sexp]
(* RPC definition: endpoint "greet", version 1. [bin_query] and
   [bin_response] are the serializers generated by [@@deriving bin_io]
   for [query] and [response] above. *)
let rpc =
Rpc.Rpc.create
~name:"greet"
~version:1
~bin_query
~bin_response
end
(* Server side: the implementation of [Protocol.rpc]. The connection state
   is unit (the server is started with [~connection_state:(fun _ -> ())]),
   so the first argument is matched as [()]. The reply carries a greeting
   built from the query and the UTC time at which it was processed. *)
let server_impl =
  let greet () (q : Protocol.query) =
    let text =
      Printf.sprintf "Hello, %s! You are %d years old."
        q.Protocol.name q.Protocol.age
    in
    let stamp = Time.to_string_utc (Time.now ()) in
    return { Protocol.greeting = text; processed_at = stamp }
  in
  Rpc.Rpc.implement Protocol.rpc greet
(* Start the greet RPC server on [port] and return a deferred that is
   determined once the TCP server has been closed.
   @param port TCP port to listen on; defaults to 8080 (the previously
   hard-coded value), so [start_server ()] behaves as before. *)
let start_server ?(port = 8080) () =
  let implementations =
    Rpc.Implementations.create_exn
      ~implementations:[ server_impl ]
      ~on_unknown_rpc:`Raise
  in
  let%bind server =
    Tcp.Server.create
      (* A failure while serving a single connection (for instance a client
         that fails the RPC handshake, which [`Raise] below turns into an
         exception) is logged instead of being re-raised: with [`Raise] it
         would propagate to the enclosing monitor and typically abort the
         whole server. *)
      ~on_handler_error:
        (`Call
          (fun addr exn ->
            eprintf "连接 %s 处理失败: %s\n"
              (Socket.Address.Inet.to_string addr)
              (Exn.to_string exn)))
      (Tcp.Where_to_listen.of_port port)
      (fun _addr reader writer ->
        Rpc.Connection.server_with_close reader writer
          ~implementations
          ~connection_state:(fun _conn -> ())
          ~on_handshake_error:`Raise)
  in
  Tcp.Server.close_finished server
(* Client side: connect to [host]:[port], call [Protocol.rpc] once with
   [name] and [age], print the reply (or the error), then close the
   connection. Connection failures are printed and the deferred still
   resolves to [()]. *)
let call_greet host port name age =
  let where =
    Tcp.Where_to_connect.of_host_and_port (Host_and_port.create ~host ~port)
  in
  let%bind conn = Rpc.Connection.client where in
  match conn with
  | Error exn ->
    (* [Rpc.Connection.client] reports failure as an [Exn.t], not an
       [Error.t], so it is rendered with [Exn.to_string]. *)
    printf "连接失败: %s\n" (Exn.to_string exn);
    return ()
  | Ok connection ->
    let query = { Protocol.name; age } in
    let%bind response = Rpc.Rpc.dispatch Protocol.rpc connection query in
    (match response with
     | Ok resp ->
       printf "响应: %s\n" resp.Protocol.greeting;
       printf "处理时间: %s\n" resp.Protocol.processed_at
     | Error err -> printf "错误: %s\n" (Error.to_string_hum err));
    (* Single close shared by the success and the dispatch-error paths. *)
    Rpc.Connection.close connection
| RPC 组件 | 说明 |
|---|---|
| Rpc.Rpc.create | 定义 RPC 端点(名称、版本与序列化器) |
| bin_query/bin_response | 由 [@@deriving bin_io] 生成的查询/响应序列化器 |
| Rpc.Rpc.implement | 在服务端实现 RPC |
| Rpc.Rpc.dispatch | 在客户端发起调用 |
消息队列集成
(* Redis 消息队列 *)
open Lwt.Infix
(* Redis publish/subscribe. *)
(* Publish [message] on the Redis channel [channel] over a short-lived
   connection. [host]/[port] default to localhost:6379 (the previously
   hard-coded values). The connection is closed even if PUBLISH fails. *)
let publish_message ?(host = "localhost") ?(port = 6379) channel message =
  let%lwt conn = Redis_lwt.Client.connect { host; port } in
  Lwt.finalize
    (fun () ->
      (* PUBLISH replies with the number of receivers; not needed here. *)
      let%lwt (_ : int) = Redis_lwt.Client.publish conn channel message in
      Lwt.return_unit)
    (fun () -> Redis_lwt.Client.disconnect conn)
(* Subscribe to [channel] and call [handler] on every reply pushed by the
   server. After SUBSCRIBE the connection is in push mode; redis-lwt exposes
   the incoming replies as a stream ([Redis_lwt.Client.stream]) rather than
   through a per-message receive call. The promise resolves only when the
   stream ends, and the connection is closed at that point or if [handler]
   raises. [host]/[port] default to localhost:6379. *)
let subscribe_channel ?(host = "localhost") ?(port = 6379) channel handler =
  let%lwt conn = Redis_lwt.Client.connect { host; port } in
  Lwt.finalize
    (fun () ->
      let%lwt () = Redis_lwt.Client.subscribe conn [ channel ] in
      Lwt_stream.iter handler (Redis_lwt.Client.stream conn))
    (fun () -> Redis_lwt.Client.disconnect conn)
(* A simple blocking FIFO for threads: one mutex guards the queue, and a
   condition variable wakes a consumer when an element is pushed. *)
module MessageQueue = struct
  type 'a t = {
    queue: 'a Queue.t;
    mutex: Mutex.t;
    condition: Condition.t;
  }

  let create () =
    { queue = Queue.create ();
      mutex = Mutex.create ();
      condition = Condition.create () }

  (* Run [f] while holding [q.mutex]. *)
  let with_lock q f =
    Mutex.lock q.mutex;
    let v = f () in
    Mutex.unlock q.mutex;
    v

  (* Enqueue [msg] and wake one waiting consumer. *)
  let push q msg =
    with_lock q (fun () ->
      Queue.push msg q.queue;
      Condition.signal q.condition)

  (* Dequeue the oldest element, blocking while the queue is empty. *)
  let pop q =
    with_lock q (fun () ->
      let rec await () =
        if Queue.is_empty q.queue then begin
          Condition.wait q.condition q.mutex;
          await ()
        end
      in
      await ();
      Queue.pop q.queue)

  (* Non-blocking dequeue: [None] when the queue is empty. *)
  let try_pop q = with_lock q (fun () -> Queue.take_opt q.queue)
end
(* Usage: one producer thread pushes 100 messages, 10 ms apart; one
   consumer thread pops and prints 100 messages; the main thread waits for
   both. *)
let () =
  let mq = MessageQueue.create () in
  let produce () =
    for n = 1 to 100 do
      MessageQueue.push mq (Printf.sprintf "消息 %d" n);
      Thread.delay 0.01
    done
  in
  let consume () =
    for _ = 1 to 100 do
      Printf.printf "收到: %s\n" (MessageQueue.pop mq)
    done
  in
  let producer = Thread.create produce () in
  let consumer = Thread.create consume () in
  List.iter Thread.join [ producer; consumer ]
💡 提示:生产环境建议使用 RabbitMQ 或 Kafka 等成熟的消息队列系统。
Actor 模型
(* A minimal actor: a FIFO mailbox guarded by a mutex, plus a condition
   variable that wakes threads blocked in [receive].
   Once [stop] has been called, [receive] returns [None] even if messages
   are still queued. *)
module Actor = struct
  type 'msg t = {
    mailbox: 'msg Queue.t;
    mutex: Mutex.t;
    condition: Condition.t;
    mutable running: bool;
  }

  let create () = {
    mailbox = Queue.create ();
    mutex = Mutex.create ();
    condition = Condition.create ();
    running = true;
  }

  (* Run [f] with [actor.mutex] held; the mutex is released even if [f]
     raises. *)
  let with_lock actor f =
    Mutex.lock actor.mutex;
    Fun.protect ~finally:(fun () -> Mutex.unlock actor.mutex) f

  (* Enqueue [msg] and wake one blocked receiver. *)
  let send actor msg =
    with_lock actor (fun () ->
      Queue.push msg actor.mailbox;
      Condition.signal actor.condition)

  (* Block until a message is available or the actor is stopped.
     Returns [Some msg] while running and [None] after [stop]. *)
  let receive actor =
    with_lock actor (fun () ->
      while Queue.is_empty actor.mailbox && actor.running do
        Condition.wait actor.condition actor.mutex
      done;
      if actor.running then Some (Queue.pop actor.mailbox) else None)

  (* Mark the actor stopped and wake ALL blocked receivers. With
     [Condition.signal] only one waiter would wake; every other thread
     blocked in [receive] would then wait forever. *)
  let stop actor =
    with_lock actor (fun () ->
      actor.running <- false;
      Condition.broadcast actor.condition)
end
(* Counter actor: the messages it understands. *)
type counter_msg =
| Increment   (* add 1 to the counter *)
| Decrement   (* subtract 1 from the counter *)
| Get of int ref   (* the actor writes the current count into this ref; the sender gets no signal when that has happened *)
| Stop   (* ends the actor's processing loop *)
(* Spawn a thread that owns a private counter and serves [counter_msg]s
   from a fresh actor's mailbox until it gets [Stop] or the actor is
   stopped. Returns the actor so callers can [Actor.send] to it. *)
let counter_actor () =
  let actor = Actor.create () in
  let count = ref 0 in
  let rec serve () =
    match Actor.receive actor with
    | None | Some Stop -> ()
    | Some Increment -> incr count; serve ()
    | Some Decrement -> decr count; serve ()
    | Some (Get reply) -> reply := !count; serve ()
  in
  ignore (Thread.create serve () : Thread.t);
  actor
(* Usage: send 100 increments, ask for the value, print it, then stop.
   NOTE(review): the 0.1 s sleep is only a heuristic wait for the actor
   thread to process [Get]; nothing guarantees [result] has been written by
   then. A reply channel would make this deterministic. *)
let () =
  let counter = counter_actor () in
  List.iter (Actor.send counter) (List.init 100 (fun _ -> Increment));
  let result = ref 0 in
  Actor.send counter (Get result);
  Thread.delay 0.1;
  Printf.printf "计数: %d\n" !result;
  Actor.send counter Stop
| Actor 概念 | OCaml 实现 | 说明 |
|---|---|---|
| Actor | Actor.t | 消息接收者 |
| Mailbox | Queue.t | 消息队列 |
| Send | Actor.send | 发送消息 |
| Receive | Actor.receive | 接收消息(阻塞) |
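⚠️ 注意:Actor 之间的状态应完全隔离,避免共享可变状态导致竞态条件。上例中的 `Get of int ref` 让调用方与 Actor 线程共享同一个 ref,并依靠 `Thread.delay` 等待结果,仅用于演示;实际代码应通过回复邮箱(例如另一个 Actor 或消息队列)返回结果。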
分布式一致性基础
(* Simplified Raft consensus: node, role and log-entry types.
   Only the data model and a partial AppendEntries handler appear here;
   elections, replication and persistence are not shown. *)
type node_id = string
(* Role a node currently plays. *)
type raft_state =
| Follower
| Candidate
| Leader
(* One replicated log entry: the term it belongs to, its position in the
   log, and the command as an opaque string. *)
type log_entry = {
term: int;
index: int;
command: string;
}
(* Per-node Raft state. Field names follow the Raft paper's server state. *)
type raft_node = {
id: node_id;
mutable state: raft_state;
mutable current_term: int;
mutable voted_for: node_id option;  (* presumably the candidate voted for in [current_term], per Raft *)
log: log_entry list ref;  (* NOTE(review): list order (oldest- or newest-first) is not fixed by this code; confirm *)
mutable commit_index: int;
mutable last_applied: int;
peers: node_id list;  (* presumably the other cluster members' ids *)
}
(* AppendEntries RPC: request/response records. Field names mirror the
   arguments and results of the corresponding RPC in the Raft paper. *)
(* NOTE: every record below (like [log_entry]) declares a field named
   [term]. For an unannotated access [x.term], OCaml resolves [term] to the
   most recently defined record that has it ([request_vote_response]), so
   functions taking these messages should annotate their parameter types. *)
type append_entries_request = {
term: int;
leader_id: node_id;
prev_log_index: int;
prev_log_term: int;
entries: log_entry list;
leader_commit: int;
}
type append_entries_response = {
term: int;
success: bool;
}
(* RequestVote RPC: request/response records. *)
type request_vote_request = {
term: int;
candidate_id: node_id;
last_log_index: int;
last_log_term: int;
}
type request_vote_response = {
term: int;
vote_granted: bool;
}
(* Handle an AppendEntries request on [node] and build the reply.
   - A request from an older term is rejected with the node's current term.
   - Otherwise the node adopts the request's term (clearing its vote if the
     term moved forward) and becomes a follower.
   The log-consistency check and the appending of [request.entries] are
   still elided (see the TODO), so every non-stale request is currently
   acknowledged with [success = true].
   The parameter/return annotations are required: without them
   [request.term] resolves to the last record type declaring [term]
   ([request_vote_response]) and the function gets the wrong type. *)
let handle_append_entries (node : raft_node) (request : append_entries_request)
    : append_entries_response =
  if request.term < node.current_term then
    { term = node.current_term; success = false }
  else begin
    (* Raft: on seeing a newer term, adopt it and forget the vote cast in
       the old term; otherwise the node could refuse to vote for any
       candidate of the new term. *)
    if request.term > node.current_term then begin
      node.current_term <- request.term;
      node.voted_for <- None
    end;
    (* A leader exists for this term, so (re)become a follower. *)
    node.state <- Follower;
    (* TODO: verify log consistency (prev_log_index / prev_log_term),
       append [request.entries] and advance [commit_index] from
       [request.leader_commit]. *)
    { term = node.current_term; success = true }
  end
网络编程(Unix 模块)
(* Blocking TCP server: listen on [port] on all IPv4 interfaces and handle
   each accepted connection in its own thread with
   [handler client_sock client_addr]. The client socket is closed when
   [handler] returns or raises. Never returns normally.
   @param backlog listen queue length; defaults to 10 (the previously
   hard-coded value).
   @raise Unix.Unix_error if the socket cannot be bound or put in listening
   mode; the listening socket is closed in that case. *)
let start_tcp_server ?(backlog = 10) port handler =
  let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
  (* Do not leak the listening socket if setup fails (e.g. port in use). *)
  (try
     Unix.setsockopt sock Unix.SO_REUSEADDR true;
     Unix.bind sock (Unix.ADDR_INET (Unix.inet_addr_any, port));
     Unix.listen sock backlog
   with e ->
     Unix.close sock;
     raise e);
  (* [%!] flushes stdout so log lines appear immediately even when stdout
     is redirected to a file or pipe. *)
  Printf.printf "服务器监听端口 %d\n%!" port;
  let describe = function
    | Unix.ADDR_INET (addr, p) ->
      Printf.sprintf "%s:%d" (Unix.string_of_inet_addr addr) p
    | _ -> "unknown"
  in
  let rec accept_loop () =
    match Unix.accept sock with
    | exception Unix.Unix_error (Unix.EINTR, _, _) ->
      (* Interrupted by a signal before a connection arrived: retry. *)
      accept_loop ()
    | (client_sock, client_addr) ->
      Printf.printf "新连接: %s\n%!" (describe client_addr);
      let _thread : Thread.t =
        Thread.create
          (fun () ->
            Fun.protect
              ~finally:(fun () -> Unix.close client_sock)
              (fun () -> handler client_sock client_addr))
          ()
      in
      accept_loop ()
  in
  accept_loop ()
(* TCP client: open a connection to [host]:[port] and return the connected
   socket. [host] may be a numeric address or a host name (resolved with
   [Unix.gethostbyname]); the socket domain follows the resolved address.
   @raise Not_found if [host] cannot be resolved.
   @raise Unix.Unix_error if the connection fails; the socket is closed
   before the exception is re-raised. *)
let tcp_connect host port =
  let inet_addr =
    match Unix.inet_addr_of_string host with
    | addr -> addr
    | exception Failure _ ->
      let { Unix.h_addr_list; _ } = Unix.gethostbyname host in
      if Array.length h_addr_list = 0 then raise Not_found;
      h_addr_list.(0)
  in
  let addr = Unix.ADDR_INET (inet_addr, port) in
  let sock = Unix.socket (Unix.domain_of_sockaddr addr) Unix.SOCK_STREAM 0 in
  match Unix.connect sock addr with
  | () -> sock
  | exception e ->
    Unix.close sock;
    raise e
(* Sending and receiving helpers. *)
(* Write the whole string [data] to [sock]. [Unix.send_substring] is the
   string-taking variant ([Unix.send] expects [bytes]). It may accept only
   part of the buffer per call, hence the loop; a call interrupted by a
   signal (EINTR) is retried. *)
let send_all sock data =
  let len = String.length data in
  let rec go sent =
    if sent < len then
      match Unix.send_substring sock data sent (len - sent) [] with
      | n -> go (sent + n)
      | exception Unix.Unix_error (Unix.EINTR, _, _) -> go sent
  in
  go 0
(* Read exactly [size] bytes from [sock] and return them as a string.
   @raise End_of_file if the peer closes the connection before [size]
   bytes have arrived. *)
let recv_all sock size =
  let buf = Bytes.create size in
  let rec fill off =
    if off < size then begin
      match Unix.recv sock buf off (size - off) [] with
      | 0 -> raise End_of_file
      | n -> fill (off + n)
    end
  in
  fill 0;
  Bytes.to_string buf
💡 提示:生产环境应使用 Async 或 Lwt 等异步 IO 库,而非阻塞式 Unix 模块。
序列化与反序列化
(* Distributed message format. *)
(* NOTE(review): the [Error] constructor shadows the [result] constructor
   [Error] for code that follows this definition; later code meaning
   [result]'s [Error] must rely on type information or write
   [Stdlib.Error]. Renaming it would change this type's interface. *)
type message =
| Request of int * string
| Response of int * string
| Heartbeat
| Error of string
(* Efficient serialization with bin_prot. *)
(* [@@deriving bin_io] on a type named [msg] generates [bin_msg],
   [bin_size_msg], [bin_write_msg] and [bin_read_msg]; the [_t]-suffixed
   names exist only for a type named [t]. *)
type msg = {
id: int;
payload: string;
timestamp: float;
} [@@deriving bin_io]
(* Encode [msg] with bin_prot and return exactly the encoded bytes.
   The buffer is sized with [bin_size_msg], so a payload of any length fits;
   a fixed-size buffer would fail on large messages. The deriver for type
   [msg] names the writer [bin_write_msg]. *)
let serialize msg =
  let size = bin_size_msg msg in
  let buf = Bin_prot.Common.create_buf size in
  let len = bin_write_msg buf ~pos:0 msg in
  let out = Bytes.create len in
  Bin_prot.Common.blit_buf_bytes buf out ~len;
  out
(* Decode a [msg] from [bytes], which must begin with a bin_prot-encoded
   [msg] (for example the output of [serialize]). The bytes are copied into
   a bin_prot buffer, and the deriver's reader [bin_read_msg] decodes it. *)
let deserialize bytes =
  let len = Bytes.length bytes in
  let buf = Bin_prot.Common.create_buf len in
  Bin_prot.Common.blit_bytes_buf bytes buf ~len;
  bin_read_msg buf ~pos_ref:(ref 0)
(* Length-prefixed message protocol. *)
(* Send [msg] framed as a 4-byte little-endian length followed by its
   serialized payload. The header and payload are assembled into one frame
   and written with a single [send_all]. Two small back-to-back writes can
   interact badly with Nagle's algorithm and delayed ACKs. *)
let send_message sock msg =
  let data = serialize msg in
  let len = Bytes.length data in
  let frame = Bytes.create (4 + len) in
  Bytes.set_int32_le frame 0 (Int32.of_int len);
  Bytes.blit data 0 frame 4 len;
  send_all sock (Bytes.to_string frame)
(* Receive one message framed as a 4-byte little-endian length followed by
   the serialized payload, and decode it.
   @raise End_of_file if the peer closes the connection mid-frame.
   @raise Failure if the length prefix is negative (corrupt stream). *)
let recv_message sock =
  let header = Bytes.of_string (recv_all sock 4) in
  (* [Bytes.get_int32_le] takes the offset of the integer in the buffer. *)
  let len = Int32.to_int (Bytes.get_int32_le header 0) in
  if len < 0 then failwith "recv_message: negative length prefix";
  let data = recv_all sock len in
  deserialize (Bytes.of_string data)
分布式追踪
(* Tracing context carried along with a request. *)
type trace_context = {
trace_id: string;
span_id: string;
parent_span_id: string option;  (* [None] for a span created by [create_trace] *)
baggage: (string * string) list;  (* key/value pairs; copied unchanged into child spans, not emitted by [propagate_headers] *)
}
(* Generate a random 16-byte identifier, Base64-encoded.
   The bytes come from a private PRNG seeded from system entropy
   ([Random.State.make_self_init]). The global [Random] generator starts
   from a fixed seed unless [Random.self_init] is called, which would make
   every process emit the same sequence of trace/span ids.
   NOTE: [Random] is not a cryptographic generator; do not use these ids
   as secrets. *)
let generate_id =
  let rng = Random.State.make_self_init () in
  fun () ->
    Base64.encode_string
      (String.init 16 (fun _ -> Char.chr (Random.State.int rng 256)))
(* Start a new trace: a fresh trace id and a fresh root span id (generated
   in that order), no parent span and empty baggage. *)
let create_trace () =
  let fresh_trace = generate_id () in
  let root_span = generate_id () in
  { trace_id = fresh_trace;
    span_id = root_span;
    parent_span_id = None;
    baggage = [] }
(* Derive a child span of [parent]: same trace id and baggage, a fresh
   span id, and the parent's span id recorded as [parent_span_id]. *)
let child_span parent =
  let { span_id = parent_span; _ } = parent in
  { parent with parent_span_id = Some parent_span; span_id = generate_id () }
(* Propagate the tracing context: build the headers that carry [ctx] to a
   downstream service. A span without a parent is sent with an empty
   X-Parent-Span-Id value.
   NOTE(review): [ctx.baggage] is not included in the headers. *)
let propagate_headers ctx =
  let parent =
    match ctx.parent_span_id with
    | Some id -> id
    | None -> ""
  in
  ("X-Trace-Id", ctx.trace_id)
  :: ("X-Span-Id", ctx.span_id)
  :: [ ("X-Parent-Span-Id", parent) ]
实际案例:分布式任务调度器
(* Task type: a unit of work handled by the scheduler. *)
type task = {
id: string;
payload: string;
priority: int;  (* NOTE(review): not consulted by Worker/Scheduler in this section *)
max_retries: int;  (* total attempts allowed: the worker marks the task `Failed once [retries] reaches this *)
mutable retries: int;  (* number of failed attempts so far *)
mutable status: [`Pending | `Running | `Completed | `Failed];
}
(* Worker node: takes tasks from its own queue and runs up to [max_tasks]
   of them at once, each in its own thread.
   NOTE(review): [active_tasks] is updated from several threads without a
   lock; this relies on the runtime lock making a single field update
   effectively atomic. *)
module Worker = struct
  type t = {
    id: string;
    task_queue: task MessageQueue.t;
    mutable active_tasks: int;
    max_tasks: int;
  }

  let create id max_tasks = {
    id;
    task_queue = MessageQueue.create ();
    active_tasks = 0;
    max_tasks;
  }

  (* Run one attempt of [task] with [handler]. If [handler] raises, the
     failure is reported on stderr (instead of being silently dropped). The
     task is re-queued until it has been attempted [task.max_retries] times,
     then marked [`Failed]. [active_tasks] is decremented even if
     reporting or re-queueing fails. *)
  let execute worker handler (task : task) =
    Fun.protect
      ~finally:(fun () -> worker.active_tasks <- worker.active_tasks - 1)
      (fun () ->
        task.status <- `Running;
        match handler task with
        | () -> task.status <- `Completed
        | exception exn ->
          task.retries <- task.retries + 1;
          Printf.eprintf "任务 %s 第 %d/%d 次执行失败: %s\n%!"
            task.id task.retries task.max_retries (Printexc.to_string exn);
          if task.retries < task.max_retries then begin
            task.status <- `Pending;
            MessageQueue.push worker.task_queue task
          end else
            task.status <- `Failed)

  (* Main loop; never returns. Below capacity it blocks on the queue, so an
     idle worker sleeps until a task arrives instead of polling every
     100 ms. At capacity it waits 100 ms before checking again. *)
  let rec run worker handler =
    if worker.active_tasks < worker.max_tasks then begin
      let task = MessageQueue.pop worker.task_queue in
      worker.active_tasks <- worker.active_tasks + 1;
      ignore (Thread.create (execute worker handler) task : Thread.t)
    end else
      Thread.delay 0.1;
    run worker handler
end
(* Scheduler: a fixed pool of workers; each submitted task goes to the
   least-loaded worker. *)
module Scheduler = struct
  type t = {
    workers: Worker.t list;
    task_queue: task Queue.t;  (* NOTE(review): not used by this module *)
    mutex: Mutex.t;  (* serializes [submit] *)
  }

  (* [create worker_count tasks_per_worker] builds [worker_count] workers
     named "worker_0", "worker_1", ...; each runs at most [tasks_per_worker]
     tasks at once.
     @raise Invalid_argument if [worker_count <= 0], since [submit] needs
     at least one worker. *)
  let create worker_count tasks_per_worker =
    if worker_count <= 0 then
      invalid_arg "Scheduler.create: worker_count must be positive";
    let workers =
      List.init worker_count (fun i ->
        Worker.create (Printf.sprintf "worker_%d" i) tasks_per_worker)
    in
    { workers; task_queue = Queue.create (); mutex = Mutex.create () }

  (* Load of a worker = running tasks + tasks waiting in its queue.
     Counting only running tasks would send a burst of submissions (made
     before any worker has started one) entirely to the first worker. *)
  let load (w : Worker.t) =
    let mq = w.Worker.task_queue in
    Mutex.lock mq.MessageQueue.mutex;
    let queued = Queue.length mq.MessageQueue.queue in
    Mutex.unlock mq.MessageQueue.mutex;
    w.Worker.active_tasks + queued

  (* Enqueue [task] on the least-loaded worker; ties go to the worker that
     comes first in the list.
     @raise Invalid_argument if the scheduler has no workers. *)
  let submit scheduler task =
    Mutex.lock scheduler.mutex;
    Fun.protect
      ~finally:(fun () -> Mutex.unlock scheduler.mutex)
      (fun () ->
        match scheduler.workers with
        | [] -> invalid_arg "Scheduler.submit: no workers"
        | first :: rest ->
          let best, _ =
            List.fold_left
              (fun (best, best_load) w ->
                let l = load w in
                if l < best_load then (w, l) else (best, best_load))
              (first, load first) rest
          in
          MessageQueue.push best.Worker.task_queue task)

  (* Start one thread per worker, each running [Worker.run worker handler]. *)
  let start scheduler handler =
    List.iter
      (fun worker ->
        ignore (Thread.create (Worker.run worker) handler : Thread.t))
      scheduler.workers
end
(* Usage: 4 workers, at most 10 concurrent tasks each. Submit 100 tasks
   whose handler sleeps for a random time, then wait until every task has
   finished (completed or failed). The previous fixed 30 s sleep is kept
   only as an upper bound (300 polls of 0.1 s). *)
let () =
  let scheduler = Scheduler.create 4 10 in
  Scheduler.start scheduler (fun (task : task) ->
    Printf.printf "处理任务 %s\n" task.id;
    Thread.delay (Random.float 1.0));
  (* Submit the tasks, keeping them so their status can be watched. *)
  let tasks =
    List.init 100 (fun i ->
      let n = i + 1 in
      let task : task = {
        id = Printf.sprintf "task_%d" n;
        payload = Printf.sprintf "data_%d" n;
        priority = Random.int 10;
        max_retries = 3;
        retries = 0;
        status = `Pending;
      } in
      Scheduler.submit scheduler task;
      task)
  in
  let finished (t : task) =
    match t.status with
    | `Completed | `Failed -> true
    | `Pending | `Running -> false
  in
  let rec wait remaining =
    if remaining > 0 && not (List.for_all finished tasks) then begin
      Thread.delay 0.1;
      wait (remaining - 1)
    end
  in
  wait 300
扩展阅读
上一节:性能优化与 Benchmark | 下一节:形式验证与属性测试