强曰为道
与天地相似，故不违。知周乎万物，而道济天下，故不过。旁行而不流，乐天知命，故不忧。
文档目录

OCaml 教程 / 分布式系统基础

分布式系统基础

本节介绍如何使用 OCaml 构建分布式系统,涵盖 RPC、消息队列、Actor 模型等核心概念。

RPC 框架（Async.Rpc）

opam install async_rpc
(* 定义 RPC 协议 *)
open Core
open Async

module Protocol = struct
  (** Request payload sent by the client.
      [@@deriving bin_io] derives the binary wire format from this
      definition, so field order and field types are part of the protocol. *)
  type query = {
    name: string;
    age: int;
  } [@@deriving bin_io, sexp]

  (** Reply returned by the server; [server_impl] fills [processed_at]
      with a UTC time string. *)
  type response = {
    greeting: string;
    processed_at: string;
  } [@@deriving bin_io, sexp]

  (* RPC definition: endpoint "greet", version 1, pairing [query] with
     [response] through the [bin_query] / [bin_response] values generated by
     [@@deriving bin_io].
     NOTE(review): newer async_rpc releases may require extra labelled
     arguments (e.g. include_in_error_count) here; confirm against the
     installed version. *)
  let rpc =
    Rpc.Rpc.create
      ~name:"greet"
      ~version:1
      ~bin_query
      ~bin_response
end

(* Server side: implementation of [Protocol.rpc]. The connection state is
   [unit]; the reply greets the caller and records the current UTC time. *)
let server_impl =
  let handle () (query : Protocol.query) =
    let greeting =
      Printf.sprintf "Hello, %s! You are %d years old."
        query.Protocol.name query.Protocol.age
    in
    let processed_at = Time.to_string_utc (Time.now ()) in
    return { Protocol.greeting; processed_at }
  in
  Rpc.Rpc.implement Protocol.rpc handle

(* Starts the RPC server on TCP port 8080 and serves [server_impl] on every
   accepted connection. Unknown RPCs, handshake errors and handler errors
   all raise. The returned deferred is determined once the TCP server has
   been closed. *)
