强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

08 - 调度与批处理策略

08 - 调度与批处理策略

理解 vLLM 的调度机制,优化请求处理效率和资源利用率。


8.1 批处理策略演进

8.1.1 静态批处理(Static Batching)

最简单但最浪费的方式:

时间 →
请求1: ████████████████████  (长序列)
请求2: ██████████            (短序列,完成后等待)
请求3: ████████████████      (中等序列)
        ↑                   ↑
      批次开始            全部完成后才开始下一批

问题:GPU 在等待短序列完成后的空闲期完全没有工作
GPU利用率: 50-70%

8.1.2 动态批处理(Dynamic Batching)

积攒一定数量的请求后一起处理,但批次内部仍是静态的:

时间 →
请求1: ████████████████████
请求2: ██████████
请求3:     ████████████████
        ↑         ↑
      攒批完成    全部完成

优于静态批处理,但仍有空闲期
GPU利用率: 60-80%

8.1.3 连续批处理(Continuous Batching)

vLLM 的核心调度策略。在每个生成步骤后动态调整批次:

时间 →
Step:  1  2  3  4  5  6  7  8  9  10 11 12 13 14 15
请求1: ▓  ▓  ▓  ▓  ▓  ▓  ▓  ▓  ▓  ▓  ▓  ▓  ▓  ▓  ▓
请求2: ▓  ▓  ▓  ▓  ▓  ▓  ▓  ▓  ▓  ▓
请求3:              ▓  ▓  ▓  ▓  ▓  ▓  ▓  ▓  ▓  ▓  ▓
请求4:                        ▓  ▓  ▓  ▓  ▓  ▓  ▓  ▓

▓ = 该请求在该步骤被处理
请求2完成后,请求3立即加入批次,GPU 无空闲
GPU利用率: 90-98%

8.1.4 对比总结

策略GPU 利用率延迟吞吐量实现复杂度
静态批处理50-70%
动态批处理60-80%
连续批处理90-98%

8.2 vLLM 调度器详解

8.2.1 调度器核心组件

# scheduler_core.py
"""调度器核心数据结构(简化版)"""

from enum import Enum
from dataclasses import dataclass, field
from typing import Optional

class SequenceState(Enum):
    WAITING = "waiting"
    RUNNING = "running"
    SWAPPED = "swapped"
    FINISHED = "finished"

@dataclass
class SequenceGroup:
    """一组相关的序列(例如 beam search 中的多个 beam)"""
    request_id: str
    seqs: list  # 序列列表
    state: SequenceState
    priority: float = 0.0  # 优先级
    arrival_time: float = 0.0  # 到达时间
    lora_request: Optional[object] = None
    
    @property
    def num_unfinished_seqs(self) -> int:
        return sum(1 for s in self.seqs if not s.is_finished)

class Scheduler:
    def __init__(self, config):
        # 等待队列:尚未开始处理的请求
        self.waiting: list[SequenceGroup] = []
        
        # 运行队列:正在 GPU 上生成的请求
        self.running: list[SequenceGroup] = []
        
        # 交换队列:被抢占换出到 CPU 的请求
        self.swapped: list[SequenceGroup] = []
        
        # Block 分配器
        self.block_manager = BlockManager(config.num_gpu_blocks)
        
        # 调度配置
        self.max_num_seqs = config.max_num_seqs
        self.max_num_batched_tokens = config.max_num_batched_tokens

8.2.2 调度循环

每次调度步骤的执行流程:

┌────────────────────────────────────────────────┐
│                Scheduler.schedule()             │
│                                                │
│  1. 检查资源可用性                               │
│     ├── GPU block 余量                         │
│     └── 最大并发序列数                          │
│                                                │
│  2. 调度运行中的序列(续生成)                    │
│     ├── 每个序列生成 1 个新 token               │
│     ├── 为新 token 分配 block                  │
│     └── 如果 block 不足 → 尝试抢占              │
│                                                │
│  3. 调度等待中的序列(prefill)                  │
│     ├── 检查 token 预算                        │
│     ├── 分配初始 blocks                        │
│     └── 加入运行队列                           │
│                                                │
│  4. 调度交换队列(恢复被抢占的序列)              │
│     └── 从 CPU swap 回 GPU                     │
│                                                │
│  5. 返回 SchedulerOutput                       │
└────────────────────────────────────────────────┘

