异步与协程精讲 / 第8章:Python asyncio —— 从生成器到协程
第8章:Python asyncio —— 从生成器到协程
8.1 Python 异步的演进史
Python 的异步编程经历了三个阶段:
时间线:
2001 ── Python 2.2 引入生成器(yield)
2005 ── PEP 342: 增强生成器(send/throw)
2006 ── Twisted 框架成熟(回调风格)
2010 ── Tornado 框架(协程 + 回调混合)
2012 ── PEP 3156: asyncio 提案(tulip 项目)
2014 ── Python 3.4: asyncio 标准库(@asyncio.coroutine + yield from)
2015 ── Python 3.5: async/await 关键字(PEP 492)
2018 ── Python 3.7: asyncio.run(),简化入口
2020 ── Python 3.9: asyncio.to_thread()
2022 ── Python 3.11: 任务组(TaskGroup,结构化并发)
2023 ── Python 3.12: 改进的 Task 生命周期管理
三个时代的对比
| 时代 | 版本 | 风格 | 示例 |
|---|---|---|---|
| 生成器协程 | 2.2+ | yield | def gen(): yield 1 |
| 装饰器协程 | 3.4 | @asyncio.coroutine | @coroutine\ndef f(): yield from ... |
| async/await | 3.5+ | async/await | async def f(): await ... |
8.2 事件循环基础
启动事件循环
import asyncio
async def main():
    """Entry coroutine: print, suspend for one second, print again."""
    print('Hello')
    await asyncio.sleep(1)
    print('World')

# Python 3.7+ — recommended entry point: creates, runs, and closes a fresh loop.
asyncio.run(main())

# Manual loop management (more flexible; e.g. for embedding or libraries).
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    # Always close the loop so its resources are released.
    loop.close()
事件循环的生命周期
asyncio.run(main())
│
├── 创建新的事件循环
├── 创建 main() 的 Task
├── 运行事件循环直到 main() 完成
├── 关闭所有异步生成器
├── 关闭事件循环
└── 返回 main() 的结果
asyncio.run() vs loop.run_until_complete()
| 特性 | asyncio.run() | loop.run_until_complete() |
|---|---|---|
| 创建循环 | 自动 | 需手动创建 |
| 清理 | 自动清理异步生成器 | 不清理 |
| 嵌套 | 不允许 | 循环已在运行时同样不允许(会抛 RuntimeError) |
| 推荐场景 | 程序入口 | 库内部使用 |
8.3 Task 与 Future
Future — 结果的占位符
async def demo_future():
    """Demonstrate a Future as a placeholder for a result set later."""
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    # Simulate an asynchronous operation that eventually fills in the result.
    async def set_result():
        await asyncio.sleep(1)
        future.set_result(42)

    asyncio.create_task(set_result())

    # Suspend here until set_result() delivers the value.
    result = await future
    print(f"结果: {result}")  # 42
Task — 包装协程
async def fetch_data(url: str) -> dict:
    """Simulate an I/O-bound fetch and return a small status dict."""
    await asyncio.sleep(1)  # simulated I/O latency
    return {"url": url, "status": "ok"}

async def main():
    """Show the Task lifecycle: create, inspect, await."""
    # create_task() schedules the coroutine to start running right away.
    task = asyncio.create_task(fetch_data("https://api.example.com"))
    # Task state before completion.
    print(task.done())  # False
    # Await the result.
    result = await task
    print(task.done())  # True
    print(result)  # {"url": "...", "status": "ok"}
Task vs Future 对比
| 特性 | Future | Task |
|---|---|---|
| 角色 | 结果占位符 | 协程的包装器 |
| 创建方式 | loop.create_future() | asyncio.create_task(coro) |
| 设置结果 | 手动 set_result() | 协程返回值自动设置 |
| 可等待 | ✅ | ✅ |
| 使用场景 | 底层 API | 大多数场景 |
8.4 并发执行
asyncio.gather — 收集结果
async def fetch_user(user_id: int) -> dict:
    """Simulate fetching one user record (0.5 s of fake I/O)."""
    await asyncio.sleep(0.5)
    return {"id": user_id, "name": f"User_{user_id}"}

async def main():
    """Run several fetches concurrently; gather preserves argument order."""
    users = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3),
    )
    print(users)
    # [{"id": 1, "name": "User_1"}, {"id": 2, "name": "User_2"}, ...]
asyncio.gather 的 return_exceptions 参数
async def main():
    """gather() with return_exceptions=True: failures become results."""
    results = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),  # suppose this one fails
        fetch_user(3),
        return_exceptions=True,  # exceptions are returned as results, not raised
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"错误: {r}")
        else:
            print(f"成功: {r}")
