强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧。
文档目录

14 - 异步编程

第 14 章:异步编程

掌握 Python 的 async/await、协程、异步 I/O 和结构化并发。


14.1 异步编程基础

14.1.1 为什么需要异步?

同步执行:
任务A: ████░░░░ (等待I/O)
任务B: ░░░░████ (等A完成才开始)

异步执行:
任务A: ██░░██ (等待I/O时切换)
任务B: ░░██░░ (利用A的I/O等待时间)

异步编程适合 I/O 密集型任务(网络请求、数据库查询、文件操作)。

14.1.2 协程(Coroutine)

import asyncio

async def say_hello(name: str, delay: float):
    """Greet ``name``, pausing ``delay`` seconds between two messages.

    Prints a start line, suspends on ``asyncio.sleep`` so the event loop can
    run other work in the meantime, then prints a completion line.
    """
    started = f"Hello {name} 开始"
    finished = f"Hello {name} 完成"
    print(started)
    await asyncio.sleep(delay)  # non-blocking wait
    print(finished)

# 运行协程
asyncio.run(say_hello("World", 1))

14.2 async / await

14.2.1 基本语法

import asyncio

async def fetch_data(url: str) -> dict:
    """Simulate fetching ``url`` asynchronously.

    Announces the request, sleeps one second in place of real I/O, and
    returns a dict holding the requested URL and a placeholder payload.
    """
    print(f"开始获取 {url}")
    simulated_latency = 1
    await asyncio.sleep(simulated_latency)  # stand-in for real I/O
    payload = {"url": url, "data": "response"}
    return payload

async def main():
    """Fetch two URLs one after the other and print both responses.

    Each ``await`` completes before the next call is made, so the second
    request does not start until the first one has returned.
    """
    # Sequential execution: the second fetch starts only after the first ends.
    result1 = await fetch_data("http://api.example.com/a")
    result2 = await fetch_data("http://api.example.com/b")
    # Show the responses so the fetched data is not silently discarded.
    print(result1, result2)

asyncio.run(main())

14.2.2 并发执行

import asyncio

async def fetch_data(url: str, delay: float) -> str:
    """Wait ``delay`` seconds, then return a canned response string for ``url``."""
    await asyncio.sleep(delay)
    response = f"Response from {url}"
    return response

async def main():
    """Run several fetches concurrently, first with gather, then with TaskGroup."""
    # Approach 1: gather collects every result into one list.
    requests = (("url1", 2), ("url2", 1), ("url3", 3))
    results = await asyncio.gather(
        *(fetch_data(url, delay) for url, delay in requests)
    )
    print(results)

    # Approach 2: TaskGroup (Python 3.11+, structured concurrency).
    # Leaving the `async with` block waits for every task created in it.
    async with asyncio.TaskGroup() as group:
        first = group.create_task(fetch_data("url1", 2))
        second = group.create_task(fetch_data("url2", 1))
    print(first.result(), second.result())

asyncio.run(main())

14.3 asyncio 核心概念

14.3.1 事件循环

import asyncio

async def main():
    """Print a start marker, sleep one second asynchronously, print an end marker."""
    pause_seconds = 1
    print("开始")
    await asyncio.sleep(pause_seconds)
    print("结束")

# Python 3.7+ 推荐方式
asyncio.run(main())

# Managing the event loop by hand (advanced usage)
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    # Always release the loop's resources, even if main() raises.
    loop.close()

14.3.2 Task

import asyncio

async def compute(n: int) -> int:
    """Return ``n`` multiplied by itself after a 0.1-second asynchronous pause."""
    await asyncio.sleep(0.1)
    squared = n * n
    return squared

async def main():
    """Schedule ``compute(5)`` as a Task, do other work, then print its result."""
    # create_task schedules the coroutine on the running loop right away.
    pending = asyncio.create_task(compute(5))

    # Other work can happen here while the task is in flight.
    print("任务已创建")

    # Awaiting the task yields its return value.
    print(f"结果: {await pending}")  # 25

asyncio.run(main())

14.3.3 等待多个任务

import asyncio

async def task(n: int, delay: float) -> str:
    """Sleep ``delay`` seconds, then return a completion message for task ``n``."""
    await asyncio.sleep(delay)
    message = f"Task {n} 完成"
    return message

