强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

Node.js 开发指南 / 第 8 章 · 流(Streams)

第 8 章 · 流(Streams)

8.1 为什么需要流

处理大文件时,一次性读入内存会导致内存溢出。流允许我们逐块处理数据,内存占用恒定。

// ❌ 一次性读取 — 大文件会导致内存溢出
const data = fs.readFileSync('huge-file.log'); // 可能占用数 GB 内存

// ✅ 使用流 — 内存占用恒定
const stream = fs.createReadStream('huge-file.log');
stream.on('data', (chunk) => {
  // 每次处理一块(默认 64KB)
  process.stdout.write(chunk);
});

流的类型

类型说明示例
Readable可读流,数据源fs.createReadStream, http.IncomingMessage
Writable可写流,数据目的地fs.createWriteStream, http.ServerResponse
Duplex双工流,同时可读可写net.Socket, zlib.createGzip()
Transform转换流,边读边改crypto.createCipher(), zlib.createGzip()
Readable  →  [数据源]  ─────────→
Writable  ←  ───────── [数据目的地]
Duplex    ←→ [既可读又可写] ←→
Transform →  [读取] → [转换] → [输出]

8.2 可读流(Readable)

创建可读流

const fs = require('fs');

// 从文件创建
const readable = fs.createReadStream('data.txt', {
  encoding: 'utf8',
  highWaterMark: 1024, // 每次读取的字节数
});

// 事件驱动模式
readable.on('data', (chunk) => {
  console.log(`收到 ${chunk.length} 字节`);
});

readable.on('end', () => {
  console.log('读取完成');
});

readable.on('error', (err) => {
  console.error('读取错误:', err);
});

流模式

// 流的两种模式
// 1. 流动模式(Flowing)— 数据自动读取
readable.on('data', (chunk) => { /* ... */ });

// 2. 暂停模式(Paused)— 手动调用 read()
readable.on('readable', () => {
  let chunk;
  while ((chunk = readable.read()) !== null) {
    console.log(`收到 ${chunk.length} 字节`);
  }
});

// 切换模式
readable.pause();  // 暂停
readable.resume(); // 恢复

自定义可读流

const { Readable } = require('stream');

class Counter extends Readable {
  constructor(options) {
    super(options);
    this.current = 0;
    this.max = options.max || 10;
  }

  _read(size) {
    if (this.current <= this.max) {
      this.push(String(this.current++) + '\n');
    } else {
      this.push(null); // 表示流结束
    }
  }
}

const counter = new Counter({ max: 5, encoding: 'utf8' });
counter.pipe(process.stdout);
// 输出: 0\n1\n2\n3\n4\n5\n

异步迭代器方式读取

const fs = require('fs');

async function readFile() {
  const stream = fs.createReadStream('data.txt', { encoding: 'utf8' });
  
  for await (const chunk of stream) {
    console.log(chunk);
  }
  console.log('读取完成');
}

8.3 可写流(Writable)

创建可写流

const fs = require('fs');

const writable = fs.createWriteStream('output.txt');

writable.write('第一行\n');
writable.write('第二行\n');
writable.end('最后一行\n');

writable.on('finish', () => {
  console.log('写入完成');
});

writable.on('error', (err) => {
  console.error('写入错误:', err);
});

背压(Backpressure)处理

const fs = require('fs');

function writeLargeFile() {
  const readable = fs.createReadStream('input.txt');
  const writable = fs.createWriteStream('output.txt');

  readable.on('data', (chunk) => {
    // write() 返回 false 时表示内部缓冲区已满
    const canContinue = writable.write(chunk);
    
    if (!canContinue) {
      // 暂停读取,等待排空
      readable.pause();
    }
  });

  // 缓冲区排空后恢复读取
  writable.on('drain', () => {
    readable.resume();
  });

  readable.on('end', () => {
    writable.end();
  });
}

// ✅ 更简单的方式:使用 pipe
fs.createReadStream('input.txt')
  .pipe(fs.createWriteStream('output.txt'));

自定义可写流

const { Writable } = require('stream');

class Logger extends Writable {
  _write(chunk, encoding, callback) {
    const message = chunk.toString().trim();
    console.log(`[${new Date().toISOString()}] ${message}`);
    callback(); // 必须调用,通知写入完成
  }
}

const logger = new Logger();
logger.write('用户登录\n');
logger.write('请求处理中\n');
logger.end('服务停止\n');

