强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

09 - 并发编程

第 09 章:并发编程

并发(Concurrency)是 Erlang 的灵魂。本章学习 spawn、send、receive、进程链接与监控——构建可靠并发系统的基础。


9.1 Erlang 并发模型

9.1.1 Actor 模型

Erlang 基于 Actor 模型:每个进程是一个独立的 Actor,拥有自己的状态,通过消息传递通信。

┌──────────┐    消息    ┌──────────┐
│  进程 A   │ ────────→ │  进程 B   │
│  (状态)   │           │  (状态)   │
│  (邮箱)   │ ←──────── │  (邮箱)   │
└──────────┘    消息    └──────────┘
概念说明
进程(Process)独立的执行单元,约 2KB 内存
消息(Message)进程间通信的唯一方式
邮箱(Mailbox)每个进程有独立的消息队列
无共享内存进程间不共享任何状态

9.1.2 进程 vs 线程

特性Erlang 进程OS 线程OS 进程
创建开销~1μs, 2KB~10μs, 1MB~1ms, 数MB
最大数量数百万数千数百
调度器BEAMOS 内核OS 内核
通信方式消息传递共享内存IPC
GC每进程独立全局独立
崩溃影响只影响自身可能影响整个进程只影响自身

9.2 创建进程

9.2.1 spawn/1

%% 最简单的进程
Pid = spawn(fun() ->
    io:format("Hello from process ~p!~n", [self()])
end).
%% Hello from process <0.123.0>!

%% spawn 返回 PID(进程标识符)
is_pid(Pid).  %% true

9.2.2 spawn/3 — 模块函数

%% worker.erl
-module(worker).
-export([loop/0, start/0]).

start() ->
    spawn(?MODULE, loop, []).

loop() ->
    receive
        stop ->
            io:format("Worker stopping~n");
        {work, Task} ->
            io:format("Working on: ~p~n", [Task]),
            loop()
    end.

%% 使用
1> Pid = worker:start().
<0.123.0>
2> Pid ! {work, "process data"}.
Working on: process data
3> Pid ! stop.
Worker stopping

9.2.3 spawn 的变体

函数说明
spawn(Fun)创建进程执行 Fun
spawn(Module, Function, Args)创建进程调用 M:F(A)
spawn(Node, Fun)在远程节点创建进程
spawn_link(Fun)创建并链接
spawn_monitor(Fun)创建并监控
spawn_opt(Fun, Opts)带选项创建

9.3 消息传递

9.3.1 发送消息 (!)

%% 语法:Pid ! Message
%% Message 可以是任意 Erlang 值

Pid ! hello.
Pid ! {data, [1, 2, 3]}.
Pid ! {self(), "request"}.

%% 发送给自己
self() ! ping.

%% 发送的返回值是消息本身
Msg = Pid ! hello.
%% Msg = hello

9.3.2 接收消息 (receive)

%% 基本 receive
receive
    Pattern1 -> Action1;
    Pattern2 -> Action2;
    ...
end.

%% 带超时的 receive
receive
    {reply, Data} -> Data
after 5000 ->
    timeout
end.

9.3.3 消息匹配

%% 匹配特定消息
receive
    {hello, Name} ->
        io:format("Hello, ~s!~n", [Name]);
    {bye, Name} ->
        io:format("Goodbye, ~s!~n", [Name])
end.

%% 带守卫的匹配
receive
    {data, N} when is_integer(N), N > 0 ->
        process(N);
    {data, N} when is_integer(N) ->
        io:format("Invalid: ~p~n", [N])
end.

%% 通配符匹配
receive
    _ -> ok  %% 匹配任何消息(不推荐,丢弃有用信息)
end.

%% 绑定变量
receive
    {request, From, Ref, Data} ->
        %% From = 发送方 PID
        %% Ref = 请求引用
        %% Data = 请求数据
        From ! {response, Ref, process(Data)}
end.

9.3.4 超时处理

%% 普通超时
wait_for_message() ->
    receive
        {ready, Pid} -> {ok, Pid}
    after 5000 ->
        {error, timeout}
    end.

%% 无限等待(不设置 after)
wait_forever() ->
    receive
        stop -> ok
    end.

%% 零超时(立即返回,用于非阻塞检查)
check_mailbox() ->
    receive
        Msg -> {got, Msg}
    after 0 ->
        empty
    end.

9.3.5 消息队列清理

