强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

Julia 教程 / Julia 并行计算基础

14. 并行计算基础

Julia 内建了强大的并行计算支持,从多进程分布式计算到共享内存多线程,覆盖各种并行需求。

14.1 多进程基础

添加工作进程

using Distributed

# 查看当前进程数
println(nprocs())  # 1(默认只有主进程)

# 添加工作进程
addprocs(4)
println(nprocs())  # 5
println(workers()) # [2, 3, 4, 5]

# 所有进程中加载模块
@everywhere using LinearAlgebra
@everywhere println("我是进程 $(myid())")

# 移除进程
rmprocs(2)

@everywhere:在所有进程定义

# 函数和变量必须用 @everywhere 声明才能在工作进程使用
@everywhere function heavy_compute(x)
    s = 0.0
    for i in 1:10_000_000
        s += sin(x * i)
    end
    return s
end

@everywhere const GLOBAL_DATA = rand(1000)

14.2 远程调用:remotecall

# 基本远程调用
future = remotecall(sin, 2, π/4)  # 在进程2上执行 sin(π/4)
result = fetch(future)             # 获取结果
println(result)  # 0.7071067811865475

# 异步远程调用
futures = [remotecall(heavy_compute, w, 1.0) for w in workers()]
results = fetch.(futures)  # 等待所有结果

# @spawnat:指定进程执行
future = @spawnat 3 begin
    local_data = rand(1000)
    sum(local_data)
end
println(fetch(future))

14.3 @spawn 与 @spawnat

# @spawn:自动选择进程
future = @spawn sin(π/4) + cos(π/4)
println(fetch(future))

# @spawnat:指定进程
future = @spawnat 2 sin(π/4)
println(fetch(future))

# 在远程进程创建数据
remote_data = @spawnat 3 rand(100, 100)
data = fetch(remote_data)

远程调用的错误处理

# 远程调用失败时,fetch 会抛出异常
future = @spawn error("远程错误")
try
    fetch(future)
catch e
    println("捕获错误: ", e)
end

⚠️ 注意:远程调用中的异常会在 fetch 时传递到主进程。使用 try-catch 处理。

14.4 并行 for 循环

pmap:并行映射

# pmap 自动分配任务到各进程
using Distributed
addprocs(4)

@everywhere function expensive_function(x)
    sleep(0.5)  # 模拟耗时
    return x^2
end

# 串行
results_serial = [expensive_function(x) for x in 1:8]
# 约4秒

# 并行
results_parallel = pmap(expensive_function, 1:8)
# 约1秒(4个进程)

# 对大数据集使用 pmap
data = rand(100)
results = pmap(x -> sin(x) + cos(x), data)

@distributed for:分布式循环

# 基本分布式循环
n = 100_000_000
pi_estimate = @distributed (+) for i in 1:n
    x = rand()
    y = rand()
    (x^2 + y^2 <= 1) ? 1.0 : 0.0
end * 4 / n

println("π ≈ $pi_estimate")

# 不带归约的分布式循环
@distributed for i in 1:10
    println("进程 $(myid()) 处理任务 $i")
end

分布式归约

# 自定义归约操作
using Distributed

# 求和归约
total = @distributed (+) for i in 1:1000
    i^2
end
println("平方和: $total")

# 最大值归约
max_val = @distributed max for i in 1:1000
    rand()
end
println("最大值: $max_val")

# 数组归约
hist = @distributed (+) for _ in 1:1_000_000
    bin = rand(1:10)
    counts = zeros(10)
    counts[bin] = 1
    counts
end
println("分布: ", hist)

14.5 共享内存:SharedArray

using SharedArrays

# 创建共享数组
sa = SharedArray{Float64}(1000, 1000)

# 所有进程可以同时写入(无竞争的区域划分)
@distributed for i in 1:1000
    sa[:, i] .= sin.(1:1000) .* i
end

println(size(sa))  # (1000, 1000)

# 转为普通数组
regular = Array(sa)

SharedArray 注意事项

# ✅ 正确:每个进程写不同区域
sa = SharedArray{Float64}(100)
@distributed for i in 1:100
    sa[i] = myid() * i  # 每个 i 由一个进程处理
end

# ❌ 错误:多个进程写同一位置(数据竞争)
sa = SharedArray{Float64}(1)
@distributed for i in 1:100
    sa[1] += 1  # 竞争条件!结果不确定
end

# ✅ 正确:使用原子操作或归约
total = @distributed (+) for i in 1:100
    1.0
end

⚠️ 共享内存警告SharedArray 的写入没有锁保护。确保每个进程只写入自己的区域,或使用归约操作。

14.6 远程引用 Future

# Future 是异步远程计算的句柄
future = @spawnat 2 begin
    sleep(2)  # 耗时操作
    return 42
end

# isready 检查是否完成
println(isready(future))  # false(可能还没完成)

# fetch 阻塞等待
result = fetch(future)
println(result)  # 42

# 多个 Future 的管理模式
futures = Dict{Int, Future}()
for i in 1:10
    futures[i] = @spawnat workers()[(i-1) % nworkers() + 1] begin
        sleep(rand())  # 不同的完成时间
        return i^2
    end
end

# 等待并收集结果
results = Dict(k => fetch(v) for (k, v) in futures)

14.7 集群配置

本地集群

# 使用所有 CPU 核心
addprocs()

# 指定数量
addprocs(8)

# 使用不同的 Julia 可执行文件
addprocs(2, exename="/path/to/julia")

