第 6 章 · 异步编程基础
第 6 章 · 异步编程基础
6.1 为什么需要异步
Node.js 采用单线程模型,如果所有操作都是同步的,一个耗时操作就会阻塞整个程序。异步编程让程序可以在等待 I/O 时继续执行其他代码。
// 同步(阻塞)— 不推荐在服务器中使用
const data = fs.readFileSync('/large-file.txt'); // 阻塞!
console.log('文件读取完成');
processOtherRequests(); // 必须等文件读完
// 异步(非阻塞)— 推荐
fs.readFile('/large-file.txt', (err, data) => {
console.log('文件读取完成');
});
processOtherRequests(); // 立即执行,不等待
6.2 回调函数(Callback)
回调的基本模式
const fs = require('fs');
// 错误优先回调(Error-First Callback)— Node.js 约定
fs.readFile('data.txt', 'utf8', (err, data) => {
if (err) {
console.error('读取失败:', err.message);
return;
}
console.log('文件内容:', data);
});
回调地狱(Callback Hell)
// 嵌套回调 — 可读性差,难以维护
fs.readFile('config.json', 'utf8', (err, configData) => {
if (err) return handleError(err);
const config = JSON.parse(configData);
fs.readFile(config.dataFile, 'utf8', (err, userData) => {
if (err) return handleError(err);
const users = JSON.parse(userData);
fs.writeFile('output.json', JSON.stringify(users), (err) => {
if (err) return handleError(err);
console.log('写入完成');
});
});
});
封装回调为 Promise
const { promisify } = require('node:util');
const fs = require('fs');
// 方式 1:使用 promisify
const readFile = promisify(fs.readFile);
const writeFile = promisify(fs.writeFile);
// 方式 2:使用 fs/promises(Node.js 10+)
const fsPromises = require('fs/promises');
// 方式 3:手动封装
function delay(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
6.3 Promise 详解
创建 Promise
// 基本创建
const promise = new Promise((resolve, reject) => {
// 异步操作
setTimeout(() => {
const success = Math.random() > 0.5;
if (success) {
resolve('操作成功'); // 状态:fulfilled
} else {
reject(new Error('操作失败')); // 状态:rejected
}
}, 1000);
});
// Promise 的三种状态:
// pending → fulfilled(已成功)→ resolve(value)
// pending → rejected(已拒绝)→ reject(error)
// 一旦状态改变,不可再变
Promise 链式调用
readFile('config.json', 'utf8')
.then((data) => JSON.parse(data))
.then((config) => readFile(config.dataFile, 'utf8'))
.then((userData) => {
const users = JSON.parse(userData);
return writeFile('output.json', JSON.stringify(users, null, 2));
})
.then(() => console.log('写入完成'))
.catch((err) => console.error('错误:', err.message))
.finally(() => console.log('无论成功失败都会执行'));
Promise 静态方法
// Promise.all — 全部成功才成功,有一个失败就失败
const results = await Promise.all([
fetch('/api/users'),
fetch('/api/posts'),
fetch('/api/comments'),
]);
// results = [users, posts, comments]
// Promise.allSettled — 等待所有完成(无论成功失败)
const outcomes = await Promise.allSettled([
fetch('/api/might-fail-1'),
fetch('/api/might-fail-2'),
]);
// outcomes = [
// { status: 'fulfilled', value: ... },
// { status: 'rejected', reason: ... }
// ]
// Promise.race — 返回最先完成的结果
const fastest = await Promise.race([
fetch('/api/server1'),
fetch('/api/server2'),
]);
// Promise.any — 返回最先成功的
const winner = await Promise.any([
fetch('/api/server1'),
fetch('/api/server2'),
fetch('/api/server3'),
]);
// Promise.resolve / Promise.reject
const resolved = Promise.resolve(42);
const rejected = Promise.reject(new Error('失败'));
// Promise.try(Node.js 22+)
const result = await Promise.try(() => {
return mightThrowSync();
});
实用工具函数
// 延迟函数
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
// 超时控制
function withTimeout(promise, ms) {
const timeout = new Promise((_, reject) => {
setTimeout(() => reject(new Error('操作超时')), ms);
});
return Promise.race([promise, timeout]);
}
// 重试函数
async function retry(fn, maxRetries = 3, delayMs = 1000) {
for (let i = 0; i <= maxRetries; i++) {
try {
return await fn();
} catch (err) {
if (i === maxRetries) throw err;
console.log(`重试第 ${i + 1} 次...`);
await delay(delayMs * (i + 1)); // 指数退避
}
}
}
// 使用示例
const data = await retry(
() => fetch('https://api.example.com/data'),
3,
1000
);
6.4 async/await
基本语法
// async 函数总是返回 Promise
async function fetchData() {
return 'data'; // 自动包装为 Promise.resolve('data')
}
// await 暂停执行,等待 Promise 完成
async function processData() {
try {
const configData = await readFile('config.json', 'utf8');
const config = JSON.parse(configData);
const userData = await readFile(config.dataFile, 'utf8');
const users = JSON.parse(userData);
await writeFile('output.json', JSON.stringify(users, null, 2));
console.log('处理完成');
} catch (err) {
console.error('错误:', err.message);
}
}
并发控制
// ❌ 顺序执行(慢)
async function sequential() {
const user = await fetch('/api/user/1');
const posts = await fetch('/api/posts');
const comments = await fetch('/api/comments');
// 总耗时 = 请求1 + 请求2 + 请求3
}
// ✅ 并行执行(快)
async function parallel() {
const [user, posts, comments] = await Promise.all([
fetch('/api/user/1'),
fetch('/api/posts'),
fetch('/api/comments'),
]);
// 总耗时 = max(请求1, 请求2, 请求3)
}
// ✅ 有依赖关系时分组并行
async function grouped() {
const user = await fetch('/api/user/1');
// 这两个请求可以并行
const [posts, comments] = await Promise.all([
fetch(`/api/posts?userId=${user.id}`),
fetch(`/api/comments?userId=${user.id}`),
]);
}
顶层 await(Top-Level Await)
// ESM 模块中可以直接使用顶层 await
const response = await fetch('https://api.example.com/data');
const data = await response.json();
console.log(data);
// CJS 中不能使用顶层 await
for 循环中的异步
// ❌ forEach 不会等待 async 函数
async function wrong() {
[1, 2, 3].forEach(async (num) => {
await delay(1000);
console.log(num);
});
console.log('先于 forEach 完成!'); // 会先执行
}
// ✅ 使用 for...of
async function sequentialLoop() {
for (const num of [1, 2, 3]) {
await delay(1000);
console.log(num);
}
console.log('循环完成后执行');
}
// ✅ 使用 Promise.all 并行处理
async function parallelLoop() {
await Promise.all(
[1, 2, 3].map(async (num) => {
await delay(1000);
console.log(num);
})
);
console.log('全部完成');
}
// ✅ 带并发限制的批处理
async function batchProcess(items, batchSize = 5) {
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize);
await Promise.all(batch.map((item) => processItem(item)));
console.log(`已处理 ${Math.min(i + batchSize, items.length)}/${items.length}`);
}
}
6.5 错误处理最佳实践
// ✅ 使用 try/catch 处理 async 错误
async function safeOperation() {
try {
const data = await riskyOperation();
return { success: true, data };
} catch (err) {
return { success: false, error: err.message };
}
}
// ✅ 封装错误处理辅助函数
function to(promise) {
return promise.then(
(data) => [null, data],
(err) => [err, null]
);
}
// 使用方式(类似 Go 风格)
async function fetchData() {
const [err, data] = await to(fetch('/api/data'));
if (err) {
console.error('请求失败:', err.message);
return;
}
console.log('数据:', data);
}
// ✅ 全局未捕获 Promise 拒绝处理
process.on('unhandledRejection', (reason, promise) => {
console.error('未处理的 Promise 拒绝:', reason);
// 记录日志后优雅退出
process.exit(1);
});
6.6 异步迭代器
// 异步生成器
async function* fetchPages(baseUrl, totalPages) {
for (let page = 1; page <= totalPages; page++) {
const response = await fetch(`${baseUrl}?page=${page}`);
const data = await response.json();
yield data;
}
}
// 使用 for await...of 遍历
async function processPages() {
for await (const pageData of fetchPages('/api/items', 10)) {
console.log('处理第', pageData.page, '页');
await saveToDatabase(pageData.items);
}
}
// 可读流也是异步可迭代的
const fs = require('fs');
const readline = require('readline');
async function processLargeFile(filePath) {
const rl = readline.createInterface({
input: fs.createReadStream(filePath),
crlfDelay: Infinity,
});
let lineCount = 0;
for await (const line of rl) {
lineCount++;
// 逐行处理,不会一次性加载到内存
if (lineCount % 10000 === 0) {
console.log(`已处理 ${lineCount} 行`);
}
}
console.log(`共处理 ${lineCount} 行`);
}
6.7 事件发射器(EventEmitter)
const { EventEmitter } = require('events');
// 创建事件发射器
const emitter = new EventEmitter();
// 注册监听器
emitter.on('data', (chunk) => {
console.log('收到数据:', chunk);
});
emitter.once('end', () => {
console.log('数据接收完毕(只触发一次)');
});
// 触发事件
emitter.emit('data', 'Hello');
emitter.emit('data', 'World');
emitter.emit('end');
// 错误处理 — 始终监听 error 事件
emitter.on('error', (err) => {
console.error('事件错误:', err.message);
});
// 实际应用:创建自定义类
class DataFetcher extends EventEmitter {
async fetch(url) {
this.emit('start', url);
try {
const response = await fetch(url);
const data = await response.json();
this.emit('data', data);
this.emit('end');
return data;
} catch (err) {
this.emit('error', err);
throw err;
}
}
}
const fetcher = new DataFetcher();
fetcher.on('start', (url) => console.log('开始请求:', url));
fetcher.on('data', (data) => console.log('收到数据:', data));
fetcher.on('error', (err) => console.error('错误:', err));
fetcher.fetch('https://api.example.com/data');
注意事项
⚠️ 不要混用回调和 Promise:对同一个操作不要同时使用回调和 Promise,选择一种风格保持一致。
⚠️ 始终处理 Promise 的拒绝:未处理的
Promise.reject会导致UnhandledPromiseRejection,在 Node.js 15+ 中会使进程崩溃。
⚠️ 避免在循环中使用
await而不必要:如果操作之间没有依赖关系,使用Promise.all并行处理。
⚠️ 注意
forEach与 async 的陷阱:forEach不会等待回调中的await,使用for...of或Promise.all(arr.map(...))替代。
业务场景
- API 聚合:使用
Promise.all并行请求多个微服务并聚合结果 - 文件批处理:使用异步迭代器逐行处理大文件,避免内存溢出
- 限流请求:使用
batchProcess控制并发数量,避免打爆下游服务 - 超时控制:使用
Promise.race为网络请求添加超时保护
扩展阅读
上一章:第 5 章 · 模块系统 下一章:第 7 章 · 事件循环 — 深入理解 Node.js 事件循环的机制和阶段。