强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

Julia 教程 / Julia 异步编程与协程

15. 异步编程与协程

Julia 的 Task(协程)机制支持轻量级并发,适合 I/O 密集型任务如网络请求、文件操作等。

15.1 @async 与 Task 基础

创建 Task

# @async 创建并启动一个 Task
t = @async begin
    println("Task 开始")
    sleep(1)
    println("Task 结束")
    return 42
end

println("主任务继续执行")
result = fetch(t)  # 等待并获取结果
println("Task 返回: $result")

手动创建 Task

# 更底层的方式
t = Task() do
    sleep(0.5)
    return "完成"
end

schedule(t)  # 启动 Task
println(fetch(t))  # 等待结果

多个并发 Task

# 同时启动多个任务
tasks = [@async begin
    sleep(rand())
    return "Task $i 完成"
end for i in 1:5]

# 等待所有结果
results = fetch.(tasks)
for r in results
    println(r)
end

15.2 yield、sleep 与 wait

yield:让出执行权

# yield 允许其他 Task 运行
function counting_task(name, n)
    for i in 1:n
        println("$name: $i")
        yield()  # 让出执行权
    end
end

@async counting_task("A", 5)
@async counting_task("B", 5)

# 两个任务交替执行
sleep(2)

sleep:休眠

@async begin
    println("开始")
    sleep(2)  # 休眠2秒(不阻塞其他Task)
    println("2秒后")
end

println("主任务不受影响")
sleep(3)

wait:等待条件

# 使用 Condition 进行同步
cond = Condition()

@async begin
    println("等待条件...")
    wait(cond)
    println("条件满足!")
end

sleep(1)
notify(cond)  # 唤醒等待者

15.3 Channel 通信

基本 Channel

# 带缓冲的 Channel
ch = Channel{String}(10)

# 生产者
@async for i in 1:5
    msg = "消息 #$i"
    put!(ch, msg)
    println("发送: $msg")
    sleep(0.2)
end

# 消费者
for i in 1:5
    msg = take!(ch)
    println("接收: $msg")
end

无限生产者-消费者

# 生产者:无限生成数据
function producer(ch::Channel)
    i = 0
    while true
        i += 1
        put!(ch, rand()^2)
        sleep(0.1)
    end
end

ch = Channel(producer; ctype=Float64)

# 消费者:处理有限数量
values = [take!(ch) for _ in 1:10]
println("收集的值: ", round.(values, digits=3))

Channel 关闭

ch = Channel{Int}(5)

@async begin
    for i in 1:5
        put!(ch, i)
    end
    close(ch)  # 生产完毕,关闭通道
end

# 可以用 for 循环迭代已关闭的 channel
for val in ch
    println("收到: $val")
end
println("Channel 已关闭")

15.4 异步 I/O

异步文件读取

# 异步写入
@async begin
    open("/tmp/test_async.txt", "w") do f
        for i in 1:5
            write(f, "行 $i\n")
            sleep(0.1)
        end
    end
    println("写入完成")
end

# 主任务继续
println("不阻塞")
sleep(1)

# 异步读取
@async begin
    content = read("/tmp/test_async.txt", String)
    println("读取内容:\n$content")
end

sleep(1)

异步 HTTP 请求(使用 HTTP.jl)

# 需要 HTTP.jl 包
# using HTTP

# 并发 HTTP 请求
urls = [
    "https://httpbin.org/get",
    "https://httpbin.org/ip",
    "https://httpbin.org/user-agent"
]

# 串行请求(慢)
# for url in urls
#     resp = HTTP.get(url)
#     println(resp.status)
# end

# 并发请求(快)
# tasks = [@async HTTP.get(url) for url in urls]
# responses = fetch.(tasks)

💡 I/O 密集型:对于网络请求、文件操作等 I/O 等待的场景,@async 的优势最大。

15.5 Task 调度

任务调度原理

# Julia 使用协作式调度
# Task 在遇到 yield/sleep/wait/IO 时主动让出

# 查看当前 Task
println(current_task())

# 任务状态
t = @async sleep(10)
println(istaskstarted(t))   # true
println(istaskdone(t))      # false
sleep(1)
println(istaskdone(t))      # false (还在sleep)

# 强制等待(不推荐,可能死锁)
# wait(t)

任务优先级

# Julia 目前没有显式的任务优先级API
# 但可以通过控制 yield 时机来影响调度

function high_priority_work()
    @async for i in 1:5
        println("高优先级: $i")
        yield()
    end
end

function low_priority_work()
    @async for i in 1:5
        println("低优先级: $i")
        sleep(0.1)  # 通过 sleep 降低执行频率
    end
end

high_priority_work()
low_priority_work()
sleep(1)

15.6 异步异常处理

Task 中的异常

# Task 中的异常不会自动传播到主任务
t = @async begin
    error("Task 中出错!")
end

# fetch 时才会抛出异常
try
    fetch(t)
catch e
    println("捕获异常: ", e)
end

errormonitor

# 自动监控并报告 Task 异常
t = errormonitor(@async begin
    sleep(0.5)
    error("出错了!")
end)

# errormonitor 会在 task 失败时自动打印警告
sleep(1)

trywait / task-local 异常处理

function safe_async(f)
    return @async try
        f()
    catch e
        println("异步错误: ", e)
        rethrow()
    end
end

t = safe_async() do
    error("测试错误")
end

try
    fetch(t)
catch
    println("已处理")
end

15.7 asyncmap

# asyncmap:并发映射(I/O密集型适用)
urls = repeat(["https://httpbin.org/delay/1"], 5)

# 串行(5秒)
# results = map(url -> HTTP.get(url).status, urls)

# 异步并发(约1秒)
# results = asyncmap(url -> HTTP.get(url).status, urls; ntasks=5)