let start_server () =
  let implementations =
    Rpc.Implementations.create_exn
      ~implementations:[ server_impl ]
      ~on_unknown_rpc:`Raise
  in
  let serve_connection _addr reader writer =
    Rpc.Connection.server_with_close reader writer
      ~implementations
      ~connection_state:(fun _conn -> ())
      ~on_handshake_error:`Raise
  in
  Tcp.Server.create ~on_handler_error:`Raise
    (Tcp.Where_to_listen.of_port 8080)
    serve_connection
  >>= Tcp.Server.close_finished

(* Client side.
   [call_greet host port name age] connects to the RPC server at host:port,
   dispatches one [Protocol.rpc] call and prints the reply or the error.
   The connection is closed after the call whether the dispatch succeeded
   or not. A connection failure is printed and the deferred resolves to
   [()]. *)
let call_greet host port name age =
  let where_to_connect =
    Tcp.Where_to_connect.of_host_and_port (Host_and_port.create ~host ~port)
  in
  match%bind Rpc.Connection.client where_to_connect with
  | Error exn ->
    (* Rpc.Connection.client reports failures as an exception (Exn.t),
       not as an Error.t, so it is rendered with Exn.to_string. *)
    printf "连接失败: %s\n" (Exn.to_string exn);
    return ()
  | Ok connection ->
    let query = { Protocol.name; age } in
    let%bind response = Rpc.Rpc.dispatch Protocol.rpc connection query in
    (match response with
     | Ok resp ->
       printf "响应: %s\n" resp.Protocol.greeting;
       printf "处理时间: %s\n" resp.Protocol.processed_at
     | Error err -> printf "错误: %s\n" (Error.to_string_hum err));
    Rpc.Connection.close connection
| RPC 组件 | 说明 |
|---|---|
| `Rpc.Rpc.create` | 定义 RPC 端点 |
| `bin_query` / `bin_response` | 序列化格式 |
| `Rpc.Rpc.implement` | 实现服务端 |
| `Rpc.Rpc.dispatch` | 客户端发起调用 |

消息队列集成

(* Redis 消息队列 *)
open Lwt.Infix

(* Publish/subscribe over Redis.
   [publish_message channel message] connects to the Redis server at
   localhost:6379, publishes [message] on [channel] and disconnects.
   The connection is released even when [publish] fails; the failure is
   then re-raised in the returned promise. *)
let publish_message channel message =
  let%lwt conn = Redis_lwt.Client.connect { host = "localhost"; port = 6379 } in
  Lwt.finalize
    (fun () ->
      (* The reply of publish is not used. *)
      let%lwt _reply = Redis_lwt.Client.publish conn channel message in
      Lwt.return_unit)
    (fun () -> Redis_lwt.Client.disconnect conn)

(* [subscribe_channel channel handler] subscribes to [channel] on the Redis
   server at localhost:6379 and calls [handler] on every received message,
   forever; the connection is never closed by this function.
   NOTE(review): confirm that Redis_lwt.Client.receive exists in the
   installed redis-lwt version; some releases expose subscription messages
   through a stream instead. *)
let subscribe_channel channel handler =
  Redis_lwt.Client.connect { host = "localhost"; port = 6379 } >>= fun conn ->
  Redis_lwt.Client.subscribe conn [channel] >>= fun sub ->
  let rec next_message () =
    Redis_lwt.Client.receive sub >>= fun msg ->
    handler msg;
    next_message ()
  in
  next_message ()

(* Simple message queue: a FIFO shared between system threads. Every
   operation holds [mutex] while touching [queue]; [pop] blocks on
   [condition] until an element is available. *)
module MessageQueue = struct
  type 'a t = {
    queue: 'a Queue.t;
    mutex: Mutex.t;
    condition: Condition.t;
  }

  (** An empty queue. *)
  let create () =
    let queue = Queue.create () in
    let mutex = Mutex.create () in
    let condition = Condition.create () in
    { queue; mutex; condition }

  (** Appends [msg] and wakes one thread blocked in [pop]. *)
  let push q msg =
    Mutex.lock q.mutex;
    Queue.add msg q.queue;
    Condition.signal q.condition;
    Mutex.unlock q.mutex

  (** Removes and returns the oldest element, blocking while the queue is
      empty. The emptiness test is repeated after every wake-up. *)
  let pop q =
    Mutex.lock q.mutex;
    let rec await_element () =
      if Queue.is_empty q.queue then begin
        Condition.wait q.condition q.mutex;
        await_element ()
      end
    in
    await_element ();
    let head = Queue.take q.queue in
    Mutex.unlock q.mutex;
    head

  (** Non-blocking variant of [pop]: [None] when the queue is empty. *)
  let try_pop q =
    Mutex.lock q.mutex;
    let head = Queue.take_opt q.queue in
    Mutex.unlock q.mutex;
    head
end

(* Usage: one producer thread pushes 100 messages (10 ms apart) and one
   consumer thread pops and prints them; the main thread waits for both. *)
let () =
  let mq = MessageQueue.create () in
  let produce () =
    for i = 1 to 100 do
      MessageQueue.push mq (Printf.sprintf "消息 %d" i);
      Thread.delay 0.01
    done
  in
  let consume () =
    for _ = 1 to 100 do
      Printf.printf "收到: %s\n" (MessageQueue.pop mq)
    done
  in
  let producer = Thread.create produce () in
  let consumer = Thread.create consume () in
  List.iter Thread.join [ producer; consumer ]

💡 提示:生产环境建议使用 RabbitMQ 或 Kafka 等成熟的消息队列系统。

Actor 模型

(* Simple actor: a mailbox (FIFO queue) guarded by a mutex, with a
   condition variable to wake receivers and a [running] flag for shutdown. *)
module Actor = struct
  type 'msg t = {
    mailbox: 'msg Queue.t;
    mutex: Mutex.t;
    condition: Condition.t;
    mutable running: bool;
  }

  (** A running actor with an empty mailbox. *)
  let create () = {
    mailbox = Queue.create ();
    mutex = Mutex.create ();
    condition = Condition.create ();
    running = true;
  }

  (** Enqueues [msg] and wakes one waiting receiver. *)
  let send actor msg =
    Mutex.lock actor.mutex;
    Queue.push msg actor.mailbox;
    Condition.signal actor.condition;
    Mutex.unlock actor.mutex

  (** Blocks until a message is available or the actor is stopped.
      Returns [Some msg] while running and [None] once [stop] has been
      called; messages still queued at that point are not delivered. *)
  let receive actor =
    Mutex.lock actor.mutex;
    while Queue.is_empty actor.mailbox && actor.running do
      Condition.wait actor.condition actor.mutex
    done;
    (* Leaving the loop with running = true implies a non-empty mailbox. *)
    let result =
      if actor.running then Some (Queue.pop actor.mailbox) else None
    in
    Mutex.unlock actor.mutex;
    result

  (** Marks the actor as stopped and wakes every thread blocked in
      [receive]. [broadcast] is required: with [signal] only one waiter
      would observe the stop and the others would block forever. *)
  let stop actor =
    Mutex.lock actor.mutex;
    actor.running <- false;
    Condition.broadcast actor.condition;
    Mutex.unlock actor.mutex
end

(* Counter actor *)
(** Messages understood by the actor created by [counter_actor]. *)
type counter_msg =
  | Increment          (** add 1 to the count *)
  | Decrement          (** subtract 1 from the count *)
  | Get of int ref     (** the actor thread stores the current count in the ref *)
  | Stop               (** ends the actor loop *)

(* Spawns a thread that owns an integer count and serves [counter_msg]
   messages from a fresh actor until it receives [Stop] or the actor is
   stopped. Returns the actor so callers can send it messages. *)
let counter_actor () =
  let actor = Actor.create () in
  let count = ref 0 in
  let rec serve () =
    match Actor.receive actor with
    | Some Increment -> incr count; serve ()
    | Some Decrement -> decr count; serve ()
    | Some (Get reply) -> reply := !count; serve ()
    | Some Stop | None -> ()
  in
  let _thread = Thread.create serve () in
  actor

(* Usage: increment the counter 100 times, ask for its value, print it,
   then stop the actor. *)
let () =
  let counter = counter_actor () in
  List.iter (Actor.send counter) (List.init 100 (fun _ -> Increment));
  let snapshot = ref 0 in
  Actor.send counter (Get snapshot);
  (* NOTE(review): the actor thread writes [snapshot] without any
     synchronisation with this thread; the fixed sleep only makes it likely
     that Get has been processed before the value is read. *)
  Thread.delay 0.1;
  Printf.printf "计数: %d\n" !snapshot;
  Actor.send counter Stop
| Actor 概念 | OCaml 实现 | 说明 |
|---|---|---|
| Actor | `Actor.t` | 消息接收者 |
| Mailbox | `Queue.t` | 消息队列 |
| Send | `Actor.send` | 发送消息 |
| Receive | `Actor.receive` | 接收消息（阻塞） |

⚠️ 注意:Actor 之间的状态应完全隔离,避免共享可变状态导致竞态条件。

分布式一致性基础

(* Simplified Raft consensus sketch: node state types. *)
(** Identifier of a cluster node. *)
type node_id = string

(** Role a node currently plays in Raft. *)
type raft_state =
  | Follower
  | Candidate
  | Leader

(** One replicated log record: the [term] it was created in, its position
    [index], and an opaque [command]. *)
type log_entry = {
  term: int;
  index: int;
  command: string;
}

(** Per-node state; field names mirror the Raft paper. *)
type raft_node = {
  id: node_id;
  mutable state: raft_state;
  mutable current_term: int;           (* Raft currentTerm *)
  mutable voted_for: node_id option;   (* Raft votedFor; None = no vote *)
  log: log_entry list ref;             (* NOTE(review): entry order (oldest-first vs newest-first) is not fixed here; confirm *)
  mutable commit_index: int;           (* Raft commitIndex *)
  mutable last_applied: int;           (* Raft lastApplied *)
  peers: node_id list;                 (* the other nodes of the cluster *)
}

(* AppendEntries RPC.
   NOTE: [term] is a field of log_entry and of all four RPC records below.
   When the record type of [x] cannot be inferred from context, [x.term]
   resolves to the most recently defined record with that field
   (request_vote_response), so annotate values of the other types, e.g.
   [(request : append_entries_request)]. *)
(** Arguments of the Raft AppendEntries call. *)
type append_entries_request = {
  term: int;
  leader_id: node_id;
  prev_log_index: int;
  prev_log_term: int;
  entries: log_entry list;
  leader_commit: int;
}

(** Result of AppendEntries: the responder term and whether it accepted. *)
type append_entries_response = {
  term: int;
  success: bool;
}

(* RequestVote RPC *)
(** Arguments of the Raft RequestVote call. *)
type request_vote_request = {
  term: int;
  candidate_id: node_id;
  last_log_index: int;
  last_log_term: int;
}

(** Result of RequestVote: the responder term and whether the vote was granted. *)
type request_vote_response = {
  term: int;
  vote_granted: bool;
}

(* Handle AppendEntries.
   [handle_append_entries node request] applies the Raft term rules to
   [node] and builds the reply:
   - a request from an older term is rejected ([success = false]) and the
     reply carries the node's newer term;
   - otherwise the node adopts the request term, clearing [voted_for] when
     the term actually advances (a vote is only valid within one term), and
     becomes a [Follower].
   The parameter and result are annotated because [term] is a label of
   several records; without the annotation [request.term] would resolve to
   request_vote_response.
   Log consistency (prev_log_index / prev_log_term), appending [entries]
   and advancing [commit_index] are not implemented yet: every request that
   is not stale is currently acknowledged with [success = true]. *)
let handle_append_entries node (request : append_entries_request)
    : append_entries_response =
  if request.term < node.current_term then
    { term = node.current_term; success = false }
  else begin
    if request.term > node.current_term then begin
      node.current_term <- request.term;
      node.voted_for <- None
    end;
    node.state <- Follower;
    (* TODO: verify log consistency and append request.entries *)
    { term = node.current_term; success = true }
  end

网络编程(Unix 模块)

(* TCP server.
   [start_tcp_server ?backlog port handler] listens on [port] on every IPv4
   interface and runs [handler client_sock client_addr] on a new thread for
   each accepted connection; the client socket is closed when the handler
   returns or raises. Never returns normally. If setup or accept fails
   (accept interrupted by a signal, EINTR, is retried), the listening socket
   is closed and the exception propagates.
   @param backlog length of the pending-connection queue (default 10). *)
let start_tcp_server ?(backlog = 10) port handler =
  let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
  Fun.protect
    ~finally:(fun () -> Unix.close sock)
    (fun () ->
      Unix.setsockopt sock Unix.SO_REUSEADDR true;
      Unix.bind sock (Unix.ADDR_INET (Unix.inet_addr_any, port));
      Unix.listen sock backlog;
      (* %! flushes stdout so log lines appear immediately. *)
      Printf.printf "服务器监听端口 %d\n%!" port;
      let describe = function
        | Unix.ADDR_INET (addr, peer_port) ->
          Printf.sprintf "%s:%d" (Unix.string_of_inet_addr addr) peer_port
        | Unix.ADDR_UNIX _ -> "unknown"
      in
      let rec accept_loop () =
        match Unix.accept sock with
        | exception Unix.Unix_error (Unix.EINTR, _, _) -> accept_loop ()
        | client_sock, client_addr ->
          Printf.printf "新连接: %s\n%!" (describe client_addr);
          let serve () =
            Fun.protect
              ~finally:(fun () -> Unix.close client_sock)
              (fun () -> handler client_sock client_addr)
          in
          let _thread : Thread.t = Thread.create serve () in
          accept_loop ()
      in
      accept_loop ())

(* TCP client.
   [tcp_connect host port] returns a socket connected to host:port. [host]
   may be a numeric IPv4/IPv6 address or a host name (resolved with
   gethostbyname; its first address is used). The socket family follows
   the address. If connecting fails, the socket is closed before the
   exception is re-raised.
   @raise Not_found if [host] cannot be resolved.
   @raise Unix.Unix_error if the connection fails. *)
let tcp_connect host port =
  let inet_addr =
    match Unix.inet_addr_of_string host with
    | addr -> addr
    | exception Failure _ ->
      (match (Unix.gethostbyname host).Unix.h_addr_list with
       | [||] -> raise Not_found
       | addrs -> addrs.(0))
  in
  let addr = Unix.ADDR_INET (inet_addr, port) in
  let sock = Unix.socket (Unix.domain_of_sockaddr addr) Unix.SOCK_STREAM 0 in
  (try Unix.connect sock addr
   with e ->
     let bt = Printexc.get_raw_backtrace () in
     Unix.close sock;
     Printexc.raise_with_backtrace e bt);
  sock

(* Sending and receiving.
   [send_all sock data] writes every byte of the string [data] to [sock],
   looping over partial sends. Unix.send_substring is used because [data] is
   a string (Unix.send expects bytes). A send interrupted by a signal
   (EINTR) is retried; any other Unix_error propagates. *)
let send_all sock data =
  let len = String.length data in
  let rec loop off =
    if off < len then
      match Unix.send_substring sock data off (len - off) [] with
      | n -> loop (off + n)
      | exception Unix.Unix_error (Unix.EINTR, _, _) -> loop off
  in
  loop 0

(* [recv_all sock size] reads exactly [size] bytes from [sock] and returns
   them as a string, looping over partial receives.
   @raise End_of_file if the peer closes the connection first.
   Unix_error from recv propagates unchanged. *)
let recv_all sock size =
  let buf = Bytes.create size in
  let rec fill off =
    if off < size then
      match Unix.recv sock buf off (size - off) [] with
      | 0 -> raise End_of_file
      | n -> fill (off + n)
  in
  fill 0;
  Bytes.to_string buf

💡 提示：生产环境应使用 Async、Lwt 等异步 IO 库，而非阻塞式的 Unix 模块。

序列化与反序列化

(* Distributed message format.
   WARNING: the constructor [Error] below shadows the [Error] constructor of
   [result] for the code that follows. Where the expected type is not known,
   an unqualified [Error x] now builds a [message]; write [Stdlib.Error] or
   annotate the type when a [result] is meant. *)
type message =
  | Request of int * string    (* NOTE(review): int presumably a request id; confirm *)
  | Response of int * string   (* presumably the id of the request answered *)
  | Heartbeat
  | Error of string

(* Efficient serialisation with bin_prot.
   [@@deriving bin_io] names the generated functions after the type, so for
   [msg] they are bin_size_msg, bin_write_msg and bin_read_msg (not bin_*_t). *)
type msg = {
  id: int;
  payload: string;
  timestamp: float;  (* NOTE(review): unit/epoch not established here; presumably Unix seconds *)
} [@@deriving bin_io]

(* [serialize msg] returns the bin_prot encoding of [msg].
   The buffer is sized with bin_size_msg, so payloads of any length fit
   (a fixed-size buffer would overflow for large payloads). *)
let serialize msg =
  let size = bin_size_msg msg in
  let buf = Bin_prot.Common.create_buf size in
  let written = bin_write_msg buf ~pos:0 msg in
  let out = Bytes.create written in
  Bin_prot.Common.blit_buf_bytes buf out ~len:written;
  out

(* [deserialize bytes] decodes a [msg] from its bin_prot encoding, reading
   from offset 0. Decoding errors raised by bin_prot (e.g. truncated
   input) propagate. *)
let deserialize bytes =
  let len = Bytes.length bytes in
  let buf = Bin_prot.Common.create_buf len in
  Bin_prot.Common.blit_bytes_buf bytes buf ~len;
  bin_read_msg buf ~pos_ref:(ref 0)

(* Length-prefixed message protocol.
   [send_message sock msg] serialises [msg] and writes one frame to [sock]:
   the payload length as a 4-byte little-endian int32, then the payload.
   Header and payload are written with a single send_all, so a small frame
   is not split into two small writes (which on TCP can stall on the
   interaction between Nagle and delayed ACKs). *)
let send_message sock msg =
  let payload = serialize msg in
  let len = Bytes.length payload in
  let frame = Bytes.create (4 + len) in
  Bytes.set_int32_le frame 0 (Int32.of_int len);
  Bytes.blit payload 0 frame 4 len;
  send_all sock (Bytes.to_string frame)

(* [recv_message ?max_len sock] reads one frame written by send_message
   (4-byte little-endian length, then the payload) and decodes it.
   The length comes from the peer, so it is validated before allocating.
   @param max_len largest accepted payload in bytes (default 64 MiB).
   @raise End_of_file if the peer closes the connection mid-frame.
   @raise Failure if the announced length is negative or above [max_len]. *)
let recv_message ?(max_len = 64 * 1024 * 1024) sock =
  let header = Bytes.of_string (recv_all sock 4) in
  let len = Int32.to_int (Bytes.get_int32_le header 0) in
  if len < 0 || len > max_len then
    failwith (Printf.sprintf "recv_message: invalid frame length %d" len);
  deserialize (Bytes.of_string (recv_all sock len))

分布式追踪

(** Tracing context passed between services.
    [trace_id] is shared by every span of one trace, [span_id] identifies the
    current span, [parent_span_id] is [None] for a root span (as built by
    [create_trace]) and the parent span id for a child (as built by
    [child_span]), and [baggage] is a list of key/value pairs. *)
type trace_context = {
  trace_id: string;
  span_id: string;
  parent_span_id: string option;
  baggage: (string * string) list;
}

(* Self-seeded PRNG state dedicated to trace and span IDs. The default
   Random state starts from a fixed seed, so without this every process
   would emit the same ID sequence and IDs from different services would
   collide.
   NOTE(review): this state is shared by all callers; fine for system
   threads, but it must not be used from several OCaml 5 domains at once. *)
let id_rng = Random.State.make_self_init ()

(* [generate_id ()] returns 16 random bytes (128 bits), Base64-encoded. *)
let generate_id () =
  let buf = Bytes.create 16 in
  for i = 0 to 15 do
    Bytes.set buf i (Char.chr (Random.State.int id_rng 256))
  done;
  Base64.encode_string (Bytes.to_string buf)

(* Starts a new trace: fresh trace and span IDs, no parent, empty baggage.
   The trace ID is generated before the span ID. *)
let create_trace () =
  let root_trace = generate_id () in
  let root_span = generate_id () in
  { parent_span_id = None; baggage = []; trace_id = root_trace; span_id = root_span }

(* Derives a child span: same trace ID and baggage as [parent], a fresh
   span ID, and [parent]'s span ID recorded as the parent. *)
let child_span parent =
  let fresh_span = generate_id () in
  { parent with parent_span_id = Some parent.span_id; span_id = fresh_span }

(* Propagate the tracing context as HTTP-style headers. A missing parent
   span is sent as an empty X-Parent-Span-Id value.
   NOTE(review): [ctx.baggage] is not propagated. *)
let propagate_headers ctx =
  let parent =
    match ctx.parent_span_id with
    | Some id -> id
    | None -> ""
  in
  [ "X-Trace-Id", ctx.trace_id;
    "X-Span-Id", ctx.span_id;
    "X-Parent-Span-Id", parent ]

实际案例:分布式任务调度器

(* Task type *)
(** A unit of work for the scheduler. Worker.run sets [status] to `Running
    while an attempt executes, `Completed on success, back to `Pending when
    a failed attempt is re-queued, and `Failed once retries are exhausted. *)
type task = {
  id: string;
  payload: string;
  priority: int;                 (* NOTE(review): not read by Scheduler.submit or Worker.run *)
  max_retries: int;
  mutable retries: int;          (* failed attempts so far *)
  mutable status: [`Pending | `Running | `Completed | `Failed];
}

(* Worker node: pulls tasks from its own queue and runs up to [max_tasks]
   of them at a time, one system thread per running task.
   NOTE(review): [active_tasks] is incremented by the [run] thread and
   decremented by task threads without a lock. *)
module Worker = struct
  type t = {
    id: string;
    task_queue: task MessageQueue.t;
    mutable active_tasks: int;
    max_tasks: int;
  }

  (** A worker named [id] with an empty queue and capacity [max_tasks]. *)
  let create id max_tasks = {
    id;
    task_queue = MessageQueue.create ();
    active_tasks = 0;
    max_tasks;
  }

  (* Runs one attempt of [task] with [handler]. Any exception from the
     handler counts as a failed attempt: the task is re-queued while
     [retries <= max_retries], so [max_retries = k] allows up to k retries
     (1 + k attempts), and is marked `Failed afterwards. The capacity slot
     is released even if re-queueing raises. *)
  let attempt worker handler task =
    task.status <- `Running;
    Fun.protect
      ~finally:(fun () -> worker.active_tasks <- worker.active_tasks - 1)
      (fun () ->
        try
          handler task;
          task.status <- `Completed
        with _ ->
          task.retries <- task.retries + 1;
          if task.retries <= task.max_retries then begin
            task.status <- `Pending;
            MessageQueue.push worker.task_queue task
          end else
            task.status <- `Failed)

  (** Dispatch loop; never returns. While below capacity it takes the next
      queued task and starts [attempt] on a new thread; when the queue is
      empty or the worker is at capacity it sleeps 0.1 s and polls again. *)
  let rec run worker handler =
    if worker.active_tasks < worker.max_tasks then begin
      match MessageQueue.try_pop worker.task_queue with
      | Some task ->
        worker.active_tasks <- worker.active_tasks + 1;
        let _thread : Thread.t = Thread.create (attempt worker handler) task in
        run worker handler
      | None ->
        Thread.delay 0.1;
        run worker handler
    end else begin
      Thread.delay 0.1;
      run worker handler
    end
