14 - 异步编程
第 14 章:异步编程
掌握 Python 的 async/await、协程、异步 I/O 和结构化并发。
14.1 异步编程基础
14.1.1 为什么需要异步?
同步执行:
任务A: ████░░░░ (等待I/O)
任务B: ░░░░████ (等A完成才开始)
异步执行:
任务A: ██░░██ (等待I/O时切换)
任务B: ░░██░░ (利用A的I/O等待时间)
异步编程适合 I/O 密集型任务(网络请求、数据库查询、文件操作)。
14.1.2 协程(Coroutine)
import asyncio
async def say_hello(name: str, delay: float):
    """Greet *name*, pause asynchronously for *delay* seconds, then confirm."""
    print(f"Hello {name} 开始")
    # Suspends this coroutine and yields control to the event loop
    # instead of blocking the whole thread.
    await asyncio.sleep(delay)
    print(f"Hello {name} 完成")
# Run the coroutine from synchronous code (creates and closes an event loop).
asyncio.run(say_hello("World", 1))
14.2 async / await
14.2.1 基本语法
import asyncio
async def fetch_data(url: str) -> dict:
    """Simulate fetching *url* asynchronously.

    Returns a dict holding the requested URL and a dummy payload.
    """
    print(f"开始获取 {url}")
    # Stand-in for real network I/O.
    await asyncio.sleep(1)
    return {"url": url, "data": "response"}
async def main():
    """Fetch two endpoints back to back.

    The second await starts only after the first has completed —
    sequential execution, not concurrent.
    """
    first = await fetch_data("http://api.example.com/a")
    second = await fetch_data("http://api.example.com/b")

asyncio.run(main())
14.2.2 并发执行
import asyncio
async def fetch_data(url: str, delay: float) -> str:
    """Simulate fetching *url*: wait *delay* seconds, then return a canned reply."""
    reply = f"Response from {url}"
    await asyncio.sleep(delay)
    return reply
async def main():
    """Run several fetch_data calls concurrently, shown two ways."""
    # Option 1: gather — start all coroutines at once; results come back
    # in the order the awaitables were passed, not completion order.
    results = await asyncio.gather(
        fetch_data("url1", 2),
        fetch_data("url2", 1),
        fetch_data("url3", 3),
    )
    print(results)
    # Option 2: TaskGroup (Python 3.11+, structured concurrency) — every
    # task is guaranteed finished (or cancelled) when the block exits.
    async with asyncio.TaskGroup() as group:
        first = group.create_task(fetch_data("url1", 2))
        second = group.create_task(fetch_data("url2", 1))
    print(first.result(), second.result())

asyncio.run(main())
14.3 asyncio 核心概念
14.3.1 事件循环
import asyncio
async def main():
    """Print a start marker, sleep one second, print an end marker."""
    print("开始")
    await asyncio.sleep(1)
    print("结束")

# Python 3.7+ preferred entry point: creates, runs, and closes a loop.
asyncio.run(main())

# Manual event-loop management (advanced; rarely needed in modern code —
# note the loop must be closed explicitly).
loop = asyncio.new_event_loop()
loop.run_until_complete(main())
loop.close()
14.3.2 Task
import asyncio
async def compute(n: int) -> int:
    """Asynchronously square *n* (the sleep mimics real async work)."""
    squared = n * n
    await asyncio.sleep(0.1)
    return squared
async def main():
    """Schedule compute() as a Task, keep working, then await its result."""
    # create_task schedules the coroutine on the running loop right away.
    job = asyncio.create_task(compute(5))
    print("任务已创建")  # the task already runs in the background
    result = await job  # suspend until the task finishes
    print(f"结果: {result}")  # 25

asyncio.run(main())
14.3.3 等待多个任务
import asyncio
async def task(n: int, delay: float) -> str:
    """Sleep *delay* seconds, then report task *n* as finished."""
    message = f"Task {n} 完成"
    await asyncio.sleep(delay)
    return message