asyncio.wait — 更灵活的等待
async def main():
    """asyncio.wait() and its three return_when strategies."""
    tasks = [
        asyncio.create_task(fetch_user(1)),
        asyncio.create_task(fetch_user(2)),
        asyncio.create_task(fetch_user(3)),
    ]
    # Return as soon as the first task finishes.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in done:
        print(f"第一个完成: {task.result()}")
    # Wait for every task to finish.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
    # Return at the first raised exception (or when all complete).
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
等待策略对比
| 函数 | 返回类型 | 特点 |
|---|---|---|
| gather() | 结果列表 | 简单易用,结果有序 |
| wait() | (done, pending) 集合 | 灵活,支持多种等待策略 |
| as_completed() | 迭代器 | 按完成顺序返回 |
asyncio.as_completed — 按完成顺序处理
async def main():
    """Handle results in completion order, not submission order."""
    tasks = [fetch_user(i) for i in range(10)]
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"完成: {result}")  # whichever finishes first is handled first
8.5 同步原语
Lock — 互斥锁
async def safe_increment(counter: dict, key: str, lock: asyncio.Lock):
    """Increment counter[key] atomically with respect to other coroutines.

    Without the lock, the read → await → write sequence below races:
    two coroutines could read the same value and lose an update.
    """
    async with lock:
        current = counter.get(key, 0)
        await asyncio.sleep(0.01)  # simulated async work between read and write
        counter[key] = current + 1
Semaphore — 信号量(并发限制)
async def fetch_with_limit(sem: asyncio.Semaphore, url: str):
    """Fetch url while holding the semaphore to cap global concurrency."""
    async with sem:  # limits how many requests run at once
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.json()

async def main():
    sem = asyncio.Semaphore(5)  # at most 5 requests in flight
    urls = [f"https://api.example.com/{i}" for i in range(100)]
    tasks = [fetch_with_limit(sem, url) for url in urls]
    results = await asyncio.gather(*tasks)
Event — 事件通知
async def waiter(event: asyncio.Event):
    """Block until someone calls event.set()."""
    print("等待事件...")
    await event.wait()
    print("事件触发!")

async def setter(event: asyncio.Event):
    """Fire the event after a 2-second delay."""
    await asyncio.sleep(2)
    event.set()  # wakes every coroutine waiting on the event

async def main():
    event = asyncio.Event()
    await asyncio.gather(waiter(event), setter(event))
Queue — 异步队列
async def producer(queue: asyncio.Queue, n: int):
    """Produce n items, then a None sentinel to stop the consumers."""
    for i in range(n):
        await asyncio.sleep(0.5)
        await queue.put(i)
        print(f"生产: {i}")
    await queue.put(None)  # poison pill: tells consumers to stop

async def consumer(queue: asyncio.Queue, name: str):
    """Consume items until the None sentinel appears."""
    while True:
        item = await queue.get()
        if item is None:
            # Re-queue the pill so the other consumers also see it and stop.
            await queue.put(None)
            break
        print(f"[{name}] 消费: {item}")
        await asyncio.sleep(1)  # simulated processing time
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)
    producers = [asyncio.create_task(producer(queue, 20))]
    consumers = [asyncio.create_task(consumer(queue, f"C{i}")) for i in range(3)]
    await asyncio.gather(*producers)
    await asyncio.gather(*consumers)
8.6 aiohttp — 异步 HTTP 客户端
import aiohttp
import asyncio
async def fetch_all(urls: list[str]) -> list[dict]:
    """Fetch every URL concurrently over one shared session."""
    async with aiohttp.ClientSession() as session:
        # One session for all requests: reuses the connection pool.
        tasks = [fetch_one(session, url) for url in urls]
        return await asyncio.gather(*tasks)
async def fetch_one(session: aiohttp.ClientSession, url: str) -> dict:
    """GET one URL and return its status plus decoded JSON body."""
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
        return {
            "url": url,
            "status": response.status,
            "data": await response.json(),
        }
# Usage: fetch three GitHub user profiles concurrently.
urls = [
"https://api.github.com/users/python",
"https://api.github.com/users/golang",
"https://api.github.com/users/rust-lang",
]
results = asyncio.run(fetch_all(urls))
8.7 TaskGroup — Python 3.11+
async def main():
    """Structured concurrency with TaskGroup (Python 3.11+)."""
    # Note: the original also declared unused `results`/`errors` lists; removed.
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_user(1))
        task2 = tg.create_task(fetch_user(2))
        task3 = tg.create_task(fetch_user(3))
    # When the TaskGroup block exits, every task has finished (or failed).
    # If any task raised, all exceptions are collected into an ExceptionGroup.
    print(task1.result())
    print(task2.result())
    print(task3.result())
