08 - 调度与批处理策略
08 - 调度与批处理策略
理解 vLLM 的调度机制,优化请求处理效率和资源利用率。
8.1 批处理策略演进
8.1.1 静态批处理(Static Batching)
最简单但最浪费的方式:
时间 →
请求1: ████████████████████ (长序列)
请求2: ██████████ (短序列,完成后等待)
请求3: ████████████████ (中等序列)
↑ ↑
批次开始 全部完成后才开始下一批
问题:GPU 在等待短序列完成后的空闲期完全没有工作
GPU利用率: 50-70%
8.1.2 动态批处理(Dynamic Batching)
积攒一定数量的请求后一起处理,但批次内部仍是静态的:
时间 →
请求1: ████████████████████
请求2: ██████████
请求3: ████████████████
↑ ↑
攒批完成 全部完成
优于静态批处理,但仍有空闲期
GPU利用率: 60-80%
8.1.3 连续批处理(Continuous Batching)
vLLM 的核心调度策略。在每个生成步骤后动态调整批次:
时间 →
Step: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
请求1: ▓ ▓ ▓ ▓ ▓ ▓ ▓ ▓ ▓ ▓ ▓ ▓ ▓ ▓ ▓
请求2: ▓ ▓ ▓ ▓ ▓ ▓ ▓ ▓ ▓ ▓
请求3: ▓ ▓ ▓ ▓ ▓ ▓ ▓ ▓ ▓ ▓ ▓
请求4: ▓ ▓ ▓ ▓ ▓ ▓ ▓ ▓
▓ = 该请求在该步骤被处理
请求2完成后,请求3立即加入批次,GPU 无空闲
GPU利用率: 90-98%
8.1.4 对比总结
| 策略 | GPU 利用率 | 延迟 | 吞吐量 | 实现复杂度 |
|---|---|---|---|---|
| 静态批处理 | 50-70% | 高 | 低 | 低 |
| 动态批处理 | 60-80% | 中 | 中 | 中 |
| 连续批处理 | 90-98% | 低 | 高 | 高 |
8.2 vLLM 调度器详解
8.2.1 调度器核心组件
# scheduler_core.py
"""调度器核心数据结构(简化版)"""
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional
class SequenceState(Enum):
WAITING = "waiting"
RUNNING = "running"
SWAPPED = "swapped"
FINISHED = "finished"
@dataclass
class SequenceGroup:
"""一组相关的序列(例如 beam search 中的多个 beam)"""
request_id: str
seqs: list # 序列列表
state: SequenceState
priority: float = 0.0 # 优先级
arrival_time: float = 0.0 # 到达时间
lora_request: Optional[object] = None
@property
def num_unfinished_seqs(self) -> int:
return sum(1 for s in self.seqs if not s.is_finished)
class Scheduler:
def __init__(self, config):
# 等待队列:尚未开始处理的请求
self.waiting: list[SequenceGroup] = []
# 运行队列:正在 GPU 上生成的请求
self.running: list[SequenceGroup] = []
# 交换队列:被抢占换出到 CPU 的请求
self.swapped: list[SequenceGroup] = []
# Block 分配器
self.block_manager = BlockManager(config.num_gpu_blocks)
# 调度配置
self.max_num_seqs = config.max_num_seqs
self.max_num_batched_tokens = config.max_num_batched_tokens
8.2.2 调度循环
每次调度步骤的执行流程:
┌────────────────────────────────────────────────┐
│ Scheduler.schedule() │
│ │
│ 1. 检查资源可用性 │
│ ├── GPU block 余量 │
│ └── 最大并发序列数 │
│ │
│ 2. 调度运行中的序列(续生成) │
│ ├── 每个序列生成 1 个新 token │
│ ├── 为新 token 分配 block │
│ └── 如果 block 不足 → 尝试抢占 │
│ │
│ 3. 调度等待中的序列(prefill) │
│ ├── 检查 token 预算 │
│ ├── 分配初始 blocks │
│ └── 加入运行队列 │
│ │
│ 4. 调度交换队列(恢复被抢占的序列) │
│ └── 从 CPU swap 回 GPU │
│ │
│ 5. 返回 SchedulerOutput │
└────────────────────────────────────────────────┘
8.2.3 Token 预算管理
def schedule(self) -> SchedulerOutput:
scheduled_seq_groups = []
remaining_token_budget = self.max_num_batched_tokens
remaining_seq_budget = self.max_num_seqs
# 1. 先调度已运行的序列(每个仅需 1 个 token)
new_running = []
for seq_group in self.running:
if remaining_token_budget <= 0 or remaining_seq_budget <= 0:
break
if not self.block_manager.can_append(seq_group):
# 需要抢占
break
scheduled_seq_groups.append(seq_group)
remaining_token_budget -= 1 # decode 只需 1 个 token
remaining_seq_budget -= 1
new_running.append(seq_group)
# 2. 调度等待中的新序列(prefill 需要处理全部 prompt tokens)
new_waiting = []
for seq_group in self.waiting:
num_prompt_tokens = seq_group.get_prompt_len()
if num_prompt_tokens > remaining_token_budget:
continue # token 预算不够
if remaining_seq_budget <= 0:
break # 序列数已满
# 检查是否可以分配 block
if self.block_manager.can_allocate(num_prompt_tokens):
self.block_manager.allocate(seq_group)
scheduled_seq_groups.append(seq_group)
remaining_token_budget -= num_prompt_tokens
remaining_seq_budget -= 1
new_running.append(seq_group)
else:
new_waiting.append(seq_group) # 资源不足,继续等待
self.running = new_running
self.waiting = new_waiting
return SchedulerOutput(scheduled_seq_groups)
8.3 抢占机制
8.3.1 抢占触发条件
触发抢占的条件:
1. GPU Block 耗尽
运行中的序列需要新 block 但无空闲 block
2. 序列数限制
达到 max_num_seqs 上限,但有高优先级请求等待
3. Token 预算超限
达到 max_num_batched_tokens 上限
8.3.2 抢占策略选择
def preempt(seq_group, blocks_to_free):
"""选择抢占策略"""
if can_swap_to_cpu():
# 策略一:Swap(推荐)
# 将 KV Cache 复制到 CPU 内存
# 恢复时 swap 回 GPU
cpu_blocks = swap_to_cpu(seq_group, gpu_blocks)
return PreemptionAction.SWAP, cpu_blocks
else:
# 策略二:Recompute
# 直接丢弃 KV Cache
# 恢复时重新执行 prefill
release_blocks(seq_group)
return PreemptionAction.RECOMPUTE, None
| 抢占策略 | 恢复开销 | CPU 内存需求 | 适用场景 |
|---|---|---|---|
| Swap | 读回 GPU 的时间 | 需要 CPU 内存 | CPU 内存充足 |
| Recompute | 重新 prefill 的时间 | 不需要 | CPU 内存不足或短序列 |
8.3.3 抢占目标选择
def select_preemption_victims(num_blocks_needed: int) -> list[SequenceGroup]:
"""选择被抢占的序列(LRU 策略)"""
# 按到达时间倒序(最新加入的先被抢占)
candidates = sorted(
self.running,
key=lambda sg: sg.arrival_time,
reverse=True, # 最新到达的优先被抢占
)
victims = []
freed_blocks = 0
for seq_group in candidates:
if freed_blocks >= num_blocks_needed:
break
victims.append(seq_group)
freed_blocks += self.block_manager.get_num_blocks(seq_group)
return victims
8.4 优先级调度
8.4.1 基于优先级的调度
# priority_scheduler.py
"""带优先级的请求调度"""
from enum import IntEnum
class Priority(IntEnum):
CRITICAL = 0 # 最高优先级
HIGH = 1
NORMAL = 2
LOW = 3
BATCH = 4 # 最低优先级(批量任务)
class PriorityScheduler(Scheduler):
def __init__(self, config):
super().__init__(config)
self.priority_queue: dict[Priority, list[SequenceGroup]] = {
p: [] for p in Priority
}
def add_request(self, seq_group: SequenceGroup, priority: Priority):
"""添加带优先级的请求"""
self.priority_queue[priority].append(seq_group)
def schedule(self) -> SchedulerOutput:
"""按优先级顺序调度"""
# 先运行已运行的序列
scheduled = self._schedule_running()
# 按优先级从高到低调度新请求
for priority in Priority:
if not self.priority_queue[priority]:
continue
while self.priority_queue[priority]:
seq_group = self.priority_queue[priority][0]
if not self._can_schedule(seq_group):
break
self._schedule_prefill(seq_group)
self.priority_queue[priority].pop(0)
scheduled.append(seq_group)
return SchedulerOutput(scheduled)
8.4.2 在 API 中使用优先级
# 通过 API 传递优先级
response = client.chat.completions.create(
model="qwen-7b",
messages=[{"role": "user", "content": "紧急问题"}],
extra_body={"priority": "high"},
)
8.5 Prefill 与 Decode 分离
8.5.1 Chunked Prefill
当 prompt 很长时,一次性 prefill 会占用大量 token 预算。Chunked Prefill 将 prefill 分成多个小块:
传统 Prefill(一次性处理全部 prompt):
请求1 prompt (2048 tokens): ▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓
其他运行中的请求: ░░░░░░░░░░░░░░░░░░░░(被阻塞)
Chunked Prefill(分块处理):
Step 1: 请求1 chunk1 (512): ▓▓▓▓▓ | 其他请求: ▓▓▓
Step 2: 请求1 chunk2 (512): ▓▓▓▓▓ | 其他请求: ▓▓▓
Step 3: 请求1 chunk3 (512): ▓▓▓▓▓ | 其他请求: ▓▓▓
Step 4: 请求1 chunk4 (512): ▓▓▓▓▓ | 其他请求: ▓▓▓
优势:运行中的请求不会因长 prompt 而被延迟
# 启用 Chunked Prefill
vllm serve model \
--enable-chunked-prefill \
--max-num-batched-tokens 512 # chunk 大小
8.5.2 Prefill-Decode 混合调度
def schedule_with_chunked_prefill(self) -> SchedulerOutput:
"""带 Chunked Prefill 的调度"""
scheduled = []
token_budget = self.max_num_batched_tokens
# 预留 decode 需要的 token
decode_budget = len(self.running) # 每个运行序列 1 个 token
prefill_budget = token_budget - decode_budget
# 1. 处理 decode
for seq_group in self.running:
scheduled.append(ScheduledSeqGroup(seq_group, is_prefill=False))
# 2. 处理 prefill(分块)
for seq_group in self.waiting:
remaining = seq_group.get_num_unprefilled_tokens()
chunk_size = min(remaining, prefill_budget)
if chunk_size <= 0:
break
scheduled.append(ScheduledSeqGroup(
seq_group,
is_prefill=True,
chunk_size=chunk_size,
))
prefill_budget -= chunk_size
if seq_group.is_fully_prefilled():
self.running.append(seq_group)
return SchedulerOutput(scheduled)
8.6 关键配置参数
8.6.1 调度参数一览
| 参数 | 默认值 | 说明 | 影响 |
|---|---|---|---|
max_num_seqs | 256 | 单批次最大序列数 | 增大 → 更高并发,更多显存 |
max_num_batched_tokens | 模型最大长度 | 单批次最大 token 数 | 增大 → 更高吞吐,更多显存 |
max_model_len | 模型默认 | 最大序列长度 | 减小 → 省显存,增加并发 |
gpu_memory_utilization | 0.9 | GPU 显存使用率 | 调大 → 更多 KV Cache 空间 |
swap_space | 4 (GB) | CPU swap 空间 | 增大 → 更多可 swap 的序列 |
enable_chunked_prefill | False | 分块 prefill | 减少长 prompt 对延迟的影响 |
8.6.2 参数调优建议
# 高吞吐场景(离线批量处理)
llm = LLM(
model="model",
max_num_seqs=512, # 大并发
max_num_batched_tokens=16384, # 大 token 预算
max_model_len=2048, # 限制序列长度
gpu_memory_utilization=0.95,
)
# 低延迟场景(实时 API 服务)
llm = LLM(
model="model",
max_num_seqs=64, # 适中并发
max_num_batched_tokens=4096,
enable_chunked_prefill=True,
gpu_memory_utilization=0.9,
)
# 长上下文场景
llm = LLM(
model="model",
max_num_seqs=32, # 较少并发
max_num_batched_tokens=8192,
max_model_len=32768, # 长上下文
gpu_memory_utilization=0.92,
)
8.7 请求排队与超时
8.7.1 队列管理
# queue_management.py
"""请求队列管理"""
import asyncio
import time
class RequestQueue:
"""请求队列管理器"""
def __init__(self, max_queue_size: int = 1000, timeout: float = 300):
self.queue = asyncio.Queue(maxsize=max_queue_size)
self.timeout = timeout
async def enqueue(self, request) -> bool:
"""添加请求到队列"""
try:
self.queue.put_nowait(request)
return True
except asyncio.QueueFull:
return False
async def dequeue(self):
"""获取下一个请求(带超时)"""
try:
return await asyncio.wait_for(
self.queue.get(),
timeout=self.timeout
)
except asyncio.TimeoutError:
return None
8.8 性能监控指标
8.8.1 关键调度指标
# 调度统计信息
scheduler_stats = {
"num_running_seqs": 42, # 正在运行的序列数
"num_waiting_seqs": 156, # 等待中的序列数
"num_swapped_seqs": 3, # 被 swap 的序列数
"gpu_cache_usage": 0.85, # GPU 缓存使用率
"cpu_cache_usage": 0.12, # CPU 缓存使用率
"num_preemptions": 2, # 本批次抢占次数
}
8.9 业务场景
场景一:混合负载
在线请求(低延迟)+ 离线任务(高吞吐)
策略:
1. 在线请求设置高优先级
2. 离线任务设置低优先级
3. 在线请求少时,离线任务填满 GPU
4. 在线请求多时,离线任务被抢占/延迟
场景二:峰值处理
请求数
峰值 ──┐ ┌──
│ │
基线 ──│──────│───────
└──────┘
时间
策略:
1. 队列缓冲峰值请求
2. 连续批处理高效利用 GPU
3. 必要时抢占低优先级请求
4. 队列满时返回 503
8.10 注意事项
Token 预算:
max_num_batched_tokens设置过大会导致单个步骤的计算时间过长,增加延迟。建议设为max_model_len的 1-4 倍。
Swap 开销:频繁的 swap 操作会增加延迟。如果 swap 过于频繁,说明显存不足,应考虑减小
max_num_seqs或使用量化。
Chunked Prefill:对于 prompt 长度差异大的场景,启用 Chunked Prefill 可以显著减少长 prompt 对其他请求的影响。
序列数限制:
max_num_seqs不是越大越好。过大的值会导致每个步骤的计算时间过长。需要在吞吐量和延迟之间平衡。
8.11 扩展阅读
- Orca: A Distributed Serving System for Transformer-Based Generative Models (OSDI 2022)
- Sarathi: Efficient LLM Inference by Piggybacking Decodes with Chunked Prefills
- vLLM Scheduler 源码
- Continuous Batching 详解
上一章:07 - LoRA 动态适配 | 下一章:09 - 分布式推理