Chapter 04 · Streaming Responses

Streaming makes an AI reply appear token by token, like a typewriter, which greatly improves perceived responsiveness. This chapter covers the SSE protocol, back-end and front-end implementations, and error handling.


4.1 Why Streaming Responses

Comparison: standard vs. streaming

| Feature | Standard request | Streaming request |
| --- | --- | --- |
| User experience | Wait 2-10 s, then everything appears at once | Token-by-token output, instant feedback |
| Time to first token | Same as the total response time | Very low (typically < 1 s) |
| Typical use cases | Background jobs, batch processing | Chat, real-time interaction |
| Implementation complexity | Simple | Moderate |
| Error handling | Simple | Must handle interruptions |
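The first-token latency claim in the table can be checked empirically. Below is a minimal timing sketch; the helper name is illustrative (not part of any SDK), and it works on any chunk iterator, including the stream object returned by the OpenAI client:

```python
import time

def time_to_first_chunk(chunk_iter):
    """Consume any chunk iterator and return (first_chunk_s, total_s, n_chunks)."""
    start = time.perf_counter()
    first = None
    n = 0
    for _ in chunk_iter:
        n += 1
        if first is None:
            # record how long the very first chunk took to arrive
            first = time.perf_counter() - start
    return first, time.perf_counter() - start, n

# Against a real stream (assumes the `client` from section 4.2):
# first, total, n = time_to_first_chunk(
#     client.chat.completions.create(
#         model="gpt-4o-mini",
#         messages=[{"role": "user", "content": "Hi"}],
#         stream=True,
#     )
# )
```

With streaming, `first` is typically a small fraction of `total`; with a non-streaming request they are effectively equal.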

4.2 Streaming Calls in Python

Basic streaming output

from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[
        {"role": "user", "content": "Write a short poem about programming"}
    ],
    stream=True,  # enable streaming
)

# consume the response chunk by chunk
for chunk in stream:
    content = chunk.choices[0].delta.content
    if content is not None:
        print(content, end="", flush=True)

print()  # newline

Stream Chunk Structure

Chunk 1: {"choices": [{"delta": {"role": "assistant"}, "index": 0}]}
Chunk 2: {"choices": [{"delta": {"content": "Code"}, "index": 0}]}
Chunk 3: {"choices": [{"delta": {"content": " flows"}, "index": 0}]}
Chunk 4: {"choices": [{"delta": {"content": " like"}, "index": 0}]}
...
Chunk N: {"choices": [{"delta": {}, "finish_reason": "stop"}]}

Note: the first chunk's delta.content is usually None; it carries only the role.

Collecting the Full Response

def stream_chat(messages: list, model: str = "gpt-4o-mini") -> tuple[str, dict]:
    """Stream a response, printing it as it arrives, and return the full text plus usage."""
    stream = client.chat.completions.create(
        model=model,
        messages=messages,
        stream=True,
        stream_options={"include_usage": True},  # include token usage stats
    )

    full_response = []
    usage_info = {}

    for chunk in stream:
        # extract content
        if chunk.choices and chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            full_response.append(content)
            print(content, end="", flush=True)

        # usage stats arrive in the final chunk (whose choices list is empty)
        if chunk.usage:
            usage_info = {
                "prompt_tokens": chunk.usage.prompt_tokens,
                "completion_tokens": chunk.usage.completion_tokens,
                "total_tokens": chunk.usage.total_tokens,
            }

    print()
    return "".join(full_response), usage_info

# usage
response, usage = stream_chat(
    [{"role": "user", "content": "Explain what a microservices architecture is"}]
)
print(f"Token usage: {usage}")

4.3 Streaming Multi-Turn Conversations

from openai import OpenAI

client = OpenAI()

def stream_multi_turn():
    messages = [
        {"role": "system", "content": "You are a friendly AI assistant."}
    ]

    while True:
        user_input = input("\nYou: ")
        if user_input.lower() in ["exit", "quit"]:
            break

        messages.append({"role": "user", "content": user_input})

        stream = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            stream=True,
        )

        print("AI: ", end="")
        full_response = []
        for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                print(content, end="", flush=True)
                full_response.append(content)

        print()

        # append the assistant reply to the history
        messages.append({
            "role": "assistant",
            "content": "".join(full_response)
        })

stream_multi_turn()

4.4 The SSE (Server-Sent Events) Protocol

Streaming responses are delivered over SSE. Each message looks like:

data: {"id":"chatcmpl-abc","choices":[{"delta":{"content":"Hel"},"index":0}]}\n\n
data: {"id":"chatcmpl-abc","choices":[{"delta":{"content":"lo"},"index":0}]}\n\n
data: [DONE]\n\n

Key characteristics

  • Every message starts with a data: prefix
  • Messages are separated by \n\n
  • The final message is data: [DONE]
  • The Content-Type is text/event-stream
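The format above is simple enough to parse by hand. Here is a minimal parser sketch (the function name is illustrative; a real client must also buffer partial reads, as the front-end code in section 4.7 does):

```python
import json

def iter_sse_payloads(raw: str):
    """Yield the JSON payload of each 'data:' message in a decoded SSE string,
    stopping at the [DONE] sentinel."""
    for block in raw.split("\n\n"):
        block = block.strip()
        if not block.startswith("data: "):
            continue
        payload = block[len("data: "):]
        if payload == "[DONE]":
            return
        yield json.loads(payload)

raw = 'data: {"content": "Hel"}\n\ndata: {"content": "lo"}\n\ndata: [DONE]\n\n'
print([p["content"] for p in iter_sse_payloads(raw)])  # ['Hel', 'lo']
```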

4.5 Streaming in Node.js

// streaming.mjs
import OpenAI from 'openai';

const client = new OpenAI();

async function streamChat() {
  const stream = await client.chat.completions.create({
    model: 'gpt-4o-mini',
    messages: [
      { role: 'user', content: 'Explain artificial intelligence in three sentences' }
    ],
    stream: true,
  });

  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content;
    if (content) {
      process.stdout.write(content);
    }
  }
  console.log();
}

streamChat();

4.6 A Streaming Endpoint with FastAPI

# server.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
from pydantic import BaseModel
import json

app = FastAPI()
client = AsyncOpenAI()  # async client, so the event loop is not blocked

class ChatRequest(BaseModel):
    message: str
    model: str = "gpt-4o-mini"

@app.post("/api/chat/stream")
async def chat_stream(request: ChatRequest):
    """SSE streaming chat endpoint."""

    async def generate():
        stream = await client.chat.completions.create(
            model=request.model,
            messages=[{"role": "user", "content": request.message}],
            stream=True,
        )

        async for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                # SSE format
                yield f"data: {json.dumps({'content': content})}\n\n"

        # end-of-stream marker
        yield f"data: {json.dumps({'done': True})}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # disable Nginx buffering
        }
    )
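To smoke-test this endpoint without a browser, a stdlib-only client sketch can be used. The URL and port are assumptions based on a default uvicorn setup, and the function names are illustrative:

```python
import json
from urllib import request

def parse_sse_block(block: str):
    """Return the JSON payload of one 'data: ...' block, or None for anything else."""
    block = block.strip()
    if not block.startswith("data: "):
        return None
    return json.loads(block[len("data: "):])

def consume_stream(message: str, url: str = "http://localhost:8000/api/chat/stream"):
    """POST a message and print the streamed content as it arrives."""
    req = request.Request(
        url,
        data=json.dumps({"message": message}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    with request.urlopen(req) as resp:
        for raw_line in resp:  # the response object iterates line by line
            data = parse_sse_block(raw_line.decode("utf-8"))
            if data is None or data.get("done"):
                continue
            print(data["content"], end="", flush=True)
    print()

# consume_stream("Hello")  # requires the server above running on localhost:8000
```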

4.7 Front-End Streaming

JavaScript (Fetch API)

async function streamChat(message) {
  const response = await fetch('/api/chat/stream', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message }),
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });

    // parse complete SSE messages
    const lines = buffer.split('\n\n');
    buffer = lines.pop(); // keep any incomplete message in the buffer

    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = JSON.parse(line.slice(6));

        if (data.done) {
          console.log('Streaming complete');
          return;
        }

        // append to the page
        document.getElementById('output').textContent += data.content;
      }
    }
  }
}

React Component Example

import { useState } from 'react';

function StreamChat() {
  const [output, setOutput] = useState('');
  const [loading, setLoading] = useState(false);

  const handleSend = async (message) => {
    setOutput('');
    setLoading(true);

    try {
      const response = await fetch('/api/chat/stream', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ message }),
      });

      const reader = response.body.getReader();
      const decoder = new TextDecoder();

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // Simplified: splits each read() directly. Production code should
        // buffer across reads like the Fetch example above, since an SSE
        // message can be split between two reads.
        const text = decoder.decode(value, { stream: true });
        const lines = text.split('\n\n').filter(l => l.startsWith('data: '));

        for (const line of lines) {
          const data = JSON.parse(line.slice(6));
          if (!data.done) {
            setOutput(prev => prev + data.content);
          }
        }
      }
    } finally {
      setLoading(false);
    }
  };

  return (
    <div>
      <button onClick={() => handleSend('Hello')} disabled={loading}>
        Send
      </button>
      <div>{output}</div>
    </div>
  );
}

4.8 Streaming Function Calling

import json

stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "What's the weather like in Beijing today?"}],
    tools=[{
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get the weather",
            "parameters": {
                "type": "object",
                "properties": {
                    "city": {"type": "string", "description": "City name"}
                },
                "required": ["city"]
            }
        }
    }],
    stream=True,
)

# collect the tool-call arguments, which arrive in fragments
tool_calls = {}
for chunk in stream:
    if not chunk.choices:
        continue
    delta = chunk.choices[0].delta

    if delta.tool_calls:
        for tc in delta.tool_calls:
            idx = tc.index
            if idx not in tool_calls:
                tool_calls[idx] = {
                    "id": tc.id or "",
                    "function": {"name": "", "arguments": ""}
                }
            if tc.function:
                if tc.function.name:
                    tool_calls[idx]["function"]["name"] = tc.function.name
                if tc.function.arguments:
                    tool_calls[idx]["function"]["arguments"] += tc.function.arguments

for idx, tc in tool_calls.items():
    args = json.loads(tc["function"]["arguments"])
    print(f"Calling function: {tc['function']['name']}, arguments: {args}")

4.9 Error Handling

Handling Stream Interruptions

from openai import OpenAI, APIError, APIConnectionError, RateLimitError

client = OpenAI()

def safe_stream_chat(messages: list, model: str = "gpt-4o-mini") -> str:
    """Streaming call with error handling."""
    collected = []

    try:
        stream = client.chat.completions.create(
            model=model,
            messages=messages,
            stream=True,
            timeout=60.0,  # streaming requests benefit from a longer timeout
        )

        for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                collected.append(content)
                print(content, end="", flush=True)

            # check whether the output was truncated
            if chunk.choices and chunk.choices[0].finish_reason == "length":
                print("\n[Warning: output truncated by max_tokens]")

    except APIConnectionError:
        print("\n[Error: network connection failed]")
    except RateLimitError:
        print("\n[Error: rate limit exceeded, retry later]")
    except APIError as e:
        print(f"\n[API error: {e}]")
    except Exception as e:
        print(f"\n[Unexpected error: {e}]")

    print()
    return "".join(collected)

Stream Timeout Control

# Note: signal.alarm is Unix-only and works only in the main thread.
import signal

class StreamTimeout(Exception):
    pass

def timeout_handler(signum, frame):
    raise StreamTimeout("stream response timed out")

def stream_with_timeout(messages, timeout_seconds=30):
    """Streaming call with an inactivity timeout."""
    old_handler = signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(timeout_seconds)

    try:
        stream = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            stream=True,
        )
        for chunk in stream:
            signal.alarm(timeout_seconds)  # reset the timer on every chunk
            if chunk.choices and chunk.choices[0].delta.content:
                print(chunk.choices[0].delta.content, end="", flush=True)
    except StreamTimeout:
        print("\n[Timeout: response interrupted]")
    finally:
        signal.alarm(0)
        signal.signal(signal.SIGALRM, old_handler)

4.10 Business Scenarios

| Scenario | Recommendation | Why |
| --- | --- | --- |
| Chatbots | Streaming | Instant feedback, better UX |
| Code generation | Streaming | Users can read while it generates |
| Document translation | Streaming | Long outputs otherwise feel slow |
| Data-analysis reports | Non-streaming | Easier to render in one pass |
| Batch API calls | Non-streaming | Simpler handling logic |
| Agent tool calls | Streaming | Show tool invocations in real time |

4.11 Caveats

  1. Don't interrupt mid-stream: avoid closing the connection while output is still streaming
  2. Nginx proxy configuration: add proxy_buffering off;
  3. CDN / load balancers: make sure they do not buffer SSE messages
  4. Memory management: watch the response text accumulated over long conversations
  5. Concurrency: distinguish multiple simultaneous streaming requests from the same user
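For caveat 4, one simple way to bound memory and token growth is to cap the history sent with each request. This is a sketch; the cap of 20 messages is an arbitrary assumption, and trimming by token count is more precise:

```python
def trim_history(messages: list, max_messages: int = 20) -> list:
    """Keep any system messages plus only the most recent max_messages others."""
    system = [m for m in messages if m["role"] == "system"]
    rest = [m for m in messages if m["role"] != "system"]
    return system + rest[-max_messages:]

history = [{"role": "system", "content": "You are a helpful assistant."}]
history += [{"role": "user", "content": f"msg {i}"} for i in range(50)]
print(len(trim_history(history)))  # 21
```

In the multi-turn loop of section 4.3, this would be applied to `messages` just before each `client.chat.completions.create` call.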

4.12 Further Reading


Next chapter: 05 - Vision API: image input, multimodal conversations, and OCR.