# 指定并发数
# results = asyncmap(process_item, items; ntasks=10)

自定义 asyncmap

function my_asyncmap(f, items; ntasks=10)
    results = Vector{Any}(undef, length(items))
    semaphore = Channel{Bool}(ntasks)
    
    tasks = map(enumerate(items)) do (i, item)
        @async begin
            put!(semaphore, true)  # 获取信号量
            try
                results[i] = f(item)
            finally
                take!(semaphore)  # 释放信号量
            end
        end
    end
    
    fetch.(tasks)
    return results
end

# 使用
results = my_asyncmap(1:20; ntasks=5) do x
    sleep(0.2)
    return x^2
end

println(results)

15.8 异步与并行对比

特性@async (协程)@spawn (进程)Threads.@threads
内存模型共享独立共享
启动开销极低
适用场景I/O 密集CPU 密集CPU 密集
真正并行❌(单线程)
代码复杂度
# I/O 密集型 → @async
tasks = [@async download(url) for url in urls]

# CPU 密集型 → @distributed 或 Threads
results = pmap(heavy_computation, data)

# 细粒度并行 → Threads
Threads.@threads for i in 1:n
    results[i] = compute(i)
end

15.9 生产者-消费者模式

完整示例

# 数据处理管道
function pipeline()
    # Stage 1: 数据生成
    raw_ch = Channel(32)
    @async begin
        for i in 1:100
            put!(raw_ch, randn())
            sleep(0.01)  # 模拟数据到达
        end
        close(raw_ch)
    end

    # Stage 2: 数据过滤
    filtered_ch = Channel(32)
    @async begin
        for val in raw_ch
            if abs(val) < 2.0
                put!(filtered_ch, val)
            end
        end
        close(filtered_ch)
    end

    # Stage 3: 数据聚合
    results = Float64[]
    window = Float64[]
    for val in filtered_ch
        push!(window, val)
        if length(window) >= 10
            push!(results, sum(window) / length(window))
            empty!(window)
        end
    end

    return results
end

averages = pipeline()
println("收集了 $(length(averages)) 个平均值")

多消费者

function multi_consumer()
    ch = Channel(10)
    
    # 生产者
    @async begin
        for i in 1:30
            put!(ch, i)
        end
        close(ch)
    end

    # 多个消费者
    consumer(n) = @async begin
        total = 0
        for val in ch
            total += val
            sleep(0.01)  # 模拟处理
        end
        println("消费者 $n 总和: $total")
    end

    tasks = [consumer(i) for i in 1:3]
    fetch.(tasks)
end

multi_consumer()

15.10 HTTP 异步请求实战

并发爬虫模式

using Dates

# 模拟 HTTP 请求
function fake_request(url)
    sleep(rand() * 0.5)  # 模拟网络延迟
    return (url=url, status=200, time=now())
end

function concurrent_requests(urls; max_concurrent=5)
    results = Channel(length(urls))
    semaphore = Channel{Bool}(max_concurrent)
    
    tasks = map(urls) do url
        @async begin
            put!(semaphore, true)
            try
                result = fake_request(url)
                put!(results, result)
            finally
                take!(semaphore)
            end
        end
    end
    
    fetch.(tasks)
    close(results)
    
    return collect(results)
end

urls = ["https://example.com/page/$i" for i in 1:20]
results = concurrent_requests(urls; max_concurrent=5)
println("完成 $(length(results)) 个请求")

15.11 实际业务场景

场景一:日志异步收集

function async_logger()
    log_ch = Channel{String}(1000)
    
    # 后台日志写入
    @async begin
        open("/tmp/app.log", "w") do f
            for msg in log_ch
                write(f, "[$(now())] $msg\n")
            end
        end
    end
    
    # 返回日志函数
    return function(msg)
        put!(log_ch, msg)
    end
end

log = async_logger()
for i in 1:100
    log("事件 #$i")
end

场景二:任务超时控制

function with_timeout(f, timeout_sec)
    t = @async f()
    timer = Timer(timeout_sec)
    
    @async begin
        wait(timer)
        if !istaskdone(t)
            schedule(t, ErrorException("超时"); error=true)
        end
    end
    
    return fetch(t)
end

# 使用
try
    result = with_timeout(3.0) do
        sleep(10)  # 模拟超长任务
        return 42
    end
catch e
    println("超时: ", e)
end

场景三:并发 Web 服务器骨架

function simple_server()
    # 模拟处理请求
    function handle_request(req_id)
        sleep(rand() * 0.1)  # 模拟处理
        return "响应 #$req_id"
    end
    
    # 请求队列
    request_ch = Channel(100)
    
    # 工作线程
    for _ in 1:4
        @async for req_id in request_ch
            response = handle_request(req_id)
            println(response)
        end
    end
    
    # 模拟请求到达
    for i in 1:20
        put!(request_ch, i)
    end
    
    return request_ch
end

ch = simple_server()
sleep(2)

15.12 扩展阅读

资源链接
Julia 官方文档 - Asynchronous Programminghttps://docs.julialang.org/en/v1/manual/asynchronous-programming/
Julia 官方文档 - Channelshttps://docs.julialang.org/en/v1/base/parallel/#Base.Channel
HTTP.jlhttps://github.com/JuliaWeb/HTTP.jl
ConcurrentUtilities.jlhttps://github.com/JuliaComputing/ConcurrentUtilities.jl

15.13 本章小结

主题要点
@async创建协程,单线程并发
yield/sleep主动让出执行权
ChannelTask 间通信管道
异步 I/O不阻塞主任务
asyncmap并发映射(I/O密集型)
错误处理fetch 传播异常,errormonitor 监控
生产者-消费者Channel 实现管道模式