第14章:异步模式 —— 经典并发模式集锦
第14章:异步模式 —— 经典并发模式集锦
14.1 模式概览
| 模式 | 解决的问题 | 复杂度 |
|---|
| 生产者-消费者 | 解耦生产和消费速率 | ★★☆ |
| 扇出/扇入 | 并行处理与结果聚合 | ★★☆ |
| 超时控制 | 防止无限等待 | ★☆☆ |
| 重试机制 | 处理瞬时故障 | ★★☆ |
| 断路器 | 防止级联故障 | ★★★ |
| 限流 | 保护系统过载 | ★★☆ |
| 缓存 | 减少重复调用 | ★★☆ |
14.2 生产者-消费者(Producer-Consumer)
核心思想
生产者 1 ──┐
生产者 2 ──┤ ┌─────────┐ ┌── 消费者 1
生产者 3 ──┼───►│ 缓冲队列 │────┼── 消费者 2
生产者 4 ──┤ └─────────┘ └── 消费者 3
生产者 N ──┘
Go 实现
func producerConsumer() {
ch := make(chan int, 100) // 带缓冲的通道
// 启动 3 个生产者
for i := 0; i < 3; i++ {
go func(id int) {
for j := 0; j < 10; j++ {
ch <- id*100 + j
time.Sleep(time.Millisecond * 100)
}
}(i)
}
// 启动 5 个消费者
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for v := range ch {
fmt.Printf("消费者 %d 处理: %d\n", id, v)
}
}(i)
}
// 等待生产者完成后关闭通道
time.Sleep(3 * time.Second)
close(ch)
wg.Wait()
}
Python 实现
import asyncio
async def producer_consumer():
queue = asyncio.Queue(maxsize=100)
async def producer(id: int):
for i in range(10):
await queue.put(id * 100 + i)
await asyncio.sleep(0.1)
async def consumer(id: int):
while True:
item = await queue.get()
if item is None:
queue.task_done()
break
print(f"消费者 {id} 处理: {item}")
queue.task_done()
producers = [asyncio.create_task(producer(i)) for i in range(3)]
consumers = [asyncio.create_task(consumer(i)) for i in range(5)]
await asyncio.gather(*producers)
for _ in consumers:
await queue.put(None)
await asyncio.gather(*consumers)
JavaScript 实现
class AsyncQueue {
constructor(maxSize = Infinity) {
this.queue = [];
this.maxSize = maxSize;
this.producers = [];
this.consumers = [];
}
async put(item) {
if (this.queue.length >= this.maxSize) {
await new Promise(resolve => this.producers.push(resolve));
}
this.queue.push(item);
if (this.consumers.length > 0) {
this.consumers.shift()(this.queue.shift());
}
}
async get() {
if (this.queue.length === 0) {
await new Promise(resolve => this.consumers.push(resolve));
}
if (this.producers.length > 0) {
this.producers.shift()();
}
return this.queue.shift();
}
}
14.3 扇出/扇入(Fan-Out / Fan-In)
模式描述
输入流 ──────────────────────────────────► 输出流
│ │ │
▼ ▼ ▼
Worker 1 Worker 2 Worker 3
│ │ │
└──────────┼──────────┘
▼
聚合器
Go 实现
func fanOutFanIn(input <-chan int, numWorkers int) <-chan int {
// Fan-out: 将工作分配给多个 goroutine
workers := make([]<-chan int, numWorkers)
for i := 0; i < numWorkers; i++ {
workers[i] = process(input)
}
// Fan-in: 合并所有 worker 的结果
return merge(workers...)
}
func process(input <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for v := range input {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
out <- v * v // 处理
}
}()
return out
}
func merge(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
merged := make(chan int)
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for v := range c {
merged <- v
}
}(ch)
}
go func() {
wg.Wait()
close(merged)
}()
return merged
}
14.4 超时控制(Timeout)
Go 实现
func fetchWithTimeout(ctx context.Context, url string, timeout time.Duration) ([]byte, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return io.ReadAll(resp.Body)
}
Python 实现
import asyncio
async def fetch_with_timeout(coro, timeout_seconds: float):
try:
return await asyncio.wait_for(coro, timeout=timeout_seconds)
except asyncio.TimeoutError:
raise TimeoutError(f"操作超时 ({timeout_seconds}s)")
# 使用
async def main():
try:
result = await fetch_with_timeout(
aiohttp_session.get("https://api.example.com"),
timeout_seconds=5.0
)
except TimeoutError:
print("请求超时,使用缓存数据")
result = get_cached_data()
JavaScript 实现
function withTimeout(promise, ms, message = '操作超时') {
return Promise.race([
promise,
new Promise((_, reject) =>
setTimeout(() => reject(new TimeoutError(message)), ms)
),
]);
}
// 使用 AbortController(更优雅)
async function fetchWithTimeout(url, ms) {
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), ms);
try {
const response = await fetch(url, { signal: controller.signal });
return await response.json();
} finally {
clearTimeout(timeout);
}
}
14.5 重试机制(Retry)
指数退避(Exponential Backoff)
import asyncio
import random
async def retry_with_backoff(
func,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0,
jitter: bool = True,
):
for attempt in range(max_retries + 1):
try:
return await func()
except Exception as e:
if attempt == max_retries:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
if jitter:
delay *= (0.5 + random.random())
print(f"第 {attempt + 1} 次失败,{delay:.1f}s 后重试: {e}")
await asyncio.sleep(delay)
Go 实现
func retryWithBackoff(ctx context.Context, fn func() error, maxRetries int) error {
backoff := time.Second
for i := 0; i <= maxRetries; i++ {
err := fn()
if err == nil {
return nil
}
if i == maxRetries {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(backoff):
backoff *= 2
if backoff > 30*time.Second {
backoff = 30 * time.Second
}
}
}
return nil
}
重试策略对比
| 策略 | 间隔 | 适用场景 |
|---|
| 固定间隔 | 恒定 | 简单场景 |
| 线性退避 | 每次增加 | 轻度拥塞 |
| 指数退避 | 按 2^n 增长 | 网络请求 |
| 指数退避 + 抖动 | 按 2^n + 随机 | 大规模系统(避免雷群效应) |
14.6 断路器(Circuit Breaker)
三种状态
成功计数达到阈值
┌─────────────────────────────┐
│ │
▼ │
┌──────┐ 失败超过阈值 ┌──────┐ 超时 ┌──────┐
│ CLOSED │ ────────────► │ OPEN │ ──────► │HALF │
│ (正常) │ │(断开)│ │ OPEN │
└────────┘ └──────┘ └──┬───┘
▲ │
│ 失败 │ 成功
│ ┌────────────────┐ │
│ │ │ │
└─────────┘ ◄───────────┘─────────────┘
重新回到 CLOSED 回到 OPEN
Python 实现
import time
from enum import Enum
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class CircuitBreaker:
def __init__(self, failure_threshold=5, recovery_timeout=30):
self.state = CircuitState.CLOSED
self.failure_count = 0
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.last_failure_time = None
async def call(self, func):
if self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time >= self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
else:
raise CircuitOpenError("断路器已断开")
try:
result = await func()
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
def _on_success(self):
self.failure_count = 0
self.state = CircuitState.CLOSED
def _on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
Go 实现(使用 Sony 的 gobreaker)
import "github.com/sony/gobreaker"
cb := gobreaker.NewCircuitBreaker(gobreaker.Settings{
Name: "my-service",
MaxRequests: 3, // Half-Open 时允许的最大请求数
Interval: 10 * time.Second, // Closed 状态下的统计周期
Timeout: 30 * time.Second, // Open 状态的超时时间
ReadyToTrip: func(counts gobreaker.Counts) bool {
return counts.ConsecutiveFailures > 5
},
})
result, err := cb.Execute(func() (interface{}, error) {
return http.Get("https://api.example.com")
})
14.7 限流(Rate Limiting)
令牌桶(Token Bucket)
func tokenBucket(rate int, burst int) <-chan struct{} {
ch := make(chan struct{}, burst)
go func() {
ticker := time.NewTicker(time.Second / time.Duration(rate))
defer ticker.Stop()
for range ticker.C {
select {
case ch <- struct{}{}:
default: // 桶满则丢弃
}
}
}()
return ch
}
Python 限流装饰器
import asyncio
import time
class RateLimiter:
def __init__(self, rate: float, per: float = 1.0):
self.rate = rate
self.per = per
self.allowance = rate
self.last_check = time.monotonic()
async def acquire(self):
current = time.monotonic()
elapsed = current - self.last_check
self.last_check = current
self.allowance += elapsed * (self.rate / self.per)
if self.allowance > self.rate:
self.allowance = self.rate
if self.allowance < 1.0:
sleep_time = (1.0 - self.allowance) * (self.per / self.rate)
await asyncio.sleep(sleep_time)
self.allowance = 0.0
else:
self.allowance -= 1.0
# 使用
limiter = RateLimiter(rate=10, per=1.0) # 每秒 10 个
async def call_api(url):
await limiter.acquire()
return await fetch(url)
14.8 组合模式
在实际业务中,通常需要组合使用多种模式:
async def resilient_api_call(url: str, data: dict) -> dict:
"""组合:超时 + 重试 + 断路器 + 限流"""
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=60)
limiter = RateLimiter(rate=100, per=1.0)
async def _call():
await limiter.acquire()
async with httpx.AsyncClient() as client:
response = await client.post(
url, json=data, timeout=10.0
)
response.raise_for_status()
return response.json()
# 断路器包装重试包装超时
return await retry_with_backoff(
lambda: breaker.call(_call),
max_retries=3,
base_delay=1.0,
)
14.9 本章小结
| 模式 | 核心思想 | 适用场景 |
|---|
| 生产者-消费者 | 解耦生产/消费 | 消息队列、日志处理 |
| 扇出/扇入 | 并行处理 + 聚合 | 批量数据处理 |
| 超时 | 限时等待 | 外部服务调用 |
| 重试 | 自动恢复 | 瞬时故障 |
| 断路器 | 快速失败 | 依赖服务不可用 |
| 限流 | 保护系统 | API 网关、写入限流 |
下一章预告:当消费者的处理速度跟不上生产者时,系统会怎样?下一章我们将学习背压(Backpressure)机制。
扩展阅读