第2章:事件循环 —— 异步的心脏
第2章:事件循环 —— 异步的心脏
2.1 什么是事件循环?
事件循环(Event Loop)是异步编程的核心调度机制。它是一个无限循环,不断地:
- 监听事件:哪些 I/O 操作已完成?哪些定时器已到期?
- 分发事件:将就绪的事件交给对应的回调函数处理
- 执行回调:运行注册好的回调代码
┌───────────────────────────────────┐
│ Event Loop │
│ │
│ ┌───────────┐ │
│ │ poll I/O │ ◄──── 有就绪事件?
│ └─────┬─────┘ 否 → 阻塞等待
│ │ 是 │
│ ▼ │
│ ┌───────────┐ │
│ │ 执行回调 │ │
│ └─────┬─────┘ │
│ │ │
│ ▼ │
│ ┌───────────┐ │
│ │ 检查定时器 │ │
│ └─────┬─────┘ │
│ │ │
│ ▼ │
│ ┌───────────┐ │
│ │ 回到 poll │ ─────────────► │
│ └───────────┘ │
└───────────────────────────────────┘
为什么事件循环能用单线程处理高并发?
关键在于:大多数 Web 请求的瓶颈是 I/O 等待,而非 CPU 计算。
假设一个请求的生命周期中:
- 95% 的时间在等待数据库查询、网络调用等 I/O
- 5% 的时间在执行业务逻辑
单线程 + 事件循环可以在等待 I/O 的 95% 时间里去处理其他请求,从而实现高并发。
2.2 Reactor 模式
Reactor 模式是事件驱动架构的经典设计,由 Doug Schmidt 在 1995 年提出。
核心组件
| 组件 | 职责 | 类比 |
|---|---|---|
| Handle(句柄) | OS 资源标识(socket fd) | 餐桌号码 |
| Event Demultiplexer(事件多路复用器) | 监听多个 Handle 的事件(epoll/kqueue) | 门口的叫号机 |
| Reactor | 事件循环的核心,分发事件 | 餐厅经理 |
| EventHandler(事件处理器) | 处理特定事件的回调 | 厨师/服务员 |
Reactor 的变体
| 变体 | 描述 | 适用场景 |
|---|---|---|
| 单线程 Reactor | 所有 I/O 和业务逻辑在同一线程 | 简单的代理服务器 |
| 多线程 Reactor | 一个 Reactor 线程 + 工作线程池 | Nginx |
| 主从 Reactor | 主 Reactor 接受连接,子 Reactor 处理 I/O | Netty、Redis 6.0+ |
伪代码实现
class Reactor:
def __init__(self):
self.handlers = {} # fd -> handler 的映射
self.running = False
def register(self, fd, event, handler):
"""注册事件监听"""
self.handlers[(fd, event)] = handler
def run(self):
"""事件循环主循环"""
self.running = True
while self.running:
# 阻塞等待就绪事件
events = epoll_wait(self.handlers.keys(), timeout=-1)
for fd, event in events:
handler = self.handlers.get((fd, event))
if handler:
handler.handle_event(fd, event)
使用示例:
reactor = Reactor()
def on_readable(fd):
data = fd.read()
process_request(data)
reactor.register(socket_fd, EVENT_READ, on_readable)
reactor.run()
2.3 Proactor 模式
Proactor 模式是 Reactor 的"升级版",它把 I/O 操作本身也交给操作系统完成。
Reactor vs Proactor
| 特性 | Reactor | Proactor |
|---|---|---|
| I/O 操作 | 应用程序执行 read/write | 操作系统执行 read/write |
| 通知时机 | I/O 就绪时通知 | I/O 完成时通知 |
| 编程模型 | “准备好了,你来读” | “读完了,数据在这” |
| 代表实现 | epoll、kqueue | IOCP (Windows)、io_uring (Linux) |
| 编程复杂度 | 较高(需处理部分读写) | 较低(一次操作一次回调) |
Proactor 工作流
应用程序 操作系统
│ │
│ ① 提交异步读请求 │
│ ─────────────────────► │
│ │ ② 内核执行实际 I/O
│ ③ 应用去做别的事 │
│ │
│ │ ④ I/O 完成
│ ⑤ 通知应用程序 │
│ ◄───────────────────── │
│ │
│ ⑥ 处理已完成的数据 │
│ │
2.4 Linux epoll 深入
epoll 是 Linux 下最核心的 I/O 多路复用机制,是理解 Linux 异步编程的基石。
核心 API
// 创建 epoll 实例
int epoll_create1(int flags);
// 添加/修改/删除监听的文件描述符
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// 等待事件就绪
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
水平触发(LT)vs 边缘触发(ET)
| 模式 | 行为 | 优点 | 缺点 |
|---|---|---|---|
| 水平触发(LT) | 只要 fd 可读/可写就持续通知 | 编程简单,不易丢事件 | 可能重复通知,性能略低 |
| 边缘触发(ET) | 仅在状态变化时通知一次 | 性能高,通知次数少 | 必须一次性读完,否则可能丢事件 |
注意:大部分框架(libuv、Tokio)使用 LT 模式。ET 模式需要配合非阻塞 I/O,一次读到
EAGAIN为止。
完整示例:简易 TCP Echo 服务器
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#define MAX_EVENTS 1024
#define BUF_SIZE 4096
static void set_nonblocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
int main() {
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
set_nonblocking(listen_fd);
struct sockaddr_in addr = {
.sin_family = AF_INET,
.sin_port = htons(8080),
.sin_addr.s_addr = INADDR_ANY
};
bind(listen_fd, (struct sockaddr*)&addr, sizeof(addr));
listen(listen_fd, SOMAXCONN);
int epfd = epoll_create1(0);
struct epoll_event ev = { .events = EPOLLIN, .data.fd = listen_fd };
epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);
struct epoll_event events[MAX_EVENTS];
char buf[BUF_SIZE];
while (1) {
int nfds = epoll_wait(epfd, events, MAX_EVENTS, -1);
for (int i = 0; i < nfds; i++) {
if (events[i].data.fd == listen_fd) {
// 新连接到达
int client_fd = accept(listen_fd, NULL, NULL);
set_nonblocking(client_fd);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = client_fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, client_fd, &ev);
} else {
// 数据可读
int fd = events[i].data.fd;
ssize_t n = read(fd, buf, BUF_SIZE);
if (n <= 0) {
close(fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
} else {
write(fd, buf, n); // echo
}
}
}
}
}
2.5 macOS kqueue 与 Windows IOCP
kqueue(macOS / FreeBSD)
int kq = kqueue();
struct kevent changes[1];
EV_SET(&changes[0], sockfd, EVFILT_READ, EV_ADD, 0, 0, NULL);
struct kevent events[10];
int nev = kevent(kq, changes, 1, events, 10, NULL);
for (int i = 0; i < nev; i++) {
if (events[i].filter == EVFILT_READ) {
handle_read(events[i].ident);
}
}
IOCP(Windows)
IOCP 是 Proactor 模型的典型实现:
HANDLE iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
// 将 socket 关联到 IOCP
CreateIoCompletionPort((HANDLE)socket, iocp, completion_key, 0);
// 提交异步 I/O 请求
WSABUF buf = { .len = 4096, .buf = buffer };
WSARecv(socket, &buf, 1, NULL, &flags, &overlapped, NULL);
// 等待 I/O 完成
DWORD bytes;
ULONG_PTR key;
OVERLAPPED *ov;
GetQueuedCompletionStatus(iocp, &bytes, &key, &ov, INFINITE);
// bytes 是实际读取的字节数,数据在 buffer 中
跨平台对比表
| 特性 | epoll (Linux) | kqueue (macOS/BSD) | IOCP (Windows) |
|---|---|---|---|
| 模型 | Reactor | Reactor | Proactor |
| 事件类型 | 读/写/错误 | 读/写/信号/定时器/文件 | 读/写/接受连接 |
| 边缘触发 | 支持(EPOLLET) | 支持(EV_CLEAR) | N/A(完成通知) |
| 线程安全 | 需要锁 | 需要锁 | 内置线程池 |
| 代表框架 | Tokio、libuv | libuv、kqueue | IOCP、Boost.Asio |
2.6 libuv:Node.js 的引擎
libuv 是一个跨平台的异步 I/O 库,是 Node.js 的底层引擎。
libuv 架构
┌─────────────────────────────────────────────┐
│ JavaScript │
│ (你的 Node.js 应用代码) │
├─────────────────────────────────────────────┤
│ Node.js Core │
│ (V8 + C++ 绑定层) │
├─────────────────────────────────────────────┤
│ libuv │
│ ┌─────────┐ ┌──────────┐ ┌───────────┐ │
│ │Event Loop│ │Thread Pool│ │ Platform │ │
│ │ (主线程) │ │(工作线程) │ │ (I/O后端) │ │
│ └─────────┘ └──────────┘ └───────────┘ │
├─────────────────────────────────────────────┤
│ epoll / kqueue / IOCP / poll / select │
└─────────────────────────────────────────────┘
libuv 事件循环阶段
| 阶段 | 职责 | 示例 |
|---|---|---|
| timers | 执行到期的定时器回调 | setTimeout() |
| pending callbacks | 执行延迟到下一轮循环的 I/O 回调 | TCP 错误 |
| idle, prepare | 内部使用 | - |
| poll | 执行 I/O 回调(核心阶段) | fs.readFile() |
| check | 执行 setImmediate() 回调 | setImmediate() |
| close callbacks | 执行关闭事件的回调 | socket.on('close') |
Node.js 中的事件循环
const fs = require('fs');
// Phase 1: 进入事件循环
console.log('1. Start');
// Phase 2: I/O 回调(poll 阶段执行)
fs.readFile(__filename, () => {
console.log('4. I/O callback');
// 微任务(在当前阶段之后立即执行)
Promise.resolve().then(() => console.log('5. Microtask'));
// 宏任务(下一个阶段执行)
setImmediate(() => console.log('6. setImmediate'));
setTimeout(() => console.log('7. setTimeout'), 0);
});
// Phase 3: 微任务
Promise.resolve().then(() => console.log('3. Microtask'));
// Phase 4: 继续执行
console.log('2. End');
// 输出顺序:
// 1. Start
// 2. End
// 3. Microtask
// 4. I/O callback
// 5. Microtask
// 6. setImmediate
// 7. setTimeout
2.7 io_uring:Linux 异步 I/O 的未来
io_uring 是 Linux 5.1(2019)引入的新一代异步 I/O 接口,旨在解决传统 AIO 的局限性。
核心设计
┌─────────────┐ ┌─────────────┐
│ 应用程序 │ │ 内核 │
│ │ │ │
│ ┌───────┐ │ 共享内存 │ ┌───────┐ │
│ │ SQ │──┼──────────┼──│ SQ │ │
│ │(提交队列)│ │ │ │(内核读) │ │
│ └───────┘ │ │ └───────┘ │
│ │ │ │
│ ┌───────┐ │ 共享内存 │ ┌───────┐ │
│ │ CQ │◄─┼──────────┼──│ CQ │ │
│ │(完成队列)│ │ │ │(内核写) │ │
│ └───────┘ │ │ └───────┘ │
└─────────────┘ └─────────────┘
- SQ(Submission Queue):应用提交 I/O 请求
- CQ(Completion Queue):内核写入完成结果
- 无需系统调用:通过 mmap 共享内存,减少上下文切换
性能对比
| 指标 | epoll + read/write | io_uring |
|---|---|---|
| 系统调用次数 | 每次 I/O 两次(epoll_wait + read) | 批量提交,可为零 |
| 内核上下文切换 | 每次 I/O 至少一次 | 大幅减少 |
| 吞吐量(QPS) | 基准 | 提升 30%-100% |
| 适用场景 | 通用 | 高吞吐网络/存储 |
2.8 业务场景:高并发 Web 服务器设计
场景描述
一个电商 API 网关,需要同时处理 50,000 个长连接(WebSocket),每个连接的请求延迟 50-200ms。
架构选型
| 方案 | 线程数 | 内存 | QPS | 复杂度 |
|---|---|---|---|---|
| Thread-per-Connection | 50,000 | ~50GB | ~10,000 | 低 |
| 线程池(200 线程) | 200 | ~200MB | ~5,000 | 低 |
| 事件循环 + 协程 | ~CPU 核数 | ~500MB | ~100,000 | 中 |
| 多进程 + 事件循环 | CPU 核数 × 2 | ~1GB | ~200,000 | 高 |
推荐方案:多进程(Worker)+ 每进程一个事件循环 + 协程
┌──────────────────┐
│ Master 进程 │
│ (负载均衡) │
└────────┬─────────┘
│
┌──────────────┼──────────────┐
│ │ │
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│Worker 0 │ │Worker 1 │ │Worker 2 │
│ Event │ │ Event │ │ Event │
│ Loop │ │ Loop │ │ Loop │
│+ 协程 │ │+ 协程 │ │+ 协程 │
└─────────┘ └─────────┘ └─────────┘
2.9 本章小结
| 要点 | 说明 |
|---|---|
| 事件循环 | 单线程处理高并发的核心机制 |
| Reactor | I/O 就绪时通知(epoll、kqueue) |
| Proactor | I/O 完成时通知(IOCP、io_uring) |
| LT vs ET | 水平触发简单可靠,边缘触发性能高 |
| libuv | 跨平台事件循环库,Node.js 引擎 |
| io_uring | Linux 异步 I/O 的未来,零系统调用 |
下一章预告:了解了事件循环之后,我们将学习最早的异步编程方式——回调函数(Callback),以及它带来的"回调地狱"问题。
扩展阅读
- libuv 官方文档 — Node.js 的异步引擎
- io_uring: A Deep Dive — Jens Axboe 的论文
- The Art of Writing Efficient Programs — Chapter 8
- Reactor: An Object Behavioral Pattern — Doug Schmidt 的原始论文
- The C10M Problem — 千万级并发的挑战