end

(* Scheduler: a fixed pool of workers; [submit] routes each task to the
   least busy worker. *)
module Scheduler = struct
  type t = {
    workers: Worker.t list;
    task_queue: task Queue.t;   (* NOTE(review): not read anywhere in this module *)
    mutex: Mutex.t;             (* serialises [submit] *)
    mutable next_worker: int;   (* rotation offset used to break load ties *)
  }

  (** [create worker_count tasks_per_worker] builds [worker_count] workers
      named worker_0, worker_1, ... each with capacity [tasks_per_worker].
      Workers are not started until [start] is called. *)
  let create worker_count tasks_per_worker =
    let workers = List.init worker_count (fun i ->
      Worker.create (Printf.sprintf "worker_%d" i) tasks_per_worker
    ) in
    {
      workers;
      task_queue = Queue.create ();
      mutex = Mutex.create ();
      next_worker = 0;
    }

  (* [split_at k xs] returns the first [k] elements of [xs] and the rest. *)
  let split_at k xs =
    let rec go k acc = function
      | x :: rest when k > 0 -> go (k - 1) (x :: acc) rest
      | rest -> (List.rev acc, rest)
    in
    go k [] xs

  (** Queues [task] on the worker with the fewest active tasks. Ties are
      broken by a rotating start position: active_tasks does not count
      queued-but-not-started tasks, so a burst of submissions would
      otherwise all land on the first worker.
      @raise Invalid_argument if the scheduler has no workers. *)
  let submit scheduler task =
    Mutex.lock scheduler.mutex;
    Fun.protect
      ~finally:(fun () -> Mutex.unlock scheduler.mutex)
      (fun () ->
        let n = List.length scheduler.workers in
        let start = if n = 0 then 0 else scheduler.next_worker mod n in
        scheduler.next_worker <- start + 1;
        let front, back = split_at start scheduler.workers in
        match back @ front with
        | [] -> invalid_arg "Scheduler.submit: scheduler has no workers"
        | first :: rest ->
          let less_busy best w =
            if w.Worker.active_tasks < best.Worker.active_tasks then w else best
          in
          let worker = List.fold_left less_busy first rest in
          MessageQueue.push worker.Worker.task_queue task)

  (** Starts one dispatch thread ([Worker.run]) per worker. *)
  let start scheduler handler =
    List.iter (fun worker ->
      let _thread = Thread.create (Worker.run worker) handler in
      ()
    ) scheduler.workers
end

(* Usage: 4 workers x 10 concurrent tasks each; submit 100 tasks whose
   handler prints the task id and sleeps up to 1 s, then give the worker
   threads 30 s before the program exits. *)
let () =
  let scheduler = Scheduler.create 4 10 in
  let process (task : task) =
    Printf.printf "处理任务 %s\n" task.id;
    Thread.delay (Random.float 1.0)
  in
  Scheduler.start scheduler process;
  let make_task i : task = {
    id = Printf.sprintf "task_%d" i;
    payload = Printf.sprintf "data_%d" i;
    priority = Random.int 10;
    max_retries = 3;
    retries = 0;
    status = `Pending;
  } in
  (* Submit the tasks *)
  for i = 1 to 100 do
    Scheduler.submit scheduler (make_task i)
  done;
  Thread.delay 30.0

扩展阅读


上一节：性能优化与 Benchmark ｜ 下一节：形式验证与属性测试