强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧。
文档目录

第14章:异步模式 —— 经典并发模式集锦

14.1 模式概览

| 模式 | 解决的问题 | 复杂度 |
| --- | --- | --- |
| 生产者-消费者 | 解耦生产和消费速率 | ★★☆ |
| 扇出/扇入 | 并行处理与结果聚合 | ★★☆ |
| 超时控制 | 防止无限等待 | ★☆☆ |
| 重试机制 | 处理瞬时故障 | ★★☆ |
| 断路器 | 防止级联故障 | ★★★ |
| 限流 | 保护系统过载 | ★★☆ |
| 缓存 | 减少重复调用 | ★★☆ |

14.2 生产者-消费者(Producer-Consumer)

核心思想

  生产者 1 ──┐
  生产者 2 ──┤    ┌─────────┐    ┌── 消费者 1
  生产者 3 ──┼───►│  缓冲队列 │────┼── 消费者 2
  生产者 4 ──┤    └─────────┘    └── 消费者 3
  生产者 N ──┘

Go 实现

// producerConsumer demonstrates the producer-consumer pattern using a
// buffered channel as the shared queue.
//
// Fix over the original version: instead of sleeping for a fixed
// 3 seconds and hoping the producers are finished, a dedicated
// WaitGroup tracks the producers and the channel is closed exactly
// when they are done. The consumers then drain the remaining items
// and exit naturally when the range over the closed channel ends.
func producerConsumer() {
    ch := make(chan int, 100) // buffered queue between producers and consumers

    // Start 3 producers, tracked so we know exactly when to close the channel.
    var producers sync.WaitGroup
    for i := 0; i < 3; i++ {
        producers.Add(1)
        go func(id int) {
            defer producers.Done()
            for j := 0; j < 10; j++ {
                ch <- id*100 + j
                time.Sleep(time.Millisecond * 100)
            }
        }(i)
    }

    // Start 5 consumers; each exits once the channel is closed and drained.
    var consumers sync.WaitGroup
    for i := 0; i < 5; i++ {
        consumers.Add(1)
        go func(id int) {
            defer consumers.Done()
            for v := range ch {
                fmt.Printf("消费者 %d 处理: %d\n", id, v)
            }
        }(i)
    }

    // Close the queue as soon as all producers finish, then wait for
    // the consumers to drain it.
    producers.Wait()
    close(ch)
    consumers.Wait()
}

Python 实现

import asyncio

async def producer_consumer():
    """Producer-consumer demo: three producers feed a bounded queue that
    five consumers drain; one ``None`` sentinel per consumer signals
    shutdown after all producers have finished."""
    queue = asyncio.Queue(maxsize=100)

    async def producer(id: int):
        # Each producer emits ten tagged values, pacing itself slightly.
        for seq in range(10):
            await queue.put(id * 100 + seq)
            await asyncio.sleep(0.1)

    async def consumer(id: int):
        # Pull items until the shutdown sentinel (None) arrives; every
        # get() is matched with task_done(), sentinel included.
        while True:
            item = await queue.get()
            try:
                if item is None:
                    break
                print(f"消费者 {id} 处理: {item}")
            finally:
                queue.task_done()

    producers = [asyncio.create_task(producer(i)) for i in range(3)]
    consumers = [asyncio.create_task(consumer(i)) for i in range(5)]

    # Wait for every producer, then poison-pill each consumer.
    await asyncio.gather(*producers)
    for _ in consumers:
        await queue.put(None)
    await asyncio.gather(*consumers)

JavaScript 实现

/**
 * A minimal awaitable FIFO queue (producer-consumer buffer).
 *
 * Fixes over the original version:
 *  - get() previously ignored the value passed to its resolver and
 *    shifted the queue a second time, so any get() that had to wait
 *    returned undefined. Items are now handed to a waiting consumer
 *    exactly once, through the resolver.
 *  - put() re-checks capacity in a loop, so several blocked producers
 *    cannot all overfill the buffer when a single slot frees up.
 */
