Julia 教程 / Julia 异步编程与协程
15. 异步编程与协程
Julia 的 Task(协程)机制支持轻量级并发,适合 I/O 密集型任务如网络请求、文件操作等。
15.1 @async 与 Task 基础
创建 Task
# @async 创建并启动一个 Task
t = @async begin
println("Task 开始")
sleep(1)
println("Task 结束")
return 42
end
println("主任务继续执行")
result = fetch(t) # 等待并获取结果
println("Task 返回: $result")
手动创建 Task
# 更底层的方式
t = Task() do
sleep(0.5)
return "完成"
end
schedule(t) # 启动 Task
println(fetch(t)) # 等待结果
多个并发 Task
# 同时启动多个任务
tasks = [@async begin
sleep(rand())
return "Task $i 完成"
end for i in 1:5]
# 等待所有结果
results = fetch.(tasks)
for r in results
println(r)
end
15.2 yield、sleep 与 wait
yield:让出执行权
# yield 允许其他 Task 运行
function counting_task(name, n)
for i in 1:n
println("$name: $i")
yield() # 让出执行权
end
end
@async counting_task("A", 5)
@async counting_task("B", 5)
# 两个任务交替执行
sleep(2)
sleep:休眠
@async begin
println("开始")
sleep(2) # 休眠2秒(不阻塞其他Task)
println("2秒后")
end
println("主任务不受影响")
sleep(3)
wait:等待条件
# 使用 Condition 进行同步
cond = Condition()
@async begin
println("等待条件...")
wait(cond)
println("条件满足!")
end
sleep(1)
notify(cond) # 唤醒等待者
15.3 Channel 通信
基本 Channel
# 带缓冲的 Channel
ch = Channel{String}(10)
# 生产者
@async for i in 1:5
msg = "消息 #$i"
put!(ch, msg)
println("发送: $msg")
sleep(0.2)
end
# 消费者
for i in 1:5
msg = take!(ch)
println("接收: $msg")
end
无限生产者-消费者
# 生产者:无限生成数据
function producer(ch::Channel)
i = 0
while true
i += 1
put!(ch, rand()^2)
sleep(0.1)
end
end
ch = Channel(producer; ctype=Float64)
# 消费者:处理有限数量
values = [take!(ch) for _ in 1:10]
println("收集的值: ", round.(values, digits=3))
Channel 关闭
ch = Channel{Int}(5)
@async begin
for i in 1:5
put!(ch, i)
end
close(ch) # 生产完毕,关闭通道
end
# 可以用 for 循环迭代已关闭的 channel
for val in ch
println("收到: $val")
end
println("Channel 已关闭")
15.4 异步 I/O
异步文件读取
# 异步写入
@async begin
open("/tmp/test_async.txt", "w") do f
for i in 1:5
write(f, "行 $i\n")
sleep(0.1)
end
end
println("写入完成")
end
# 主任务继续
println("不阻塞")
sleep(1)
# 异步读取
@async begin
content = read("/tmp/test_async.txt", String)
println("读取内容:\n$content")
end
sleep(1)
异步 HTTP 请求(使用 HTTP.jl)
# 需要 HTTP.jl 包
# using HTTP
# 并发 HTTP 请求
urls = [
"https://httpbin.org/get",
"https://httpbin.org/ip",
"https://httpbin.org/user-agent"
]
# 串行请求(慢)
# for url in urls
# resp = HTTP.get(url)
# println(resp.status)
# end
# 并发请求(快)
# tasks = [@async HTTP.get(url) for url in urls]
# responses = fetch.(tasks)
💡 I/O 密集型:对于网络请求、文件操作等 I/O 等待的场景,@async 的优势最大。
15.5 Task 调度
任务调度原理
# Julia 使用协作式调度
# Task 在遇到 yield/sleep/wait/IO 时主动让出
# 查看当前 Task
println(current_task())
# 任务状态
t = @async sleep(10)
println(istaskstarted(t)) # true
println(istaskdone(t)) # false
sleep(1)
println(istaskdone(t)) # false (还在sleep)
# 强制等待(不推荐,可能死锁)
# wait(t)
任务优先级
# Julia 目前没有显式的任务优先级API
# 但可以通过控制 yield 时机来影响调度
function high_priority_work()
@async for i in 1:5
println("高优先级: $i")
yield()
end
end
function low_priority_work()
@async for i in 1:5
println("低优先级: $i")
sleep(0.1) # 通过 sleep 降低执行频率
end
end
high_priority_work()
low_priority_work()
sleep(1)
15.6 异步异常处理
Task 中的异常
# Task 中的异常不会自动传播到主任务
t = @async begin
error("Task 中出错!")
end
# fetch 时才会抛出异常
try
fetch(t)
catch e
println("捕获异常: ", e)
end
errormonitor
# 自动监控并报告 Task 异常
t = errormonitor(@async begin
sleep(0.5)
error("出错了!")
end)
# errormonitor 会在 task 失败时自动打印警告
sleep(1)
trywait / task-local 异常处理
function safe_async(f)
return @async try
f()
catch e
println("异步错误: ", e)
rethrow()
end
end
t = safe_async() do
error("测试错误")
end
try
fetch(t)
catch
println("已处理")
end
15.7 asyncmap
# asyncmap:并发映射(I/O密集型适用)
urls = repeat(["https://httpbin.org/delay/1"], 5)
# 串行(5秒)
# results = map(url -> HTTP.get(url).status, urls)
# 异步并发(约1秒)
# results = asyncmap(url -> HTTP.get(url).status, urls; ntasks=5)
# 指定并发数
# results = asyncmap(process_item, items; ntasks=10)
自定义 asyncmap
function my_asyncmap(f, items; ntasks=10)
results = Vector{Any}(undef, length(items))
semaphore = Channel{Bool}(ntasks)
tasks = map(enumerate(items)) do (i, item)
@async begin
put!(semaphore, true) # 获取信号量
try
results[i] = f(item)
finally
take!(semaphore) # 释放信号量
end
end
end
fetch.(tasks)
return results
end
# 使用
results = my_asyncmap(1:20; ntasks=5) do x
sleep(0.2)
return x^2
end
println(results)
15.8 异步与并行对比
| 特性 | @async (协程) | @spawn (进程) | Threads.@threads |
|---|
| 内存模型 | 共享 | 独立 | 共享 |
| 启动开销 | 极低 | 高 | 低 |
| 适用场景 | I/O 密集 | CPU 密集 | CPU 密集 |
| 真正并行 | ❌(单线程) | ✅ | ✅ |
| 代码复杂度 | 低 | 中 | 低 |
# I/O 密集型 → @async
tasks = [@async download(url) for url in urls]
# CPU 密集型 → @distributed 或 Threads
results = pmap(heavy_computation, data)
# 细粒度并行 → Threads
Threads.@threads for i in 1:n
results[i] = compute(i)
end
15.9 生产者-消费者模式
完整示例
# 数据处理管道
function pipeline()
# Stage 1: 数据生成
raw_ch = Channel(32)
@async begin
for i in 1:100
put!(raw_ch, randn())
sleep(0.01) # 模拟数据到达
end
close(raw_ch)
end
# Stage 2: 数据过滤
filtered_ch = Channel(32)
@async begin
for val in raw_ch
if abs(val) < 2.0
put!(filtered_ch, val)
end
end
close(filtered_ch)
end
# Stage 3: 数据聚合
results = Float64[]
window = Float64[]
for val in filtered_ch
push!(window, val)
if length(window) >= 10
push!(results, sum(window) / length(window))
empty!(window)
end
end
return results
end
averages = pipeline()
println("收集了 $(length(averages)) 个平均值")
多消费者
function multi_consumer()
ch = Channel(10)
# 生产者
@async begin
for i in 1:30
put!(ch, i)
end
close(ch)
end
# 多个消费者
consumer(n) = @async begin
total = 0
for val in ch
total += val
sleep(0.01) # 模拟处理
end
println("消费者 $n 总和: $total")
end
tasks = [consumer(i) for i in 1:3]
fetch.(tasks)
end
multi_consumer()
15.10 HTTP 异步请求实战
并发爬虫模式
using Dates
# 模拟 HTTP 请求
function fake_request(url)
sleep(rand() * 0.5) # 模拟网络延迟
return (url=url, status=200, time=now())
end
function concurrent_requests(urls; max_concurrent=5)
results = Channel(length(urls))
semaphore = Channel{Bool}(max_concurrent)
tasks = map(urls) do url
@async begin
put!(semaphore, true)
try
result = fake_request(url)
put!(results, result)
finally
take!(semaphore)
end
end
end
fetch.(tasks)
close(results)
return collect(results)
end
urls = ["https://example.com/page/$i" for i in 1:20]
results = concurrent_requests(urls; max_concurrent=5)
println("完成 $(length(results)) 个请求")
15.11 实际业务场景
场景一:日志异步收集
function async_logger()
log_ch = Channel{String}(1000)
# 后台日志写入
@async begin
open("/tmp/app.log", "w") do f
for msg in log_ch
write(f, "[$(now())] $msg\n")
end
end
end
# 返回日志函数
return function(msg)
put!(log_ch, msg)
end
end
log = async_logger()
for i in 1:100
log("事件 #$i")
end
场景二:任务超时控制
function with_timeout(f, timeout_sec)
t = @async f()
timer = Timer(timeout_sec)
@async begin
wait(timer)
if !istaskdone(t)
schedule(t, ErrorException("超时"); error=true)
end
end
return fetch(t)
end
# 使用
try
result = with_timeout(3.0) do
sleep(10) # 模拟超长任务
return 42
end
catch e
println("超时: ", e)
end
场景三:并发 Web 服务器骨架
function simple_server()
# 模拟处理请求
function handle_request(req_id)
sleep(rand() * 0.1) # 模拟处理
return "响应 #$req_id"
end
# 请求队列
request_ch = Channel(100)
# 工作线程
for _ in 1:4
@async for req_id in request_ch
response = handle_request(req_id)
println(response)
end
end
# 模拟请求到达
for i in 1:20
put!(request_ch, i)
end
return request_ch
end
ch = simple_server()
sleep(2)
15.12 扩展阅读
15.13 本章小结
| 主题 | 要点 |
|---|
| @async | 创建协程,单线程并发 |
| yield/sleep | 主动让出执行权 |
| Channel | Task 间通信管道 |
| 异步 I/O | 不阻塞主任务 |
| asyncmap | 并发映射(I/O密集型) |
| 错误处理 | fetch 传播异常,errormonitor 监控 |
| 生产者-消费者 | Channel 实现管道模式 |