第11章:Erlang 进程 —— Actor 模型的先驱
第11章:Erlang 进程 —— Actor 模型的先驱
11.1 Erlang 的并发哲学
Erlang 由 Joe Armstrong 在 1986 年为爱立信的电信系统设计,其核心设计目标是:
- 高并发:同时处理数百万个连接
- 高可用:系统运行数年不停机
- 容错:部分失败不影响整体
- 软实时:保证响应时间
Erlang 的格言:“Let it crash”(让它崩溃)——与其防御性编程,不如快速失败并由监督者重启。
11.2 轻量级进程
创建进程
%% Erlang
Pid = spawn(fun() ->
receive
{From, hello} ->
From ! {self(), hi},
io:format("收到 hello~n");
_ ->
io:format("收到未知消息~n")
end
end).
%% 发送消息
Pid ! {self(), hello}.
# Elixir(Erlang 之上的现代语言)
pid = spawn(fn ->
receive do
{:hello, sender} ->
send(sender, {:hi, self()})
IO.puts("收到 hello")
_ ->
IO.puts("收到未知消息")
end
end)
send(pid, {:hello, self()})
进程 vs OS 线程 vs Goroutine
| 特性 | Erlang 进程 | OS 线程 | Go goroutine |
|---|---|---|---|
| 内存占用 | ~300 字节初始 | ~1MB | ~2KB 初始 |
| 创建时间 | ~1μs | ~1ms | ~0.3μs |
| 最大数量 | 数百万 | 数千 | 数百万 |
| 调度 | VM 抢占式(Reduction-based) | OS 抢占式 | Go 运行时协作式 |
| GC | 每进程独立 GC | 全局 GC | 全局 GC |
| 通信 | 消息传递(无共享) | 共享内存 | Channel/共享内存 |
11.3 消息传递
消息发送与接收
%% 发送是异步的 — 立即返回
Pid ! Message.
%% 接收是选择性的 — Pattern Matching
receive
Pattern1 -> Body1;
Pattern2 -> Body2;
after
Timeout -> TimeoutBody
end.
消息队列(Mailbox)
进程的 Mailbox:
┌──────────────────────────────┐
│ 进程 B 的 Mailbox │
│ │
│ 消息3: {update, X=10} │
│ 消息2: {data, [1,2,3]} │
│ 消息1: {hello, Pid_A} │
│ │
│ receive 从上到下匹配 │
└──────────────────────────────┘
注意:发送方不会阻塞(异步发送)
接收方可能阻塞(如果没有匹配的消息)
选择性接收
%% 只处理感兴趣的消息,忽略其他
receive
{reply, Ref, Data} ->
handle_reply(Ref, Data)
%% 不匹配 {ping, _} 或其他消息
%% 它们留在 mailbox 中
after 5000 ->
timeout
end.
关键优势:选择性接收(Selective Receive)使得 Erlang 进程可以按优先级处理消息,而不像 Go 的 Channel 那样只能按顺序接收。
11.4 调度器
Reduction-based 调度
Erlang VM(BEAM)使用**归约(Reduction)**计数来实现公平调度:
- 每个进程每次被调度执行一定数量的"归约"(约 4000 个函数调用)
- 归约用完后,进程被暂停,让出 CPU 给其他进程
- 不需要显式 yield
调度器:
进程A: ████░░░░████░░░░████ ← 每 4000 reductions 切换
进程B: ░░░░████░░░░████░░░░
进程C: ░░░░░░░░████░░░░████
确保公平性:CPU 密集型进程不会独占 CPU
调度器数量
%% 默认调度器数量 = CPU 核心数
erlang:system_info(schedulers). %% 查看调度器数量
erlang:system_info(schedulers_online). %% 查看在线调度器数量
%% VM 会自动处理 CPU 亲和性、负载均衡
11.5 容错机制 — Let It Crash
监督树(Supervision Tree)
┌──────────┐
│ 顶级监督者 │
└────┬─────┘
┌────────┼────────┐
│ │ │
┌────┴───┐┌───┴───┐┌───┴────┐
│子监督者 ││工作进程││子监督者 │
└───┬────┘└───────┘└───┬────┘
┌───┼───┐ ┌───┼───┐
│ │ │ │ │ │
W1 W2 W3 W4 W5 W6
重启策略
%% 三种重启策略
%% one_for_one: 只重启崩溃的那个子进程
%% one_for_all: 重启所有子进程
%% rest_for_one: 重启崩溃进程及其之后启动的进程
init(_) ->
{ok, {{one_for_one, 5, 10}, [
{worker1, {worker1, start_link, []},
permanent, 5000, worker, [worker1]},
{worker2, {worker2, start_link, []},
permanent, 5000, worker, [worker2]}
]}}.
# Elixir 的监督者
defmodule MyApp.Supervisor do
use Supervisor
def start_link(opts) do
Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
end
@impl true
def init(_opts) do
children = [
{MyApp.Worker1, []},
{MyApp.Worker2, []},
{MyApp.Worker3, []}
]
Supervisor.init(children, strategy: :one_for_one)
end
end
重启策略对比
| 策略 | 行为 | 适用场景 |
|---|---|---|
one_for_one | 只重启崩溃的子进程 | 子进程相互独立 |
one_for_all | 重启所有子进程 | 子进程相互依赖 |
rest_for_one | 重启崩溃进程及后续进程 | 子进程有启动顺序 |
11.6 OTP 行为模式(Behaviours)
GenServer — 通用服务器
defmodule Counter do
use GenServer
# 客户端 API
def start_link(initial_value) do
GenServer.start_link(__MODULE__, initial_value, name: __MODULE__)
end
def increment, do: GenServer.cast(__MODULE__, :increment)
def get_value, do: GenServer.call(__MODULE__, :get_value)
# 服务端回调
@impl true
def init(initial_value), do: {:ok, initial_value}
@impl true
def handle_call(:get_value, _from, state) do
{:reply, state, state}
end
@impl true
def handle_cast(:increment, state) do
{:noreply, state + 1}
end
end
# 使用
{:ok, _pid} = Counter.start_link(0)
Counter.increment()
Counter.increment()
Counter.get_value() # => 2
11.7 分布式 Erlang
%% 启动两个节点
%% node1@host> erl -name [email protected] -setcookie mycookie
%% node2@host> erl -name [email protected] -setcookie mycookie
%% 在 node1 上
Pid = spawn('[email protected]', fun() ->
io:format("我在远程节点运行!~n")
end).
%% 消息传递跨节点透明
Pid ! {hello, self()}.
11.8 业务场景:实时聊天系统
defmodule Chat.Room do
use GenServer
def start_link(room_id) do
GenServer.start_link(__MODULE__, room_id, name: via_tuple(room_id))
end
def join(room_id, user_pid), do: GenServer.cast(via_tuple(room_id), {:join, user_pid})
def send_msg(room_id, user, msg), do: GenServer.cast(via_tuple(room_id), {:msg, user, msg})
@impl true
def init(room_id), do: {:ok, %{room_id: room_id, members: MapSet.new()}}
@impl true
def handle_cast({:join, user_pid}, state) do
Process.monitor(user_pid)
{:noreply, %{state | members: MapSet.put(state.members, user_pid)}}
end
@impl true
def handle_cast({:msg, user, msg}, state) do
Enum.each(state.members, fn pid ->
send(pid, {:chat_msg, user, msg})
end)
{:noreply, state}
end
@impl true
def handle_info({:DOWN, _, :process, pid, _}, state) do
{:noreply, %{state | members: MapSet.delete(state.members, pid)}}
end
defp via_tuple(room_id), do: {:via, Registry, {Chat.Registry, room_id}}
end
11.9 本章小结
| 要点 | 说明 |
|---|---|
| 轻量级进程 | ~300 字节,数百万级并发 |
| 消息传递 | 异步发送,选择性接收,无共享状态 |
| Reduction 调度 | 公平的抢占式调度,无需显式 yield |
| Let It Crash | 监督树实现自动恢复 |
| OTP | GenServer、Supervisor 等成熟模式 |
| 分布式 | 跨节点消息传递透明 |
下一章预告:C++ 的协程之路——从 Boost.Fiber 到 C++20 Coroutines。
扩展阅读
- Erlang 官方文档
- Elixir 官方文档
- Programming Erlang (2nd Edition) — Joe Armstrong
- BEAM Wisdoms — BEAM VM 内部实现
- The Erlang Runtime System — 详尽的 BEAM 书籍