8.2.3 Token 预算管理

def schedule(self) -> SchedulerOutput:
    scheduled_seq_groups = []
    remaining_token_budget = self.max_num_batched_tokens
    remaining_seq_budget = self.max_num_seqs
    
    # 1. 先调度已运行的序列(每个仅需 1 个 token)
    new_running = []
    for seq_group in self.running:
        if remaining_token_budget <= 0 or remaining_seq_budget <= 0:
            break
        if not self.block_manager.can_append(seq_group):
            # 需要抢占
            break
        
        scheduled_seq_groups.append(seq_group)
        remaining_token_budget -= 1  # decode 只需 1 个 token
        remaining_seq_budget -= 1
        new_running.append(seq_group)
    
    # 2. 调度等待中的新序列(prefill 需要处理全部 prompt tokens)
    new_waiting = []
    for seq_group in self.waiting:
        num_prompt_tokens = seq_group.get_prompt_len()
        
        if num_prompt_tokens > remaining_token_budget:
            continue  # token 预算不够
        if remaining_seq_budget <= 0:
            break  # 序列数已满
        
        # 检查是否可以分配 block
        if self.block_manager.can_allocate(num_prompt_tokens):
            self.block_manager.allocate(seq_group)
            scheduled_seq_groups.append(seq_group)
            remaining_token_budget -= num_prompt_tokens
            remaining_seq_budget -= 1
            new_running.append(seq_group)
        else:
            new_waiting.append(seq_group)  # 资源不足,继续等待
    
    self.running = new_running
    self.waiting = new_waiting
    
    return SchedulerOutput(scheduled_seq_groups)

8.3 抢占机制

8.3.1 抢占触发条件

触发抢占的条件:

1. GPU Block 耗尽
   运行中的序列需要新 block 但无空闲 block
   
2. 序列数限制
   达到 max_num_seqs 上限,但有高优先级请求等待

3. Token 预算超限
   达到 max_num_batched_tokens 上限

8.3.2 抢占策略选择

def preempt(seq_group, blocks_to_free):
    """选择抢占策略"""
    
    if can_swap_to_cpu():
        # 策略一:Swap(推荐)
        # 将 KV Cache 复制到 CPU 内存
        # 恢复时 swap 回 GPU
        cpu_blocks = swap_to_cpu(seq_group, gpu_blocks)
        return PreemptionAction.SWAP, cpu_blocks
    else:
        # 策略二:Recompute
        # 直接丢弃 KV Cache
        # 恢复时重新执行 prefill
        release_blocks(seq_group)
        return PreemptionAction.RECOMPUTE, None
抢占策略恢复开销CPU 内存需求适用场景
Swap读回 GPU 的时间需要 CPU 内存CPU 内存充足
Recompute重新 prefill 的时间不需要CPU 内存不足或短序列

8.3.3 抢占目标选择

def select_preemption_victims(num_blocks_needed: int) -> list[SequenceGroup]:
    """选择被抢占的序列(LRU 策略)"""
    
    # 按到达时间倒序(最新加入的先被抢占)
    candidates = sorted(
        self.running,
        key=lambda sg: sg.arrival_time,
        reverse=True,  # 最新到达的优先被抢占
    )
    
    victims = []
    freed_blocks = 0
    
    for seq_group in candidates:
        if freed_blocks >= num_blocks_needed:
            break
        victims.append(seq_group)
        freed_blocks += self.block_manager.get_num_blocks(seq_group)
    
    return victims

8.4 优先级调度

8.4.1 基于优先级的调度