class AsyncQueue {
    constructor(maxSize = Infinity) {
        this.queue = [];          // buffered items
        this.maxSize = maxSize;   // capacity before put() blocks
        this.producers = [];      // resolvers of blocked put() calls
        this.consumers = [];      // resolvers of blocked get() calls
    }

    async put(item) {
        // Wait until there is room; loop because another blocked
        // producer may grab the freed slot first.
        while (this.queue.length >= this.maxSize) {
            await new Promise(resolve => this.producers.push(resolve));
        }
        if (this.consumers.length > 0) {
            // Hand the item straight to a waiting consumer.
            this.consumers.shift()(item);
        } else {
            this.queue.push(item);
        }
    }

    async get() {
        if (this.queue.length === 0) {
            // Block until put() delivers an item to our resolver.
            return new Promise(resolve => this.consumers.push(resolve));
        }
        const item = this.queue.shift();
        // A slot just freed up — wake one blocked producer, if any.
        if (this.producers.length > 0) {
            this.producers.shift()();
        }
        return item;
    }
}

14.3 扇出/扇入(Fan-Out / Fan-In)

模式描述

  输入流 ──────────────────────────────────► 输出流
              │          │          │
              ▼          ▼          ▼
          Worker 1   Worker 2   Worker 3
              │          │          │
              └──────────┼──────────┘
                         ▼
                      聚合器

Go 实现

// fanOutFanIn spreads the work arriving on input across numWorkers
// concurrent processing stages (fan-out) and merges their results back
// into a single output stream (fan-in).
func fanOutFanIn(input <-chan int, numWorkers int) <-chan int {
    // Fan-out: every worker reads from the same input channel.
    outputs := make([]<-chan int, 0, numWorkers)
    for w := 0; w < numWorkers; w++ {
        outputs = append(outputs, process(input))
    }

    // Fan-in: collapse the worker outputs into one channel.
    return merge(outputs...)
}

// process consumes ints from input, simulates per-item work with a
// random delay (0–99 ms), and emits the square of each value. The
// returned channel is closed once input is exhausted.
func process(input <-chan int) <-chan int {
    results := make(chan int)
    go func() {
        defer close(results)
        for n := range input {
            // Simulated workload for each item.
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
            results <- n * n
        }
    }()
    return results
}

// merge fans in the values from every input channel onto one output
// channel. The output is closed only after all inputs have been
// drained, so callers can simply range over the result.
func merge(channels ...<-chan int) <-chan int {
    out := make(chan int)

    var pending sync.WaitGroup
    pending.Add(len(channels))
    for _, input := range channels {
        go func(src <-chan int) {
            defer pending.Done()
            for value := range src {
                out <- value
            }
        }(input)
    }

    // Close the output once every forwarding goroutine has finished.
    go func() {
        pending.Wait()
        close(out)
    }()

    return out
}

14.4 超时控制(Timeout)

Go 实现