远程集群

# SSH 集群
machines = ["host1", "host2", "host3"]
addprocs(machines)

# 带配置
addprocs([
    ("host1", 4),  # host1 上启动4个进程
    ("host2", 2),  # host2 上启动2个进程
]; sshflags=`-i ~/.ssh/id_rsa`)

# 使用集群文件
# machines.txt 内容:host1:4\nhost2:4\nhost3:2
addprocs("machines.txt")

环境变量配置

# 启动时指定
# JULIA_NUM_THREADS=4 julia
# JULIA_WORKER_TIMEOUT=60 julia

# 程序中检查
println(nprocs())
println(nworkers())
println(workers())

14.8 并行性能分析

using Distributed
addprocs(4)

@everywhere function compute_heavy(n)
    s = 0.0
    for i in 1:n
        s += sin(i * 0.001)
    end
    return s
end

# 串行基准
N = 10_000_000
t_serial = @elapsed begin
    results = [compute_heavy(N) for _ in 1:16]
end

# pmap 并行
t_pmap = @elapsed begin
    results = pmap(compute_heavy, fill(N, 16))
end

# @distributed 并行
t_dist = @elapsed begin
    total = @distributed (+) for _ in 1:16
        compute_heavy(N)
    end
end

println("串行: $(round(t_serial, digits=3))s")
println("pmap: $(round(t_pmap, digits=3))s")
println("@distributed: $(round(t_dist, digits=3))s")
println("加速比(pmap): $(round(t_serial/t_pmap, digits=2))x")

负载均衡

# pmap 自动负载均衡(适合不均匀任务)
# @distributed 均匀分配(适合均匀任务)

# 不均匀任务示例
function variable_work(x)
    sleep(x * 0.01)  # 工作量随 x 变化
    return x^2
end

# pmap 处理不均匀任务更好
times = vcat(fill(0.1, 5), fill(1.0, 5))  # 5快+5慢
results = pmap(variable_work, times)

14.9 并行策略选择指南

任务类型推荐方法原因
独立任务,结果收集pmap自动负载均衡
大循环,归约操作@distributed (+)简洁高效
大数组操作SharedArray避免数据拷贝
粗粒度异步任务@spawnat + fetch灵活控制
数据并行(矩阵)多线程 Threads.@threads共享内存更高效

线程 vs 进程

# 多线程(共享内存,启动快)
println("线程数: ", Threads.nthreads())

Threads.@threads for i in 1:10
    println("线程 $(Threads.threadid()): $i")
end

# 多进程(独立内存,可跨机器)
# 适合计算密集型且需要大量独立内存的任务
特性多线程多进程
内存共享独立
启动开销极低较高
通信内存直接访问消息传递
全局锁有(GIL类似)
适用场景细粒度并行粗粒度并行

14.10 实际业务场景

场景一:并行蒙特卡洛模拟

using Distributed
addprocs(4)

@everywhere function monte_carlo_pi(n_samples)
    count = 0
    for _ in 1:n_samples
        x, y = rand(2)
        if x^2 + y^2 <= 1
            count += 1
        end
    end
    return count
end

total_samples = 100_000_000
n = nworkers()
samples_per_worker = total_samples ÷ n

t = @elapsed begin
    hits = @distributed (+) for _ in 1:n
        monte_carlo_pi(samples_per_worker)
    end
    pi_est = 4 * hits / total_samples
end

println("π ≈ $pi_est (耗时: $(round(t, digits=3))s)")

场景二:并行图像处理

using SharedArrays

function parallel_filter(image_size, filter_size)
    h, w = image_size
    result = SharedArray{Float64}(h - filter_size + 1, w - filter_size + 1)
    input = SharedArray{Float64}(rand(h, w))  # 模拟输入

    @distributed for j in 1:size(result, 2)
        for i in 1:size(result, 1)
            # 简单均值滤波
            s = 0.0
            for fi in 0:filter_size-1, fj in 0:filter_size-1
                s += input[i+fi, j+fj]
            end
            result[i, j] = s / (filter_size^2)
        end
    end

    return Array(result)
end

filtered = parallel_filter((1000, 1000), 5)
println("输出尺寸: ", size(filtered))

场景三:并行文件处理

using Distributed

@everywhere function process_file(filename)
    # 每个进程处理不同文件
    lines = readlines(filename)
    word_count = sum(length(split(line)) for line in lines)
    return (filename, word_count)
end

files = ["file1.txt", "file2.txt", "file3.txt", "file4.txt"]
results = pmap(process_file, files)

for (file, count) in results
    println("$file: $count 词")
end

14.11 扩展阅读

资源链接
Julia 官方文档 - Parallel Computinghttps://docs.julialang.org/en/v1/manual/parallel-computing/
Julia 官方文档 - Multi-threadinghttps://docs.julialang.org/en/v1/manual/multi-threading/
Dagger.jlhttps://github.com/JuliaParallel/Dagger.jl.jl
MPI.jlhttps://github.com/JuliaParallel/MPI.jl
DistributedArrays.jlhttps://github.com/distributedArrays/DistributedArrays.jl

14.12 本章小结

主题要点
addprocs添加工作进程
@everywhere在所有进程定义代码
pmap自动负载均衡的并行映射
@distributed分布式循环+归约
SharedArray共享内存避免拷贝
Future异步远程计算句柄
策略选择均匀任务用 @distributed,不均匀用 pmap