# priority_scheduler.py
"""带优先级的请求调度"""

from enum import IntEnum

class Priority(IntEnum):
    CRITICAL = 0    # 最高优先级
    HIGH = 1
    NORMAL = 2
    LOW = 3
    BATCH = 4       # 最低优先级(批量任务)

class PriorityScheduler(Scheduler):
    def __init__(self, config):
        super().__init__(config)
        self.priority_queue: dict[Priority, list[SequenceGroup]] = {
            p: [] for p in Priority
        }
    
    def add_request(self, seq_group: SequenceGroup, priority: Priority):
        """添加带优先级的请求"""
        self.priority_queue[priority].append(seq_group)
    
    def schedule(self) -> SchedulerOutput:
        """按优先级顺序调度"""
        # 先运行已运行的序列
        scheduled = self._schedule_running()
        
        # 按优先级从高到低调度新请求
        for priority in Priority:
            if not self.priority_queue[priority]:
                continue
            
            while self.priority_queue[priority]:
                seq_group = self.priority_queue[priority][0]
                
                if not self._can_schedule(seq_group):
                    break
                
                self._schedule_prefill(seq_group)
                self.priority_queue[priority].pop(0)
                scheduled.append(seq_group)
        
        return SchedulerOutput(scheduled)

8.4.2 在 API 中使用优先级

# 通过 API 传递优先级
response = client.chat.completions.create(
    model="qwen-7b",
    messages=[{"role": "user", "content": "紧急问题"}],
    extra_body={"priority": "high"},
)

8.5 Prefill 与 Decode 分离

8.5.1 Chunked Prefill

当 prompt 很长时,一次性 prefill 会占用大量 token 预算。Chunked Prefill 将 prefill 分成多个小块:

传统 Prefill(一次性处理全部 prompt):
请求1 prompt (2048 tokens): ▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓
其他运行中的请求:            ░░░░░░░░░░░░░░░░░░░░(被阻塞)

Chunked Prefill(分块处理):
Step 1: 请求1 chunk1 (512): ▓▓▓▓▓ | 其他请求: ▓▓▓
Step 2: 请求1 chunk2 (512): ▓▓▓▓▓ | 其他请求: ▓▓▓
Step 3: 请求1 chunk3 (512): ▓▓▓▓▓ | 其他请求: ▓▓▓
Step 4: 请求1 chunk4 (512): ▓▓▓▓▓ | 其他请求: ▓▓▓

优势:运行中的请求不会因长 prompt 而被延迟
# 启用 Chunked Prefill
vllm serve model \
    --enable-chunked-prefill \
    --max-num-batched-tokens 512  # chunk 大小

8.5.2 Prefill-Decode 混合调度

def schedule_with_chunked_prefill(self) -> SchedulerOutput:
    """带 Chunked Prefill 的调度"""
    
    scheduled = []
    token_budget = self.max_num_batched_tokens
    
    # 预留 decode 需要的 token
    decode_budget = len(self.running)  # 每个运行序列 1 个 token
    prefill_budget = token_budget - decode_budget
    
    # 1. 处理 decode
    for seq_group in self.running:
        scheduled.append(ScheduledSeqGroup(seq_group, is_prefill=False))
    
    # 2. 处理 prefill(分块)
    for seq_group in self.waiting:
        remaining = seq_group.get_num_unprefilled_tokens()
        chunk_size = min(remaining, prefill_budget)
        
        if chunk_size <= 0:
            break
        
        scheduled.append(ScheduledSeqGroup(
            seq_group,
            is_prefill=True,
            chunk_size=chunk_size,
        ))
        prefill_budget -= chunk_size
        
        if seq_group.is_fully_prefilled():
            self.running.append(seq_group)
    
    return SchedulerOutput(scheduled)

8.6 关键配置参数

8.6.1 调度参数一览