8.4 双工流(Duplex)

const { Duplex } = require('stream');

class Echo extends Duplex {
  constructor(options) {
    super(options);
    this.buffer = [];
  }

  _write(chunk, encoding, callback) {
    this.buffer.push(chunk);
    callback();
  }

  _read(size) {
    if (this.buffer.length > 0) {
      this.push(this.buffer.shift());
    } else {
      this.push(null);
    }
  }
}

8.5 转换流(Transform)

const { Transform } = require('stream');

// 大写转换流
class UpperCase extends Transform {
  _transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
}

// JSON 解析流
class JSONParser extends Transform {
  constructor(options) {
    super({ ...options, objectMode: true });
  }

  _transform(chunk, encoding, callback) {
    try {
      const obj = JSON.parse(chunk.toString());
      this.push(obj);
      callback();
    } catch (err) {
      callback(err);
    }
  }
}

// 使用示例
fs.createReadStream('data.txt')
  .pipe(new UpperCase())
  .pipe(process.stdout);

8.6 管道(Pipeline)

pipe 方法

const { pipeline } = require('stream');
const zlib = require('zlib');
const fs = require('fs');

// 链式 pipe
fs.createReadStream('input.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('input.txt.gz'))
  .on('finish', () => console.log('压缩完成'));

pipeline 函数(推荐)

const { pipeline } = require('stream/promises');

// ✅ 使用 pipeline 自动处理错误和清理
async function compressFile(input, output) {
  await pipeline(
    fs.createReadStream(input),
    zlib.createGzip(),
    fs.createWriteStream(output)
  );
  console.log('压缩完成');
}

// 使用示例
await compressFile('input.txt', 'input.txt.gz');

// 解压缩
async function decompressFile(input, output) {
  await pipeline(
    fs.createReadStream(input),
    zlib.createGunzip(),
    fs.createWriteStream(output)
  );
  console.log('解压完成');
}

流式 HTTP 响应

const http = require('http');
const fs = require('fs');
const { pipeline } = require('stream/promises');

const server = http.createServer(async (req, res) => {
  if (req.url === '/download') {
    const filePath = './large-file.zip';
    
    res.writeHead(200, {
      'Content-Type': 'application/octet-stream',
      'Content-Disposition': 'attachment; filename="file.zip"',
    });

    await pipeline(fs.createReadStream(filePath), res);
    // 如果使用 pipe,需要手动处理错误
  }
});

server.listen(3000);

8.7 objectMode 流

const { Transform, pipeline } = require('stream');
const fs = require('fs');

// 对象模式流 — 处理 JavaScript 对象而非 Buffer
class FilterUsers extends Transform {
  constructor(options = {}) {
    super({ ...options, objectMode: true });
  }

  _transform(user, encoding, callback) {
    if (user.age >= 18) {
      this.push(user);
    }
    callback();
  }
}

class FormatUser extends Transform {
  constructor(options = {}) {
    super({ ...options, objectMode: true });
  }

  _transform(user, encoding, callback) {
    this.push(`${user.name} (${user.age}岁)\n`);
    callback();
  }
}

// 使用
const users = [
  { name: 'Alice', age: 30 },
  { name: 'Bob', age: 15 },
  { name: 'Charlie', age: 25 },
];

const { Readable } = require('stream');

Readable.from(users)
  .pipe(new FilterUsers())
  .pipe(new FormatUser())
  .pipe(process.stdout);

// 输出:
// Alice (30岁)
// Charlie (25岁)

注意事项

⚠️ 始终处理错误事件:流如果没有 error 事件监听器,错误会导致进程崩溃。

⚠️ 使用 pipeline 代替 pipepipe 不会自动处理错误和清理资源,推荐使用 stream.pipeline

⚠️ 注意背压:快速的可读流配合慢速的可写流会导致内存溢出,pipe 会自动处理背压。

⚠️ 不要在 Transform 中同步调用 callback:应该异步调用以避免栈溢出。

业务场景

  1. 大文件处理:使用流逐行处理 GB 级日志文件
  2. 文件压缩/解压:使用 zlib 转换流实现流式压缩
  3. 数据管道:CSV → 解析 → 转换 → 数据库的流式 ETL
  4. HTTP 大文件下载:使用流式响应避免内存溢出

扩展阅读


上一章第 7 章 · 事件循环 下一章第 9 章 · Buffer 与二进制数据 — Buffer 操作、编码和二进制数据处理。