async def main():
    """Demonstrate waiting for all awaitables versus only the first to finish.

    First gathers five coroutines and prints their results, then races two
    tasks with ``asyncio.wait(..., FIRST_COMPLETED)`` and prints the winner.
    Tasks that lost the race are cancelled before returning.
    """
    tasks = [task(i, i * 0.5) for i in range(5)]

    # Wait for everything to finish.
    results = await asyncio.gather(*tasks)
    print(results)

    # Wait only until the first task finishes.
    done, pending = await asyncio.wait(
        [asyncio.create_task(t) for t in [task(1, 2), task(2, 1)]],
        return_when=asyncio.FIRST_COMPLETED,
    )
    for d in done:
        print(f"首先完成: {d.result()}")

    # wait() does not cancel the unfinished tasks, so cancel them explicitly
    # and await them so the cancellation is fully processed before returning.
    for p in pending:
        p.cancel()
    if pending:
        await asyncio.gather(*pending, return_exceptions=True)

asyncio.run(main())

14.3.4 超时控制

import asyncio

async def slow_operation():
    """Simulate a long-running operation: sleep ten seconds, then return ``"done"``."""
    duration = 10
    await asyncio.sleep(duration)
    return "done"

async def main():
    """Show two ways to bound how long an awaitable may run.

    Both attempts give ``slow_operation()`` a two-second budget and print a
    message if the budget is exceeded; a result that arrives in time is
    printed instead.
    """
    # Approach 1: wait_for cancels the awaitable and raises TimeoutError.
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
    except asyncio.TimeoutError:
        print("操作超时!")
    else:
        print(result)

    # Approach 2 (Python 3.11+): the asyncio.timeout() context manager.
    # The TimeoutError is raised by the context manager when the block exits,
    # so the try/except has to wrap the whole `async with`; an `except`
    # placed inside the block would never see it.
    try:
        async with asyncio.timeout(2.0):
            result = await slow_operation()
    except asyncio.TimeoutError:
        print("超时")
    else:
        print(result)

asyncio.run(main())

14.4 异步上下文管理器

import asyncio