async def main():
    """Wait for many tasks: all of them via gather, or just the first via wait."""
    # Wait for every coroutine; results keep submission order.
    coros = [task(i, i * 0.5) for i in range(5)]
    results = await asyncio.gather(*coros)
    print(results)
    # Wait only until the first task completes; the rest stay pending.
    wrapped = [asyncio.create_task(t) for t in (task(1, 2), task(2, 1))]
    done, pending = await asyncio.wait(wrapped, return_when=asyncio.FIRST_COMPLETED)
    for d in done:
        print(f"首先完成: {d.result()}")

asyncio.run(main())
14.3.4 超时控制
import asyncio
async def slow_operation():
    """An operation taking ten seconds — far longer than callers will allow."""
    await asyncio.sleep(10)
    return "done"
async def main():
    """Demonstrate two ways to put a time limit on an awaitable."""
    # Classic API: wait_for cancels the inner coroutine and raises
    # TimeoutError once the deadline passes.
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
    except asyncio.TimeoutError:
        print("操作超时!")
    # Python 3.11+: asyncio.timeout() raises TimeoutError when the context
    # manager *exits*, so the try/except must wrap the async-with block.
    # (The original put the try *inside* the block, where the except could
    # never fire — inside the block only CancelledError is observable.)
    try:
        async with asyncio.timeout(2.0):
            result = await slow_operation()
    except asyncio.TimeoutError:
        print("超时")

asyncio.run(main())
14.4 异步上下文管理器
import asyncio
class AsyncDatabaseConnection:
    """Toy asynchronous database connection for use with ``async with``."""

    def __init__(self, dsn: str):
        # Connection string; the "connection" itself is only simulated.
        self.dsn = dsn

    async def __aenter__(self):
        print(f"连接数据库: {self.dsn}")
        await asyncio.sleep(0.1)  # simulated connect latency
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("关闭数据库连接")
        await asyncio.sleep(0.05)  # simulated teardown latency

    async def query(self, sql: str) -> list:
        """Pretend to execute *sql* and return one canned row."""
        await asyncio.sleep(0.1)  # simulated query latency
        return [{"id": 1, "name": "Alice"}]
async def main():
    """Open a connection, run one query, print the rows.

    The connection is closed automatically when the async-with exits.
    """
    async with AsyncDatabaseConnection("postgresql://localhost/db") as conn:
        rows = await conn.query("SELECT * FROM users")
        print(rows)

asyncio.run(main())
14.5 异步迭代器
import asyncio
class AsyncRange:
    """Async iterator yielding start..stop-1, pausing before each value."""

    def __init__(self, start: int, stop: int):
        self.start = start
        self.stop = stop

    def __aiter__(self):
        # Reset the cursor so the object can be iterated more than once.
        self.current = self.start
        return self

    async def __anext__(self):
        if self.current >= self.stop:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)  # simulated async work per item
        value = self.current
        self.current = value + 1
        return value
async def main():
    """Consume AsyncRange with ``async for``."""
    async for value in AsyncRange(0, 5):
        print(value)

asyncio.run(main())
14.6 异步 HTTP 客户端
14.6.1 aiohttp
import asyncio
import aiohttp
async def fetch(session: aiohttp.ClientSession, url: str) -> dict:
    """GET *url* with *session* and decode the body as JSON."""
    async with session.get(url) as resp:
        # The body must be read while the response is still open.
        payload = await resp.json()
    return payload
async def main():
    """Issue five JSON requests concurrently over one shared session."""
    async with aiohttp.ClientSession() as session:
        coros = [
            fetch(session, f"https://jsonplaceholder.typicode.com/posts/{i}")
            for i in range(1, 6)
        ]
        # gather runs all requests at once and preserves submission order.
        for post in await asyncio.gather(*coros):
            print(post["title"][:50])

asyncio.run(main())
14.6.2 httpx(推荐)
import asyncio
import httpx
async def fetch(client: httpx.AsyncClient, url: str) -> dict:
    """GET *url* via *client* and return the decoded JSON body."""
    return (await client.get(url)).json()