%% 清空邮箱中的所有消息
flush() ->
    receive
        _Msg ->
            %% io:format("Flushing: ~p~n", [_Msg]),
            flush()
    after 0 ->
        ok
    end.

%% 使用 erlang:flush() 在 Shell 中清空

9.4 请求-响应模式

9.4.1 同步调用

%% server.erl
-module(server).
-export([start/0, call/2, loop/0]).

start() ->
    spawn(?MODULE, loop, []).

%% 同步调用:发送请求并等待响应
call(Pid, Request) ->
    Ref = make_ref(),
    Pid ! {self(), Ref, Request},
    receive
        {Ref, Response} -> Response
    after 5000 ->
        {error, timeout}
    end.

loop() ->
    receive
        {From, Ref, {echo, Msg}} ->
            From ! {Ref, Msg},
            loop();
        {From, Ref, {add, A, B}} ->
            From ! {Ref, A + B},
            loop();
        {From, Ref, stop} ->
            From ! {Ref, ok}
    end.
$ erl
1> Pid = server:start().
<0.123.0>
2> server:call(Pid, {echo, "hello"}).
"hello"
3> server:call(Pid, {add, 3, 4}).
7
4> server:call(Pid, stop).
ok

💡 关键:使用 make_ref() 生成唯一标识,确保响应匹配正确的请求。

9.4.2 异步调用

%% 异步:只发送,不等待响应
cast(Pid, Msg) ->
    Pid ! {cast, Msg},
    ok.

%% 服务端处理
loop() ->
    receive
        {cast, Msg} ->
            handle_cast(Msg),
            loop();
        {From, Ref, Msg} ->
            From ! {Ref, handle_call(Msg)},
            loop()
    end.

9.5.1 基本概念

%% 链接两个进程
Pid = spawn_link(fun() ->
    receive stop -> ok end
end).

%% 当一个进程崩溃时,链接的进程也会收到退出信号
%% 默认行为:双向崩溃(双向传播)

9.5.2 链接的行为

进程 A ←链接→ 进程 B

当 A 崩溃:
  - B 收到退出信号 {'EXIT', PidA, Reason}
  - 默认:B 也随之崩溃

当 B 崩溃:
  - A 收到退出信号 {'EXIT', PidB, Reason}
  - 默认:A 也随之崩溃

9.5.3 trap_exit

%% 设置 trap_exit 可以捕获退出信号
process_flag(trap_exit, true),

Pid = spawn_link(fun() ->
    receive stop -> ok end
end),

%% 当链接的进程崩溃时,收到消息而非崩溃
receive
    {'EXIT', Pid, Reason} ->
        io:format("Process ~p exited: ~p~n", [Pid, Reason])
end.

9.6 进程监控(Monitor)

9.6.1 基本使用

%% 监控另一个进程
Pid = spawn(fun() ->
    receive stop -> ok end
end),

Ref = monitor(process, Pid),

%% 当被监控的进程退出时,收到消息
%% 注意:不需要 trap_exit!
receive
    {'DOWN', Ref, process, Pid, Reason} ->
        io:format("Process ~p is down: ~p~n", [Pid, Reason])
end.
特性LinkMonitor
方向双向单向
需要 trap_exit是(想要处理退出信号时)
消息格式{'EXIT', Pid, Reason}{'DOWN', Ref, process, Pid, Reason}
自动解除是(进程退出后)是(进程退出后)
多个一对一直接链接一个进程可监控多个目标
推荐场景紧密耦合的进程松耦合的监控

💡 经验法则:大多数情况使用 Monitor,只有在"共生死"的场景使用 Link。


9.7 进程注册

9.7.1 注册名称

%% 注册进程(给 PID 起名字)
Pid = spawn(fun() ->
    receive stop -> ok end
end),
register(my_process, Pid).

%% 使用名字发送消息
my_process ! hello.

%% 查找注册的 PID
whereis(my_process).  %% Pid

%% 取消注册
unregister(my_process).

%% 已注册的进程列表
registered().

9.7.2 注册的限制

%% 每个名字只能注册一个进程
register(name1, Pid1).  %% ok
register(name1, Pid2).  %% 错误!badarg

%% 全局唯一
%% 只有已注册才能用名字发送消息
unregistered_name ! msg.  %% 错误!badarg

9.8 进程信息