class AsyncDatabaseConnection:
    """Toy async context manager that mimics a database connection.

    Entering prints a connect message and pauses briefly; exiting prints a
    close message and pauses briefly. No real database is contacted.
    """

    # Simulated latencies, in seconds.
    _CONNECT_DELAY = 0.1
    _CLOSE_DELAY = 0.05
    _QUERY_DELAY = 0.1

    def __init__(self, dsn: str):
        self.dsn = dsn

    async def __aenter__(self):
        """Announce the connection, simulate connecting, and return ``self``."""
        print(f"连接数据库: {self.dsn}")
        await asyncio.sleep(self._CONNECT_DELAY)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Announce the close and simulate teardown; exceptions propagate."""
        print("关闭数据库连接")
        await asyncio.sleep(self._CLOSE_DELAY)
        return None

    async def query(self, sql: str) -> list:
        """Simulate running ``sql``; always returns the same single-row list."""
        await asyncio.sleep(self._QUERY_DELAY)
        row = {"id": 1, "name": "Alice"}
        return [row]

async def main():
    """Open the demo connection, run one query, and print the returned rows."""
    connection = AsyncDatabaseConnection("postgresql://localhost/db")
    async with connection as conn:
        print(await conn.query("SELECT * FROM users"))

asyncio.run(main())

14.5 异步迭代器

import asyncio

class AsyncRange:
    """Asynchronous iterator over the integers from ``start`` up to ``stop``.

    ``stop`` is exclusive. Each value is produced after a 0.1-second
    ``asyncio.sleep`` that stands in for real asynchronous work.
    """

    def __init__(self, start: int, stop: int):
        self.start, self.stop = start, stop

    def __aiter__(self):
        """Reset the cursor to ``start`` and return this object as the iterator."""
        self.current = self.start
        return self

    async def __anext__(self):
        """Return the next value, or raise ``StopAsyncIteration`` at ``stop``."""
        exhausted = self.current >= self.stop
        if exhausted:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)  # simulated asynchronous work
        value = self.current
        self.current = value + 1
        return value

async def main():
    """Print each number produced by ``AsyncRange(0, 5)``."""
    numbers = AsyncRange(0, 5)
    async for value in numbers:
        print(value)

asyncio.run(main())

14.6 异步 HTTP 客户端

14.6.1 aiohttp

import asyncio
import aiohttp

async def fetch(session: aiohttp.ClientSession, url: str) -> dict:
    """GET ``url`` through ``session`` and return the decoded JSON body."""
    async with session.get(url) as resp:
        payload = await resp.json()
    return payload

async def main():
    """Fetch posts 1-5 concurrently over one session and print each title.

    Only the first 50 characters of every title are printed.
    """
    async with aiohttp.ClientSession() as session:
        # Issue all requests concurrently.
        post_ids = range(1, 6)
        pending = [
            fetch(session, f"https://jsonplaceholder.typicode.com/posts/{post_id}")
            for post_id in post_ids
        ]
        for post in await asyncio.gather(*pending):
            print(post["title"][:50])

asyncio.run(main())

14.6.2 httpx(推荐)

import asyncio
import httpx

async def fetch(client: httpx.AsyncClient, url: str) -> dict:
    """GET ``url`` through ``client`` and return the decoded JSON body."""
    return (await client.get(url)).json()

async def main():
    """Fetch posts 1-5 concurrently with one client and print the response count."""
    async with httpx.AsyncClient(timeout=10.0) as client:
        requests = (
            fetch(client, f"https://jsonplaceholder.typicode.com/posts/{post_id}")
            for post_id in range(1, 6)
        )
        responses = await asyncio.gather(*requests)
        print(len(responses))

asyncio.run(main())

14.7 异步数据库

import asyncio
import aiosqlite

async def main():
    """Create the ``users`` table in ``test.db``, insert one row, print all rows."""
    async with aiosqlite.connect("test.db") as db:
        create_users_table = """
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL
            )
        """
        await db.execute(create_users_table)

        # The value is bound through a placeholder, not formatted into the SQL.
        new_user = ("Alice",)
        await db.execute("INSERT INTO users (name) VALUES (?)", new_user)
        await db.commit()

        async with db.execute("SELECT * FROM users") as rows:
            async for record in rows:
                print(record)

asyncio.run(main())

14.8 TaskGroup(结构化并发)

import asyncio

async def task(n: int) -> int:
    """Return ``n`` after sleeping ``n * 0.1`` seconds.

    Raises:
        ValueError: Immediately, before any sleep, when ``n`` is 3.
    """
    should_fail = n == 3
    if should_fail:
        raise ValueError(f"任务 {n} 失败")
    pause = n * 0.1
    await asyncio.sleep(pause)
    return n

async def main():
    # TaskGroup 确保所有任务完成或取消
    try:
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(task(i)) for i in range(5)]
    except* ValueError as eg:  # Python 3.11+ 异常组
        for exc in eg.exceptions:
            print(f"捕获: {exc}")
    
    # 所有任务要么完成,要么被取消
    for t in tasks:
        if t.done() and not t.cancelled():
            print(f"成功: {t.result()}")

asyncio.run(main())

14.9 常见陷阱

# ❌ Wrong: calling a synchronous blocking function inside an async function
async def bad():
    import time
    time.sleep(5)  # blocks the entire event loop!

# ✅ Right: use the asynchronous equivalent
async def good():
    await asyncio.sleep(5)

# ❌ Wrong: forgetting to await
async def bad():
    coroutine = asyncio.sleep(1)  # the coroutine is never executed!
    # Should be: await asyncio.sleep(1)

# ✅ Right: use to_thread to run synchronous code
async def process():
    # NOTE(review): sync_heavy_function is not defined in this snippet —
    # presumably a placeholder for any blocking callable; confirm.
    result = await asyncio.to_thread(sync_heavy_function)

14.10 注意事项

🔴 注意

  • 不要在异步函数中使用阻塞调用(time.sleep、requests.get)
  • await 只能在 async def 函数中使用
  • asyncio.run() 不能在已有事件循环运行的线程中调用(即不能嵌套调用);通常只在程序入口调用一次
  • gather() 的任务是并发执行,不是并行

💡 提示

  • 使用 httpx.AsyncClient 替代 requests 的同步阻塞调用
  • 使用 asyncio.to_thread() 包装同步代码
  • Python 3.11+ 使用 TaskGroup 替代 gather(任一任务失败时自动取消其余任务,并以 ExceptionGroup 汇总异常)
  • 使用 asyncio.timeout() 控制超时

📌 业务场景

import asyncio
import httpx

async def crawl(urls: list[str]) -> list[dict | BaseException]:
    """Fetch many pages concurrently with at most ten requests in flight.

    Args:
        urls: Page URLs to request.

    Returns:
        One entry per URL, in the same order as ``urls``: a dict with
        ``url``, ``status`` and ``size`` keys when the request succeeded, or
        the exception object raised for that URL. Exceptions are returned
        rather than raised because ``gather`` is called with
        ``return_exceptions=True``.
    """
    async with httpx.AsyncClient(timeout=30.0) as client:
        sem = asyncio.Semaphore(10)  # caps the number of concurrent requests

        async def fetch(url: str) -> dict:
            """Request one URL while holding a semaphore slot."""
            async with sem:
                resp = await client.get(url)
                return {"url": url, "status": resp.status_code, "size": len(resp.text)}

        tasks = [fetch(url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

# Build 100 example page URLs, crawl them, and report how many entries came back.
urls = [f"https://example.com/page/{i}" for i in range(100)]
results = asyncio.run(crawl(urls))
print(f"完成 {len(results)} 个请求")

14.11 扩展阅读