async def main():
    """Fan out five requests over one AsyncClient and count the responses."""
    async with httpx.AsyncClient(timeout=10.0) as client:
        coros = [
            fetch(client, f"https://jsonplaceholder.typicode.com/posts/{i}")
            for i in range(1, 6)
        ]
        results = await asyncio.gather(*coros)
        print(len(results))

asyncio.run(main())
14.7 异步数据库
import asyncio
import aiosqlite
async def main():
    """Create a table, insert a row, commit, then stream the rows back."""
    async with aiosqlite.connect("test.db") as db:
        await db.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL
            )
        """)
        # Parameterized query — never interpolate values into SQL strings.
        await db.execute("INSERT INTO users (name) VALUES (?)", ("Alice",))
        await db.commit()
        # The cursor supports async iteration, streaming rows one at a time.
        async with db.execute("SELECT * FROM users") as cursor:
            async for row in cursor:
                print(row)

asyncio.run(main())
14.8 TaskGroup(结构化并发)
import asyncio
async def task(n: int) -> int:
    """Return *n* after ``n * 0.1`` seconds; ``n == 3`` deliberately fails."""
    if n == 3:
        raise ValueError(f"任务 {n} 失败")
    await asyncio.sleep(n * 0.1)
    return n
async def main():
# TaskGroup 确保所有任务完成或取消
try:
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(task(i)) for i in range(5)]
except* ValueError as eg: # Python 3.11+ 异常组
for exc in eg.exceptions:
print(f"捕获: {exc}")
# 所有任务要么完成,要么被取消
for t in tasks:
if t.done() and not t.cancelled():
print(f"成功: {t.result()}")
asyncio.run(main())
14.9 常见陷阱
# ❌ Wrong: a synchronous blocking call inside a coroutine
async def bad():
    import time
    time.sleep(5)  # blocks the entire event loop!

# ✅ Right: use the asynchronous equivalent
async def good():
    await asyncio.sleep(5)

# ❌ Wrong: forgetting to await
async def bad():
    coroutine = asyncio.sleep(1)  # the coroutine is created but never runs!
    # should be: await asyncio.sleep(1)

# ✅ Right: run blocking sync code in a worker thread via to_thread
async def process():
    result = await asyncio.to_thread(sync_heavy_function)
14.10 注意事项
🔴 注意:
- 不要在异步函数中使用阻塞调用(time.sleep、requests.get 等)
- await 只能在 async def 函数中使用
- asyncio.run() 不能在已运行的事件循环内嵌套调用
- gather() 的任务是并发执行,不是并行执行
💡 提示:
- 使用 httpx 的异步客户端替代 requests 的同步阻塞调用
- 使用 asyncio.to_thread() 包装同步阻塞代码
- Python 3.11+ 推荐用 TaskGroup 替代 gather(异常处理更完善)
- 使用 asyncio.timeout() 控制超时
📌 业务场景:
import asyncio
import httpx
async def crawl(urls: list[str]) -> list[dict]:
    """Fetch every URL concurrently, with at most 10 requests in flight.

    Returns one dict per URL ({"url", "status", "size"}); because
    return_exceptions=True, a failed request yields the exception object
    instead of cancelling the whole batch.
    """
    async with httpx.AsyncClient(timeout=30.0) as client:
        limiter = asyncio.Semaphore(10)  # cap on concurrent requests

        async def fetch(url: str) -> dict:
            # Hold one semaphore slot for the duration of the request.
            async with limiter:
                resp = await client.get(url)
            return {"url": url, "status": resp.status_code, "size": len(resp.text)}

        return await asyncio.gather(*(fetch(u) for u in urls), return_exceptions=True)
# Example driver: crawl 100 pages and report how many results came back.
urls = [f"https://example.com/page/{i}" for i in range(100)]
results = asyncio.run(crawl(urls))
print(f"完成 {len(results)} 个请求")
14.11 扩展阅读
- asyncio 模块文档
- PEP 492 - Coroutines with async/await
- aiohttp 文档
- httpx 文档
- 《流畅的 Python》第 19-21 章