// fetchWithTimeout GETs url and returns the response body, failing if
// the whole request (dial, send, read headers) takes longer than
// timeout. The derived context is always cancelled to release timer
// resources.
//
// Fix over the original: the error from http.NewRequestWithContext was
// silently discarded (`req, _ :=`); a malformed URL would have passed
// a nil request to Do and panicked.
func fetchWithTimeout(ctx context.Context, url string, timeout time.Duration) ([]byte, error) {
    ctx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()

    req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
    if err != nil {
        return nil, err
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    return io.ReadAll(resp.Body)
}

Python 实现

import asyncio

async def fetch_with_timeout(coro, timeout_seconds: float):
    """Await *coro*, raising ``TimeoutError`` if it takes longer than
    *timeout_seconds*.

    The awaited coroutine is cancelled on timeout (``asyncio.wait_for``
    semantics). The builtin ``TimeoutError`` is raised (rather than the
    asyncio alias) so callers don't need to import asyncio to catch it.

    Improvement over the original: the cause is chained explicitly with
    ``from exc`` so tracebacks show where the timeout originated.

    Raises:
        TimeoutError: when the deadline expires.
    """
    try:
        return await asyncio.wait_for(coro, timeout=timeout_seconds)
    except asyncio.TimeoutError as exc:
        # Re-raise with a friendlier message, keeping the cause chained.
        raise TimeoutError(f"操作超时 ({timeout_seconds}s)") from exc

# Usage example — illustrative only: `aiohttp_session` and
# `get_cached_data` are assumed to be defined elsewhere (they are not
# shown in this chapter; confirm against the real project code).
async def main():
    """Fetch with a 5-second deadline, falling back to cached data on timeout."""
    try:
        result = await fetch_with_timeout(
            aiohttp_session.get("https://api.example.com"),
            timeout_seconds=5.0
        )
    except TimeoutError:
        # Deadline hit — fall back to the cache instead of failing.
        print("请求超时,使用缓存数据")
        result = get_cached_data()

JavaScript 实现

/** Error thrown when a withTimeout() deadline expires. */
class TimeoutError extends Error {
    constructor(message) {
        super(message);
        this.name = 'TimeoutError';
    }
}

/**
 * Race `promise` against a deadline of `ms` milliseconds; rejects with
 * TimeoutError if the deadline wins.
 *
 * Fixes over the original version:
 *  - TimeoutError was referenced but never defined, so every timeout
 *    produced a ReferenceError instead; it is now declared above.
 *  - The timer is cleared once the race settles, so a promise that
 *    resolves early no longer leaves a stray timeout keeping the
 *    event loop alive.
 */
function withTimeout(promise, ms, message = '操作超时') {
    let timer;
    const deadline = new Promise((_, reject) => {
        timer = setTimeout(() => reject(new TimeoutError(message)), ms);
    });
    return Promise.race([promise, deadline]).finally(() => clearTimeout(timer));
}

// 使用 AbortController(更优雅)
// Abort-based timeout: cancels the underlying request instead of just
// abandoning it, then parses the JSON body. The timer is always
// cleared, whether the fetch succeeds, fails, or is aborted.
async function fetchWithTimeout(url, ms) {
    const aborter = new AbortController();
    const deadline = setTimeout(() => aborter.abort(), ms);

    try {
        const res = await fetch(url, { signal: aborter.signal });
        return await res.json();
    } finally {
        clearTimeout(deadline);
    }
}

14.5 重试机制(Retry)

指数退避(Exponential Backoff)

import asyncio
import random

async def retry_with_backoff(
    func,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    jitter: bool = True,
    retry_on: tuple = (Exception,),
):
    """Call the async *func*, retrying failures with exponential backoff.

    Args:
        func: zero-argument coroutine function to invoke.
        max_retries: extra attempts after the first (total calls =
            ``max_retries + 1``).
        base_delay: delay before the first retry, in seconds.
        max_delay: upper bound on any single delay.
        jitter: scale each delay by a random factor in [0.5, 1.5) to
            avoid synchronized retries from many clients at once.
        retry_on: exception classes that trigger a retry; anything else
            propagates immediately. Defaults to ``(Exception,)``, which
            matches the original catch-everything behavior.

    Returns:
        Whatever *func* eventually returns.

    Raises:
        The last exception from *func* once retries are exhausted.
    """
    for attempt in range(max_retries + 1):
        try:
            return await func()
        except retry_on as e:
            if attempt == max_retries:
                raise

            # Exponential backoff: base * 2^attempt, capped at max_delay.
            delay = min(base_delay * (2 ** attempt), max_delay)
            if jitter:
                delay *= (0.5 + random.random())

            print(f"第 {attempt + 1} 次失败,{delay:.1f}s 后重试: {e}")
            await asyncio.sleep(delay)

Go 实现

func retryWithBackoff(ctx context.Context, fn func() error, maxRetries int) error {
    backoff := time.Second
    for i := 0; i <= maxRetries; i++ {
        err := fn()
        if err == nil {
            return nil
        }
        if i == maxRetries {
            return err
        }
        
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(backoff):
            backoff *= 2
            if backoff > 30*time.Second {
                backoff = 30 * time.Second
            }
        }
    }
    return nil
}

重试策略对比

| 策略 | 间隔 | 适用场景 |
| --- | --- | --- |
| 固定间隔 | 恒定 | 简单场景 |
| 线性退避 | 每次增加 | 轻度拥塞 |
| 指数退避 | 按 2^n 增长 | 网络请求 |
| 指数退避 + 抖动 | 按 2^n + 随机 | 大规模系统(避免惊群效应) |

