18 - 分布式
第 18 章:分布式 Erlang
Erlang 天生支持分布式计算。本章学习节点通信、RPC、全局注册和集群管理。
18.1 节点基础
18.1.1 启动节点
%% 短名称节点(同一主机)
erl -sname node1
erl -sname node2
%% 长名称节点(跨网络)
erl -name [email protected]
erl -name [email protected]
%% 指定 cookie(集群认证)
erl -sname node1 -setcookie my_secret_cookie
18.1.2 节点信息
%% 当前节点名
node(). %% 'node1@hostname'
%% 所有连接的节点
nodes(). %% ['node2@hostname', ...]
%% 连接到另一个节点
net_kernel:connect_node('node2@hostname'). %% true
%% 断开连接
net_kernel:disconnect_node('node2@hostname').
18.1.3 Cookie 机制
%% 集群中的所有节点必须有相同的 cookie
%% 查看当前 cookie
erlang:get_cookie(). %% 'my_cookie'
%% 设置 cookie
erlang:set_cookie(node(), 'new_cookie').
%% 也可以通过 -setcookie 参数设置
18.2 远程操作
18.2.1 在远程节点创建进程
%% spawn 在远程节点执行
Pid = spawn('node2@hostname', fun() ->
io:format("Running on ~p~n", [node()])
end).
%% spawn_link
Pid = spawn_link('node2@hostname', fun() -> ... end).
%% spawn_monitor
{Pid, Ref} = spawn_monitor('node2@hostname', fun() -> ... end).
18.2.2 跨节点消息传递
%% 发送消息到远程节点的 PID
RemotePid ! {hello, from, node()}.
%% 跨节点注册进程
%% 在 node2 上
register(my_server, self()).
%% 在 node1 上发送
{my_server, 'node2@hostname'} ! {request, self(), data}.
%% 跨节点注册和查找
global:register_name(my_global_server, Pid).
global:whereis_name(my_global_server). %% 返回 Pid
18.2.3 远程 spawn 的注意事项
%% 远程节点必须有对应的模块代码
%% 方式一:确保远程节点已加载模块
rpc:call(Node, code, load_file, [my_module]),
%% 方式二:使用 rpc:call 直接调用
rpc:call(Node, my_module, my_function, [Args]).
18.3 RPC
18.3.1 基本 RPC
%% 同步调用
Result = rpc:call('node2@hostname', math, pi, []).
%% 3.141592653589793
%% 调用带参数
Result = rpc:call('node2@hostname', lists, sum, [[1,2,3,4,5]]).
%% 15
%% 超时设置
Result = rpc:call('node2@hostname', timer, sleep, [10000], 5000).
%% 超时后返回 {badrpc, timeout}
18.3.2 RPC 变体
| 函数 | 说明 |
|---|---|
rpc:call(Node, M, F, A) | 同步调用 |
rpc:call(Node, M, F, A, Timeout) | 带超时的同步调用 |
rpc:cast(Node, M, F, A) | 异步调用(fire and forget) |
rpc:multicall(Nodes, M, F, A) | 多节点调用 |
rpc:multicall(Nodes, M, F, A, Timeout) | 带超时的多节点调用 |
rpc:parallel_eval([{M,F,A}]) | 并行调用多个函数 |
rpc:pmap({M,F}, ExtraArgs, List) | 并行 map |
18.3.3 多节点调用
%% 对所有节点执行相同的调用
Nodes = nodes(),
{Results, BadNodes} = rpc:multicall(Nodes, erlang, system_info, [process_count]).
%% Results = [1234, 5678], BadNodes = []
%% 并行 map
Results = rpc:pmap({lists, sum}, [], [[1,2],[3,4],[5,6]]).
%% [3, 7, 11]
18.4 全局注册
18.4.1 global 模块
%% 全局注册名称(集群内唯一)
global:register_name(my_service, self()).
%% 全局查找
Pid = global:whereis_name(my_service).
%% 发送消息
global:send(my_service, {hello, self()}).
%% 取消注册
global:unregister_name(my_service).
%% 互斥锁
%% 确保集群中只有一个进程持有锁
global:trans({my_lock, self()},
fun() ->
%% 只有一个节点能执行这里
do_exclusive_work()
end).
18.4.2 pg 模块(进程组)
%% 创建/加入进程组
pg:start_link().
pg:join(my_group, self()).
%% 获取组内所有进程
Members = pg:get_members(my_group).
%% 跨节点的进程组
%% 需要在所有节点启动 pg
pg:start_link('scope_name').
pg:join('scope_name', my_group, self()).
%% 给组内所有进程发送消息
[Pid ! message || Pid <- pg:get_members(my_group)].
18.5 集群管理
18.5.1 自动发现
%% 使用 -connect_all false 参数禁用自动连接
%% 手动连接
net_kernel:connect_node('node2@host').
%% 使用 net_adm 模块
net_adm:ping('node2@host'). %% pong | pang
net_adm:world(). %% 连接所有已知节点
net_adm:world_list(Hosts). %% 连接指定主机的节点
18.5.2 Erlang Port Mapper Daemon (EPMD)
%% EPMD 负责节点名称到端口的映射
%% 启动时自动运行
%% 查看已注册的节点
epmd -names
%% 手动启动 EPMD
epmd -daemon
18.6 实战:分布式任务分发
%% dist_worker.erl
-module(dist_worker).
-export([start/0, submit/1, get_results/0]).
start() ->
register(?MODULE, self()),
loop(#{tasks => [], results => []}).
submit(Task) ->
?MODULE ! {task, self(), Task},
receive {submitted, Ref} -> Ref end.
get_results() ->
?MODULE ! {get_results, self()},
receive {results, Results} -> Results end.
loop(State) ->
receive
{task, From, Task} ->
%% 选择负载最低的节点
Node = select_node(),
Ref = make_ref(),
spawn(Node, fun() ->
Result = execute_task(Task),
?MODULE ! {result, Ref, Result}
end),
From ! {submitted, Ref},
loop(State#{tasks => [{Ref, Task} | maps:get(tasks, State)]});
{result, Ref, Result} ->
Results = [{Ref, Result} | maps:get(results, State)],
loop(State#{results => Results});
{get_results, From} ->
From ! {results, maps:get(results, State)},
loop(State)
end.
select_node() ->
%% 简单策略:轮询
Nodes = [node() | nodes()],
Index = erlang:unique_integer([positive]) rem length(Nodes),
lists:nth(Index + 1, Nodes).
execute_task(Task) ->
%% 执行具体任务
timer:sleep(100),
{ok, processed, Task}.
18.7 注意事项
⚠️ 常见陷阱
| 陷阱 | 说明 |
|---|---|
| Cookie 不匹配 | 不同 cookie 的节点无法连接 |
| 网络分区 | 网络故障可能导致脑裂 |
| 代码不同步 | 远程节点可能没有最新代码 |
| 全局注册冲突 | 同名注册会失败 |
| 延迟 | 跨网络调用有延迟,需要超时处理 |
💡 最佳实践
- 使用相同的 cookie 文件分发到所有节点
- 避免频繁 RPC 调用,考虑消息传递模式
- 为所有跨节点调用设置合理超时
- 使用
pg模块管理进程组,避免单点全局注册 - 考虑使用 lib_cluster 或类似库自动管理集群
18.8 扩展阅读
上一章:17 - 测试 下一章:19 - 发布与部署