TaskGroup vs gather
| 特性 | gather() | TaskGroup |
|---|---|---|
| 取消 | 不自动取消其他任务 | 一个失败自动取消所有 |
| 错误收集 | 单个异常或 return_exceptions | ExceptionGroup |
| 动态添加 | 不方便 | tg.create_task() 动态添加 |
| 推荐场景 | 简单并发 | 结构化并发 |
8.8 运行阻塞代码
异步代码中不能直接调用阻塞函数(如 time.sleep()、requests.get())。
import asyncio
from concurrent.futures import ThreadPoolExecutor
async def main():
    """Three ways to run blocking code without stalling the event loop.

    `blocking_function`, `blocking_io`, `arg1`, `arg2` are placeholders —
    substitute your own blocking callables and arguments.
    """
    loop = asyncio.get_running_loop()
    # Option 1: run in the loop's default thread pool.
    result = await loop.run_in_executor(
        None,  # None selects the default ThreadPoolExecutor
        blocking_function,
        arg1, arg2
    )
    # Option 2: use a custom thread pool.
    with ThreadPoolExecutor(max_workers=10) as pool:
        result = await loop.run_in_executor(pool, blocking_io)
    # Option 3: the Python 3.9+ convenience wrapper.
    result = await asyncio.to_thread(blocking_function, arg1, arg2)
8.9 常见陷阱
陷阱一:忘记 await
async def buggy():
    # ❌ No reference kept and never awaited: the task may be garbage-collected
    # before finishing, and its result or exception is silently lost.
    asyncio.create_task(some_coroutine())

async def correct():
    # ✅ Keep a reference and await it.
    task = asyncio.create_task(some_coroutine())
    await task
陷阱二:在 async 函数中调用阻塞函数
async def buggy():
    # ❌ Blocking calls freeze the entire event loop!
    time.sleep(5)
    requests.get("https://api.example.com")

async def correct():
    # ✅ Use the asynchronous equivalents instead.
    await asyncio.sleep(5)
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.example.com") as resp:
            return await resp.json()
陷阱三:未处理的异常导致 Task 泄漏
async def buggy():
    # ❌ The exception is silently ignored once the task is discarded.
    asyncio.create_task(may_fail())

async def correct():
    # ✅ Attach a done-callback; result() re-raises the task's exception so it
    # reaches the loop's exception handler instead of vanishing.
    task = asyncio.create_task(may_fail())
    task.add_done_callback(lambda t: t.result() if not t.cancelled() else None)
8.10 业务场景:异步数据采集
import asyncio
import aiohttp
from dataclasses import dataclass
@dataclass
class FetchResult:
url: str
status: int
data: dict | None
error: str | None
async def fetch_one(session: aiohttp.ClientSession, url: str, sem: asyncio.Semaphore) -> FetchResult:
    """Fetch one URL under the concurrency semaphore; never raises."""
    async with sem:
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                data = await resp.json()
                return FetchResult(url=url, status=resp.status, data=data, error=None)
        except Exception as e:
            # Record the failure as data instead of propagating, so a plain
            # gather() over many fetches never aborts on one bad URL.
            return FetchResult(url=url, status=0, data=None, error=str(e))
async def bulk_fetch(urls: list[str], concurrency: int = 10) -> list[FetchResult]:
    """Fetch all urls with at most `concurrency` requests in flight."""
    sem = asyncio.Semaphore(concurrency)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url, sem) for url in urls]
        return await asyncio.gather(*tasks)
async def main():
    """Crawl 1000 item URLs and report the success/failure split."""
    urls = [f"https://api.example.com/items/{i}" for i in range(1000)]
    results = await bulk_fetch(urls, concurrency=20)
    success = [r for r in results if r.error is None]
    failed = [r for r in results if r.error is not None]
    print(f"成功: {len(success)}, 失败: {len(failed)}")

asyncio.run(main())
8.11 本章小结
| 要点 | 说明 |
|---|---|
| 事件循环 | asyncio 的核心调度器 |
| Task | 协程的包装器,立即开始执行 |
| Future | 结果的占位符,底层抽象 |
| gather/wait/as_completed | 三种并发执行模式 |
| 同步原语 | Lock、Semaphore、Event、Queue |
| TaskGroup | Python 3.11+ 的结构化并发 |
| 阻塞代码 | 使用 to_thread() 或 run_in_executor() |
下一章预告:Rust 的 async 如何通过零成本抽象实现高性能异步编程?
扩展阅读