参数默认值说明影响
max_num_seqs256单批次最大序列数增大 → 更高并发,更多显存
max_num_batched_tokens模型最大长度单批次最大 token 数增大 → 更高吞吐,更多显存
max_model_len模型默认最大序列长度减小 → 省显存,增加并发
gpu_memory_utilization0.9GPU 显存使用率调大 → 更多 KV Cache 空间
swap_space4 (GB)CPU swap 空间增大 → 更多可 swap 的序列
enable_chunked_prefillFalse分块 prefill减少长 prompt 对延迟的影响

8.6.2 参数调优建议

# 高吞吐场景(离线批量处理)
llm = LLM(
    model="model",
    max_num_seqs=512,           # 大并发
    max_num_batched_tokens=16384,  # 大 token 预算
    max_model_len=2048,          # 限制序列长度
    gpu_memory_utilization=0.95,
)

# 低延迟场景(实时 API 服务)
llm = LLM(
    model="model",
    max_num_seqs=64,             # 适中并发
    max_num_batched_tokens=4096,
    enable_chunked_prefill=True,
    gpu_memory_utilization=0.9,
)

# 长上下文场景
llm = LLM(
    model="model",
    max_num_seqs=32,             # 较少并发
    max_num_batched_tokens=8192,
    max_model_len=32768,          # 长上下文
    gpu_memory_utilization=0.92,
)

8.7 请求排队与超时

8.7.1 队列管理

# queue_management.py
"""请求队列管理"""

import asyncio
import time

class RequestQueue:
    """请求队列管理器"""
    
    def __init__(self, max_queue_size: int = 1000, timeout: float = 300):
        self.queue = asyncio.Queue(maxsize=max_queue_size)
        self.timeout = timeout
    
    async def enqueue(self, request) -> bool:
        """添加请求到队列"""
        try:
            self.queue.put_nowait(request)
            return True
        except asyncio.QueueFull:
            return False
    
    async def dequeue(self):
        """获取下一个请求(带超时)"""
        try:
            return await asyncio.wait_for(
                self.queue.get(), 
                timeout=self.timeout
            )
        except asyncio.TimeoutError:
            return None

8.8 性能监控指标

8.8.1 关键调度指标

# 调度统计信息
scheduler_stats = {
    "num_running_seqs": 42,       # 正在运行的序列数
    "num_waiting_seqs": 156,      # 等待中的序列数
    "num_swapped_seqs": 3,        # 被 swap 的序列数
    "gpu_cache_usage": 0.85,      # GPU 缓存使用率
    "cpu_cache_usage": 0.12,      # CPU 缓存使用率
    "num_preemptions": 2,         # 本批次抢占次数
}

8.9 业务场景

场景一:混合负载

在线请求(低延迟)+ 离线任务(高吞吐)

策略:
1. 在线请求设置高优先级
2. 离线任务设置低优先级
3. 在线请求少时,离线任务填满 GPU
4. 在线请求多时,离线任务被抢占/延迟

场景二:峰值处理

                请求数
  峰值 ──┐      ┌──
         │      │
  基线 ──│──────│───────
         └──────┘
         时间

策略:
1. 队列缓冲峰值请求
2. 连续批处理高效利用 GPU
3. 必要时抢占低优先级请求
4. 队列满时返回 503

8.10 注意事项

Token 预算max_num_batched_tokens 设置过大会导致单个步骤的计算时间过长,增加延迟。建议设为 max_model_len 的 1-4 倍。

Swap 开销:频繁的 swap 操作会增加延迟。如果 swap 过于频繁,说明显存不足,应考虑减小 max_num_seqs 或使用量化。

Chunked Prefill:对于 prompt 长度差异大的场景,启用 Chunked Prefill 可以显著减少长 prompt 对其他请求的影响。

序列数限制max_num_seqs 不是越大越好。过大的值会导致每个步骤的计算时间过长。需要在吞吐量和延迟之间平衡。


8.11 扩展阅读


上一章07 - LoRA 动态适配 | 下一章09 - 分布式推理