Julia 教程 / Julia 并行计算基础
14. 并行计算基础
Julia 内建了强大的并行计算支持,从多进程分布式计算到共享内存多线程,覆盖各种并行需求。
14.1 多进程基础
添加工作进程
using Distributed
# 查看当前进程数
println(nprocs()) # 1(默认只有主进程)
# 添加工作进程
addprocs(4)
println(nprocs()) # 5
println(workers()) # [2, 3, 4, 5]
# 所有进程中加载模块
@everywhere using LinearAlgebra
@everywhere println("我是进程 $(myid())")
# 移除进程
rmprocs(2)
@everywhere:在所有进程定义
# 函数和变量必须用 @everywhere 声明才能在工作进程使用
@everywhere function heavy_compute(x)
s = 0.0
for i in 1:10_000_000
s += sin(x * i)
end
return s
end
@everywhere const GLOBAL_DATA = rand(1000)
14.2 远程调用:remotecall
# 基本远程调用
future = remotecall(sin, 2, π/4) # 在进程2上执行 sin(π/4)
result = fetch(future) # 获取结果
println(result) # 0.7071067811865475
# 异步远程调用
futures = [remotecall(heavy_compute, w, 1.0) for w in workers()]
results = fetch.(futures) # 等待所有结果
# @spawnat:指定进程执行
future = @spawnat 3 begin
local_data = rand(1000)
sum(local_data)
end
println(fetch(future))
14.3 @spawn 与 @spawnat
# @spawn:自动选择进程
future = @spawn sin(π/4) + cos(π/4)
println(fetch(future))
# @spawnat:指定进程
future = @spawnat 2 sin(π/4)
println(fetch(future))
# 在远程进程创建数据
remote_data = @spawnat 3 rand(100, 100)
data = fetch(remote_data)
远程调用的错误处理
# 远程调用失败时,fetch 会抛出异常
future = @spawn error("远程错误")
try
fetch(future)
catch e
println("捕获错误: ", e)
end
⚠️ 注意:远程调用中的异常会在 fetch 时传递到主进程。使用 try-catch 处理。
14.4 并行 for 循环
pmap:并行映射
# pmap 自动分配任务到各进程
using Distributed
addprocs(4)
@everywhere function expensive_function(x)
sleep(0.5) # 模拟耗时
return x^2
end
# 串行
results_serial = [expensive_function(x) for x in 1:8]
# 约4秒
# 并行
results_parallel = pmap(expensive_function, 1:8)
# 约1秒(4个进程)
# 对大数据集使用 pmap
data = rand(100)
results = pmap(x -> sin(x) + cos(x), data)
@distributed for:分布式循环
# 基本分布式循环
n = 100_000_000
pi_estimate = @distributed (+) for i in 1:n
x = rand()
y = rand()
(x^2 + y^2 <= 1) ? 1.0 : 0.0
end * 4 / n
println("π ≈ $pi_estimate")
# 不带归约的分布式循环
@distributed for i in 1:10
println("进程 $(myid()) 处理任务 $i")
end
分布式归约
# 自定义归约操作
using Distributed
# 求和归约
total = @distributed (+) for i in 1:1000
i^2
end
println("平方和: $total")
# 最大值归约
max_val = @distributed max for i in 1:1000
rand()
end
println("最大值: $max_val")
# 数组归约
hist = @distributed (+) for _ in 1:1_000_000
bin = rand(1:10)
counts = zeros(10)
counts[bin] = 1
counts
end
println("分布: ", hist)
14.5 共享内存:SharedArray
using SharedArrays
# 创建共享数组
sa = SharedArray{Float64}(1000, 1000)
# 所有进程可以同时写入(无竞争的区域划分)
@distributed for i in 1:1000
sa[:, i] .= sin.(1:1000) .* i
end
println(size(sa)) # (1000, 1000)
# 转为普通数组
regular = Array(sa)
SharedArray 注意事项
# ✅ 正确:每个进程写不同区域
sa = SharedArray{Float64}(100)
@distributed for i in 1:100
sa[i] = myid() * i # 每个 i 由一个进程处理
end
# ❌ 错误:多个进程写同一位置(数据竞争)
sa = SharedArray{Float64}(1)
@distributed for i in 1:100
sa[1] += 1 # 竞争条件!结果不确定
end
# ✅ 正确:使用原子操作或归约
total = @distributed (+) for i in 1:100
1.0
end
⚠️ 共享内存警告:SharedArray 的写入没有锁保护。确保每个进程只写入自己的区域,或使用归约操作。
14.6 远程引用 Future
# Future 是异步远程计算的句柄
future = @spawnat 2 begin
sleep(2) # 耗时操作
return 42
end
# isready 检查是否完成
println(isready(future)) # false(可能还没完成)
# fetch 阻塞等待
result = fetch(future)
println(result) # 42
# 多个 Future 的管理模式
futures = Dict{Int, Future}()
for i in 1:10
futures[i] = @spawnat workers()[(i-1) % nworkers() + 1] begin
sleep(rand()) # 不同的完成时间
return i^2
end
end
# 等待并收集结果
results = Dict(k => fetch(v) for (k, v) in futures)
14.7 集群配置
本地集群
# 使用所有 CPU 核心
addprocs()
# 指定数量
addprocs(8)
# 使用不同的 Julia 可执行文件
addprocs(2, exename="/path/to/julia")
远程集群
# SSH 集群
machines = ["host1", "host2", "host3"]
addprocs(machines)
# 带配置
addprocs([
("host1", 4), # host1 上启动4个进程
("host2", 2), # host2 上启动2个进程
]; sshflags=`-i ~/.ssh/id_rsa`)
# 使用集群文件
# machines.txt 内容:host1:4\nhost2:4\nhost3:2
addprocs("machines.txt")
环境变量配置
# 启动时指定
# JULIA_NUM_THREADS=4 julia
# JULIA_WORKER_TIMEOUT=60 julia
# 程序中检查
println(nprocs())
println(nworkers())
println(workers())
14.8 并行性能分析
using Distributed
addprocs(4)
@everywhere function compute_heavy(n)
s = 0.0
for i in 1:n
s += sin(i * 0.001)
end
return s
end
# 串行基准
N = 10_000_000
t_serial = @elapsed begin
results = [compute_heavy(N) for _ in 1:16]
end
# pmap 并行
t_pmap = @elapsed begin
results = pmap(compute_heavy, fill(N, 16))
end
# @distributed 并行
t_dist = @elapsed begin
total = @distributed (+) for _ in 1:16
compute_heavy(N)
end
end
println("串行: $(round(t_serial, digits=3))s")
println("pmap: $(round(t_pmap, digits=3))s")
println("@distributed: $(round(t_dist, digits=3))s")
println("加速比(pmap): $(round(t_serial/t_pmap, digits=2))x")
负载均衡
# pmap 自动负载均衡(适合不均匀任务)
# @distributed 均匀分配(适合均匀任务)
# 不均匀任务示例
function variable_work(x)
sleep(x * 0.01) # 工作量随 x 变化
return x^2
end
# pmap 处理不均匀任务更好
times = vcat(fill(0.1, 5), fill(1.0, 5)) # 5快+5慢
results = pmap(variable_work, times)
14.9 并行策略选择指南
| 任务类型 | 推荐方法 | 原因 |
|---|
| 独立任务,结果收集 | pmap | 自动负载均衡 |
| 大循环,归约操作 | @distributed (+) | 简洁高效 |
| 大数组操作 | SharedArray | 避免数据拷贝 |
| 粗粒度异步任务 | @spawnat + fetch | 灵活控制 |
| 数据并行(矩阵) | 多线程 Threads.@threads | 共享内存更高效 |
线程 vs 进程
# 多线程(共享内存,启动快)
println("线程数: ", Threads.nthreads())
Threads.@threads for i in 1:10
println("线程 $(Threads.threadid()): $i")
end
# 多进程(独立内存,可跨机器)
# 适合计算密集型且需要大量独立内存的任务
| 特性 | 多线程 | 多进程 |
|---|
| 内存 | 共享 | 独立 |
| 启动开销 | 极低 | 较高 |
| 通信 | 内存直接访问 | 消息传递 |
| 全局锁 | 有(GIL类似) | 无 |
| 适用场景 | 细粒度并行 | 粗粒度并行 |
14.10 实际业务场景
场景一:并行蒙特卡洛模拟
using Distributed
addprocs(4)
@everywhere function monte_carlo_pi(n_samples)
count = 0
for _ in 1:n_samples
x, y = rand(2)
if x^2 + y^2 <= 1
count += 1
end
end
return count
end
total_samples = 100_000_000
n = nworkers()
samples_per_worker = total_samples ÷ n
t = @elapsed begin
hits = @distributed (+) for _ in 1:n
monte_carlo_pi(samples_per_worker)
end
pi_est = 4 * hits / total_samples
end
println("π ≈ $pi_est (耗时: $(round(t, digits=3))s)")
场景二:并行图像处理
using SharedArrays
function parallel_filter(image_size, filter_size)
h, w = image_size
result = SharedArray{Float64}(h - filter_size + 1, w - filter_size + 1)
input = SharedArray{Float64}(rand(h, w)) # 模拟输入
@distributed for j in 1:size(result, 2)
for i in 1:size(result, 1)
# 简单均值滤波
s = 0.0
for fi in 0:filter_size-1, fj in 0:filter_size-1
s += input[i+fi, j+fj]
end
result[i, j] = s / (filter_size^2)
end
end
return Array(result)
end
filtered = parallel_filter((1000, 1000), 5)
println("输出尺寸: ", size(filtered))
场景三:并行文件处理
using Distributed
@everywhere function process_file(filename)
# 每个进程处理不同文件
lines = readlines(filename)
word_count = sum(length(split(line)) for line in lines)
return (filename, word_count)
end
files = ["file1.txt", "file2.txt", "file3.txt", "file4.txt"]
results = pmap(process_file, files)
for (file, count) in results
println("$file: $count 词")
end
14.11 扩展阅读
14.12 本章小结
| 主题 | 要点 |
|---|
| addprocs | 添加工作进程 |
| @everywhere | 在所有进程定义代码 |
| pmap | 自动负载均衡的并行映射 |
| @distributed | 分布式循环+归约 |
| SharedArray | 共享内存避免拷贝 |
| Future | 异步远程计算句柄 |
| 策略选择 | 均匀任务用 @distributed,不均匀用 pmap |