14.6 断路器(Circuit Breaker)

三种状态

                 探测成功(成功计数达到阈值)
      ┌─────────────────────────────────────────────┐
      │                                             │
      ▼                                             │
  ┌────────┐  失败超过阈值  ┌──────┐    超时到期    ┌──┴────────┐
  │ CLOSED │ ────────────► │ OPEN │ ───────────► │ HALF_OPEN │
  │ (正常) │               │(断开)│              │  (半开)   │
  └────────┘               └──────┘              └──┬────────┘
                               ▲                    │
                               │      探测失败       │
                               └────────────────────┘

  半开状态下:探测成功 → 回到 CLOSED;探测失败 → 回到 OPEN。

Python 实现

import time
from enum import Enum

class CircuitState(Enum):
    """Circuit-breaker states."""

    CLOSED = "closed"        # normal operation: calls pass through
    OPEN = "open"            # tripped: calls fail fast
    HALF_OPEN = "half_open"  # probing whether the dependency recovered


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the breaker is open.

    Fix: the original code raised this name without ever defining it,
    which produced a NameError instead of the intended error.
    """


class CircuitBreaker:
    """Minimal async circuit breaker.

    After ``failure_threshold`` consecutive failures the breaker opens
    and rejects calls for ``recovery_timeout`` seconds; the first call
    after that window runs as a HALF_OPEN probe. A successful probe
    closes the breaker; a failed probe reopens it immediately (fix: the
    original waited for a full threshold of new failures to reopen).
    """

    def __init__(self, failure_threshold=5, recovery_timeout=30):
        self.state = CircuitState.CLOSED
        self.failure_count = 0                     # consecutive failures seen
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout   # seconds to stay OPEN
        self.last_failure_time = None              # set on first failure

    async def call(self, func):
        """Invoke the zero-argument coroutine function *func* through the breaker.

        Raises:
            CircuitOpenError: if the breaker is open and the recovery
                window has not yet elapsed.
            Exception: whatever *func* itself raises.
        """
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time >= self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitOpenError("断路器已断开")

        try:
            result = await func()
        except Exception:
            self._on_failure()
            raise
        else:
            self._on_success()
            return result

    def _on_success(self):
        # Any success fully resets the breaker.
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        # A failed HALF_OPEN probe reopens immediately; in CLOSED we
        # only trip once the threshold is reached.
        if (self.state == CircuitState.HALF_OPEN
                or self.failure_count >= self.failure_threshold):
            self.state = CircuitState.OPEN

Go 实现(使用 Sony 的 gobreaker)

import "github.com/sony/gobreaker"

// Circuit breaker via Sony's gobreaker package (illustrative snippet —
// in real code these statements live inside a function body).
cb := gobreaker.NewCircuitBreaker(gobreaker.Settings{
    Name:        "my-service",
    MaxRequests: 3,                // max requests allowed through while Half-Open
    Interval:    10 * time.Second, // statistics window in the Closed state
    Timeout:     30 * time.Second, // how long to stay Open before probing
    ReadyToTrip: func(counts gobreaker.Counts) bool {
        return counts.ConsecutiveFailures > 5
    },
})

// Execute runs the callback only if the breaker admits the request;
// failures feed back into the breaker's counters.
result, err := cb.Execute(func() (interface{}, error) {
    return http.Get("https://api.example.com")
})

14.7 限流(Rate Limiting)

令牌桶(Token Bucket)

// tokenBucket returns a channel that yields one token roughly every
// 1/rate seconds, buffering up to burst unclaimed tokens. Callers take
// a token with `<-ch` before doing rate-limited work.
//
// NOTE(review): the refill goroutine and its ticker run forever — there
// is no way to stop them, so every call leaks a goroutine. A production
// version should accept a context or done channel to shut down;
// confirm whether a later chapter addresses this.
func tokenBucket(rate int, burst int) <-chan struct{} {
    ch := make(chan struct{}, burst)
    
    go func() {
        // One tick per token at the configured rate.
        ticker := time.NewTicker(time.Second / time.Duration(rate))
        defer ticker.Stop()
        
        for range ticker.C {
            select {
            case ch <- struct{}{}:
            default: // bucket full — drop the token
            }
        }
    }()
    
    return ch
}

Python 限流装饰器

import asyncio
import time

class RateLimiter:
    """Token-bucket rate limiter for asyncio code.

    Allows ``rate`` acquisitions per ``per`` seconds. The allowance
    refills continuously; when it drops below one token, ``acquire``
    sleeps just long enough for a full token to accumulate.

    Fix over the original: ``allowance``/``last_check`` were mutated
    with no synchronization, so concurrent ``acquire()`` calls could
    both observe a stale allowance and overshoot the limit. A lock now
    serializes the token accounting (waiters queue up fairly while one
    holder sleeps).
    """

    def __init__(self, rate: float, per: float = 1.0):
        self.rate = rate                 # tokens per window
        self.per = per                   # window length, seconds
        self.allowance = rate            # current tokens (bucket starts full)
        self.last_check = time.monotonic()
        self._lock = asyncio.Lock()      # guards the token accounting

    async def acquire(self):
        """Consume one token, sleeping if none is currently available."""
        async with self._lock:
            current = time.monotonic()
            elapsed = current - self.last_check
            self.last_check = current
            # Refill proportionally to elapsed time, capped at the burst size.
            self.allowance += elapsed * (self.rate / self.per)
            if self.allowance > self.rate:
                self.allowance = self.rate

            if self.allowance < 1.0:
                # Sleep exactly long enough for the deficit to refill.
                sleep_time = (1.0 - self.allowance) * (self.per / self.rate)
                await asyncio.sleep(sleep_time)
                self.allowance = 0.0
            else:
                self.allowance -= 1.0

# Usage: allow at most 10 calls per second, shared by every caller of call_api.
limiter = RateLimiter(rate=10, per=1.0)  # 10 tokens per second

async def call_api(url):
    # Block here until the limiter grants a token, then perform the
    # request. NOTE(review): `fetch` is not defined in this chapter —
    # presumably an async HTTP helper; confirm against the real project.
    await limiter.acquire()
    return await fetch(url)

14.8 组合模式

在实际业务中,通常需要组合使用多种模式:

# Shared, module-level instances: a circuit breaker is only useful if
# its state survives across calls. Fix: the original created a fresh
# breaker and limiter on every invocation, so the breaker could never
# accumulate failures and the limiter never actually limited anything.
_api_breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=60)
_api_limiter = RateLimiter(rate=100, per=1.0)


async def resilient_api_call(url: str, data: dict) -> dict:
    """Composed resilience: rate limit + timeout + circuit breaker + retry.

    Layering, innermost first:
      1. the shared limiter throttles outgoing requests,
      2. httpx enforces a 10s timeout on each attempt,
      3. the shared breaker counts failures and fails fast when open,
      4. exponential-backoff retry wraps everything.

    Returns the decoded JSON response body.
    """

    async def _call():
        await _api_limiter.acquire()
        async with httpx.AsyncClient() as client:
            response = await client.post(
                url, json=data, timeout=10.0
            )
            response.raise_for_status()
            return response.json()

    # retry(breaker(call)): the breaker observes every individual attempt.
    return await retry_with_backoff(
        lambda: _api_breaker.call(_call),
        max_retries=3,
        base_delay=1.0,
    )

14.9 本章小结

| 模式 | 核心思想 | 适用场景 |
| --- | --- | --- |
| 生产者-消费者 | 解耦生产/消费 | 消息队列、日志处理 |
| 扇出/扇入 | 并行处理 + 聚合 | 批量数据处理 |
| 超时 | 限时等待 | 外部服务调用 |
| 重试 | 自动恢复 | 瞬时故障 |
| 断路器 | 快速失败 | 依赖服务不可用 |
| 限流 | 保护系统 | API 网关、写入限流 |

下一章预告:当消费者的处理速度跟不上生产者时,系统会怎样?下一章我们将学习背压(Backpressure)机制。


扩展阅读