强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

第 12 章:分块传输与流式响应

第 12 章:分块传输与流式响应

分块传输(Chunked Transfer)允许在不知道响应总大小的情况下逐步发送数据,是实现流式响应、实时推送的基础。


12.1 分块传输概述

什么时候需要分块传输

场景说明
动态生成内容服务端边生成边发送
大文件传输避免一次性加载到内存
实时数据流SSE、日志流
代理转发边接收边转发

Content-Length vs Chunked

# 方式 1:已知长度
HTTP/1.1 200 OK
Content-Length: 1234
Content-Type: application/json

{"data": "..."}
# 方式 2:分块传输(长度未知)
HTTP/1.1 200 OK
Transfer-Encoding: chunked
Content-Type: application/json

5\r\n
Hello\r\n
6\r\n
 World\r\n
0\r\n
\r\n

12.2 Chunked Transfer 编码

格式

每个数据块:
<块大小的十六进制>\r\n
<块数据>\r\n

结束块:
0\r\n
\r\n

实际示例

HTTP/1.1 200 OK
Transfer-Encoding: chunked
Content-Type: text/plain

7\r\n
Mozilla\r\n
9\r\n
Developer\r\n
7\r\n
Network\r\n
0\r\n
\r\n

解码后:MozillaDeveloperNetwork

Node.js 分块传输

const http = require('http');

const server = http.createServer((req, res) => {
    if (req.url === '/stream') {
        res.writeHead(200, {
            'Content-Type': 'text/plain',
            'Transfer-Encoding': 'chunked'
        });
        
        // 逐块发送
        let count = 0;
        const interval = setInterval(() => {
            if (count >= 5) {
                res.end();  // 发送结束块
                clearInterval(interval);
                return;
            }
            res.write(`数据块 ${count + 1}\n`);
            count++;
        }, 1000);
    }
});

server.listen(3000);
# 测试分块传输
curl -N http://localhost:3000/stream
# 输出(逐步显示):
# 数据块 1
# 数据块 2
# 数据块 3
# 数据块 4
# 数据块 5

12.3 Trailer 头部

Trailer 允许在响应体结束后发送额外的头部字段。

HTTP/1.1 200 OK
Transfer-Encoding: chunked
Trailer: X-Checksum

5\r\n
Hello\r\n
6\r\n
 World\r\n
0\r\n
X-Checksum: abc123\r\n
\r\n

Node.js Trailer 示例

const http = require('http');
const crypto = require('crypto');

const server = http.createServer((req, res) => {
    res.writeHead(200, {
        'Content-Type': 'text/plain',
        'Transfer-Encoding': 'chunked',
        'Trailer': 'X-Content-Checksum'
    });
    
    const hash = crypto.createHash('md5');
    const chunks = ['Hello', ' ', 'World'];
    
    chunks.forEach(chunk => {
        hash.update(chunk);
        res.write(chunk);
    });
    
    // 在 end 中设置 Trailer
    res.addTrailers({
        'X-Content-Checksum': hash.digest('hex')
    });
    res.end();
});

server.listen(3000);

12.4 Server-Sent Events (SSE)

SSE 是 HTML5 提供的服务器向客户端推送技术。

特点

特性说明
方向服务器 → 客户端(单向)
协议基于 HTTP
格式纯文本,text/event-stream
自动重连浏览器自动重连
事件 ID支持断点续传

事件格式