%% 当前进程信息
self().                      %% PID
process_info(self()).         %% 详细信息列表
process_info(self(), memory). %% 内存使用
process_info(self(), message_queue_len). %% 消息队列长度
process_info(self(), current_function).  %% 当前执行的函数
process_info(self(), reductions).        %% 执行步数

%% 所有进程
erlang:processes().          %% 所有进程 PID 列表
erlang:system_info(process_count). %% 进程数量

9.9 实战:聊天室

%% chat_room.erl
-module(chat_room).
-export([start/0, join/2, leave/2, send/3, loop/1]).

start() ->
    spawn(?MODULE, loop, [#{name => "General", members => #{}}]).

join(RoomPid, UserPid) ->
    Ref = make_ref(),
    RoomPid ! {join, self(), Ref, UserPid},
    receive {Ref, Result} -> Result end.

leave(RoomPid, UserPid) ->
    Ref = make_ref(),
    RoomPid ! {leave, self(), Ref, UserPid},
    receive {Ref, Result} -> Result end.

send(RoomPid, UserPid, Message) ->
    RoomPid ! {message, UserPid, Message},
    ok.

loop(#{members := Members} = State) ->
    receive
        {join, From, Ref, UserPid} ->
            monitor(process, UserPid),
            NewMembers = Members#{UserPid => true},
            broadcast(Members, {user_joined, UserPid}),
            From ! {Ref, ok},
            loop(State#{members => NewMembers});

        {leave, From, Ref, UserPid} ->
            NewMembers = maps:remove(UserPid, Members),
            broadcast(NewMembers, {user_left, UserPid}),
            From ! {Ref, ok},
            loop(State#{members => NewMembers});

        {message, UserPid, Message} ->
            broadcast(maps:remove(UserPid, Members),
                      {chat, UserPid, Message}),
            loop(State);

        {'DOWN', _Ref, process, UserPid, _Reason} ->
            NewMembers = maps:remove(UserPid, Members),
            broadcast(NewMembers, {user_left, UserPid}),
            loop(State#{members => NewMembers})
    end.

broadcast(Members, Msg) ->
    maps:foreach(fun(Pid, _) -> Pid ! Msg end, Members).

9.10 实战:并发任务执行器

%% task_executor.erl
-module(task_executor).
-export([parallel_map/2, parallel_filter/2, pmap/2]).

%% 并行 map
-spec pmap(fun((A) -> B), [A]) -> [B].
pmap(Fun, List) ->
    Parent = self(),
    Pids = [spawn(fun() -> Parent ! {self(), Fun(X)} end) || X <- List],
    [receive {Pid, Result} -> Result end || Pid <- Pids].

%% 并行 map(带超时)
-spec parallel_map(fun((A) -> B), [A]) -> [B | {error, timeout}].
parallel_map(Fun, List) ->
    Parent = self(),
    Pids = [spawn(fun() -> Parent ! {self(), Fun(X)} end) || X <- List],
    [receive_result(Pid) || Pid <- Pids].

receive_result(Pid) ->
    receive
        {Pid, Result} -> Result
    after 10000 ->
        {error, timeout}
    end.

%% 并行过滤
-spec parallel_filter(fun((A) -> boolean()), [A]) -> [A].
parallel_filter(Pred, List) ->
    Pairs = pmap(fun(X) -> {X, Pred(X)} end, List),
    [X || {X, true} <- Pairs].
$ erl
1> c(task_executor).
{ok, task_executor}
2> task_executor:pmap(fun(X) -> X * X end, [1,2,3,4,5]).
[1,4,9,16,25]
3> task_executor:parallel_filter(fun(X) -> X rem 2 =:= 0 end, [1,2,3,4,5,6]).
[2,4,6]

9.11 注意事项

⚠️ 常见陷阱

陷阱说明
消息不可达发送给已死进程的消息静默丢失
邮箱溢出快速发送者可能导致接收者邮箱暴涨
死锁两个进程互相等待对方消息
消息顺序同一对进程的消息保证有序,但不同进程间的顺序不保证
内存泄漏进程积累未处理的消息

💡 最佳实践

  1. make_ref() 标识请求-响应配对
  2. 设置合理的 after 超时
  3. 使用 Monitor 而非 Link 监控外部进程
  4. 注册常驻进程的名字,方便查找
  5. 注意邮箱大小,必要时丢弃旧消息
  6. 避免在循环中积累未处理的消息

9.12 扩展阅读


上一章:08 - 元组与 Map 下一章:10 - OTP 基础