event: message\n
id: 1\n
retry: 5000\n
data: {"user": "alice", "action": "login"}\n
\n
字段说明
event事件类型(默认 message
id事件 ID(用于断点续传)
retry重连间隔(毫秒)
data事件数据(多行用多个 data:

服务端实现

const express = require('express');
const app = express();

app.get('/events', (req, res) => {
    res.writeHead(200, {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'X-Accel-Buffering': 'no'  // 禁用 Nginx 缓冲
    });
    
    let eventId = 0;
    
    // 发送事件
    const sendEvent = (data, eventType = 'message') => {
        eventId++;
        if (eventType !== 'message') {
            res.write(`event: ${eventType}\n`);
        }
        res.write(`id: ${eventId}\n`);
        res.write(`data: ${JSON.stringify(data)}\n\n`);
    };
    
    // 初始连接
    sendEvent({ message: '连接成功' });
    
    // 定时推送
    const interval = setInterval(() => {
        sendEvent({
            time: new Date().toISOString(),
            value: Math.random() * 100
        }, 'update');
    }, 3000);
    
    // 心跳(防止连接断开)
    const heartbeat = setInterval(() => {
        res.write(': heartbeat\n\n');
    }, 15000);
    
    // 客户端断开连接
    req.on('close', () => {
        clearInterval(interval);
        clearInterval(heartbeat);
        console.log('客户端断开连接');
    });
});

app.listen(3000);

客户端实现

<!DOCTYPE html>
<html>
<body>
    <div id="output"></div>
    <script>
        const output = document.getElementById('output');
        
        const eventSource = new EventSource('/events');
        
        // 默认消息
        eventSource.onmessage = (event) => {
            const data = JSON.parse(event.data);
            output.innerHTML += `<p>消息: ${data.message}</p>`;
        };
        
        // 自定义事件
        eventSource.addEventListener('update', (event) => {
            const data = JSON.parse(event.data);
            output.innerHTML += `<p>更新: ${data.time} - ${data.value.toFixed(2)}</p>`;
        });
        
        // 连接打开
        eventSource.onopen = () => {
            console.log('连接已建立');
        };
        
        // 错误处理
        eventSource.onerror = (error) => {
            console.error('SSE 错误:', error);
            // 浏览器会自动重连
        };
        
        // 使用 Last-Event-ID 断点续传
        // 浏览器自动在重连时发送 Last-Event-ID 头
        
        // 关闭连接
        // eventSource.close();
    </script>
</body>
</html>

Python 服务端示例

from flask import Flask, Response
import json
import time

app = Flask(__name__)

@app.route('/events')
def events():
    def generate():
        event_id = 0
        while True:
            event_id += 1
            data = {
                'time': time.strftime('%Y-%m-%d %H:%M:%S'),
                'value': event_id * 10
            }
            yield f"id: {event_id}\n"
            yield f"event: update\n"
            yield f"data: {json.dumps(data)}\n\n"
            time.sleep(3)
    
    return Response(
        generate(),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'X-Accel-Buffering': 'no'
        }
    )

12.5 SSE vs WebSocket

特性SSEWebSocket
方向服务器 → 客户端双向
协议HTTP独立协议 (ws://)
自动重连内置需手动实现
数据格式文本文本 + 二进制
代理兼容需特殊配置
适用场景通知、数据推送聊天、游戏

12.6 业务场景:实时股票行情

// 股票行情 SSE 推送
const express = require('express');
const app = express();

// 订阅管理
const subscribers = new Map();

app.get('/stocks/stream', (req, res) => {
    const symbols = req.query.symbols?.split(',') || ['AAPL'];
    
    res.writeHead(200, {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive'
    });
    
    // 初始化订阅
    symbols.forEach(symbol => {
        res.write(`event: subscribe\n`);
        res.write(`data: {"symbol": "${symbol}", "status": "subscribed"}\n\n`);
    });
    
    // 模拟行情更新
    const interval = setInterval(() => {
        symbols.forEach(symbol => {
            const price = (Math.random() * 100 + 100).toFixed(2);
            const change = (Math.random() * 10 - 5).toFixed(2);
            
            res.write(`event: quote\n`);
            res.write(`data: ${JSON.stringify({
                symbol,
                price: parseFloat(price),
                change: parseFloat(change),
                time: new Date().toISOString()
            })}\n\n`);
        });
    }, 2000);
    
    req.on('close', () => {
        clearInterval(interval);
    });
});

app.listen(3000);
<!-- 客户端 -->
<script>
const eventSource = new EventSource('/stocks/stream?symbols=AAPL,GOOGL,MSFT');

eventSource.addEventListener('quote', (event) => {
    const quote = JSON.parse(event.data);
    const el = document.getElementById(`stock-${quote.symbol}`);
    if (el) {
        el.querySelector('.price').textContent = quote.price;
        el.querySelector('.change').textContent = quote.change;
        el.querySelector('.change').className = 
            `change ${quote.change >= 0 ? 'up' : 'down'}`;
    }
});
</script>

12.7 流式 JSON 响应

// 流式返回大量数据
app.get('/api/large-dataset', (req, res) => {
    res.writeHead(200, {
        'Content-Type': 'application/json',
        'Transfer-Encoding': 'chunked'
    });
    
    res.write('[');
    
    let first = true;
    const stream = db.queryStream('SELECT * FROM large_table');
    
    stream.on('data', (row) => {
        if (!first) res.write(',');
        res.write(JSON.stringify(row));
        first = false;
    });
    
    stream.on('end', () => {
        res.end(']');
    });
    
    stream.on('error', (err) => {
        res.end();
    });
});

⚠️ 注意事项

  1. SSE 不能发送二进制数据:需要先 Base64 编码
  2. 代理缓冲:确保代理(Nginx)禁用缓冲 X-Accel-Buffering: no
  3. 连接数限制:浏览器对同一域名的 SSE 连接数有限制(通常 6 个)
  4. 心跳保活:定期发送注释行 :heartbeat\n\n 防止连接超时
  5. 错误处理:客户端需要处理 onerror 事件,浏览器默认 3 秒后重连

🔗 扩展阅读


下一章第 13 章:WebSocket 协议 — 握手流程、帧格式、心跳保活、断线重连