POSIX 标准详解教程 / 第六章:I/O 模型
第六章:I/O 模型
掌握 POSIX I/O 模型:阻塞/非阻塞 I/O、read/write、select/poll/epoll、多路复用。
6.1 POSIX I/O 概述
6.1.1 一切皆文件描述符
POSIX 将所有 I/O 统一为文件描述符(File Descriptor, fd) 的读写操作。无论底层是磁盘文件、终端、管道还是网络套接字,都使用同一套 API:
open() → read() / write() / lseek() → close()
socket() → recv() / send() / close()
6.1.2 标准文件描述符
| fd | 名称 | 宏 | 默认关联 |
|---|
| 0 | 标准输入 | STDIN_FILENO | 键盘 |
| 1 | 标准输出 | STDOUT_FILENO | 终端 |
| 2 | 标准错误 | STDERR_FILENO | 终端 |
6.1.3 I/O 模型分类
POSIX I/O 模型
├── 阻塞 I/O (Blocking I/O)
│ └── read()/write() 在数据未就绪时挂起进程
├── 非阻塞 I/O (Non-blocking I/O)
│ └── read()/write() 立即返回 EAGAIN/EWOULDBLOCK
├── I/O 多路复用 (I/O Multiplexing)
│ ├── select() ← POSIX 标准
│ ├── poll() ← POSIX 标准
│ └── epoll ← Linux 扩展
└── 异步 I/O (Asynchronous I/O)
└── aio_read()/aio_write() ← POSIX AIO
6.2 基本 I/O 操作
6.2.1 read() 与 write()
/*
* basic_io.c - 基本 read/write 操作
* 编译: gcc -Wall -o basic_io basic_io.c
*/
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
/* 安全读取:处理 EINTR 和部分读取 */
static ssize_t safe_read(int fd, void *buf, size_t count)
{
ssize_t total = 0;
while ((size_t)total < count) {
ssize_t n = read(fd, (char *)buf + total, count - total);
if (n == -1) {
if (errno == EINTR) continue; /* 被信号中断,重试 */
return -1; /* 其他错误 */
}
if (n == 0) break; /* EOF */
total += n;
}
return total;
}
/* 安全写入:处理 EINTR 和部分写入 */
static ssize_t safe_write(int fd, const void *buf, size_t count)
{
ssize_t total = 0;
while ((size_t)total < count) {
ssize_t n = write(fd, (const char *)buf + total, count - total);
if (n == -1) {
if (errno == EINTR) continue;
return -1;
}
total += n;
}
return total;
}
int main(void)
{
const char *path = "/tmp/posix_io_test.txt";
/* 创建文件 */
int fd = open(path, O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (fd == -1) { perror("open"); return 1; }
const char *data = "Hello, POSIX I/O!\nThis is line 2.\n";
ssize_t written = safe_write(fd, data, strlen(data));
printf("写入 %zd 字节\n", written);
close(fd);
/* 读取文件 */
fd = open(path, O_RDONLY);
if (fd == -1) { perror("open"); return 1; }
char buf[256];
ssize_t nread = safe_read(fd, buf, sizeof(buf) - 1);
if (nread > 0) {
buf[nread] = '\0';
printf("读取 %zd 字节:\n%s", nread, buf);
}
close(fd);
unlink(path);
return 0;
}
6.2.2 read() 返回值含义
| 返回值 | 含义 |
|---|
> 0 | 成功读取的字节数 |
= 0 | EOF(文件结束或连接关闭) |
-1 | 错误(检查 errno) |
注意:read() 可能返回少于请求的字节数(部分读取),不一定意味着 EOF。需要循环读取直到获得所需字节数或遇到 EOF。
6.2.3 分散/聚集 I/O (Scatter/Gather)
/*
* scatter_gather.c - 使用 readv/writev 进行分散/聚集 I/O
* 编译: gcc -Wall -o scatter_gather scatter_gather.c
*/
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <fcntl.h>
#include <sys/uio.h>
#include <unistd.h>
#include <string.h>
int main(void)
{
const char *path = "/tmp/posix_iovec_test";
/* 准备多个缓冲区 */
const char *header = "HEADER:";
const char *body = "Hello, scatter/gather I/O!";
const char *footer = ":FOOTER\n";
struct iovec iov_w[3] = {
{ .iov_base = (void *)header, .iov_len = strlen(header) },
{ .iov_base = (void *)body, .iov_len = strlen(body) },
{ .iov_base = (void *)footer, .iov_len = strlen(footer) },
};
/* 写入:一次系统调用写入多个缓冲区 */
int fd = open(path, O_WRONLY | O_CREAT | O_TRUNC, 0644);
ssize_t nw = writev(fd, iov_w, 3);
printf("writev 写入 %zd 字节\n", nw);
close(fd);
/* 读取:一次系统调用读入多个缓冲区 */
char hbuf[16], bbuf[128], fbuf[16];
struct iovec iov_r[3] = {
{ .iov_base = hbuf, .iov_len = sizeof(hbuf) - 1 },
{ .iov_base = bbuf, .iov_len = sizeof(bbuf) - 1 },
{ .iov_base = fbuf, .iov_len = sizeof(fbuf) - 1 },
};
fd = open(path, O_RDONLY);
ssize_t nr = readv(fd, iov_r, 3);
hbuf[iov_r[0].iov_len < sizeof(hbuf) - 1 ? iov_r[0].iov_len : sizeof(hbuf) - 1] = '\0';
bbuf[iov_r[1].iov_len < sizeof(bbuf) - 1 ? iov_r[1].iov_len : sizeof(bbuf) - 1] = '\0';
fbuf[iov_r[2].iov_len < sizeof(fbuf) - 1 ? iov_r[2].iov_len : sizeof(fbuf) - 1] = '\0';
printf("readv 读取 %zd 字节\n", nr);
printf("header: '%s'\n", hbuf);
printf("body: '%s'\n", bbuf);
printf("footer: '%s'\n", fbuf);
close(fd);
unlink(path);
return 0;
}
6.3 阻塞 vs 非阻塞 I/O
6.3.1 模型对比
| 特性 | 阻塞 I/O | 非阻塞 I/O |
|---|
| 无数据时 | 进程挂起等待 | 立即返回 EAGAIN |
| 编程复杂度 | 简单 | 复杂(需轮询) |
| CPU 利用率 | 低(等待时 CPU 空闲) | 高(忙等待) |
| 设置方式 | 默认 | O_NONBLOCK 标志 |
6.3.2 非阻塞 I/O 示例
/*
* nonblocking_io.c - 非阻塞 I/O 演示
* 编译: gcc -Wall -o nonblocking_io nonblocking_io.c
*/
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
int main(void)
{
int pipefd[2];
pipe(pipefd);
/* 设置读端为非阻塞 */
int flags = fcntl(pipefd[0], F_GETFL);
fcntl(pipefd[0], F_SETFL, flags | O_NONBLOCK);
/* 尝试从空管道读取 */
char buf[64];
ssize_t n = read(pipefd[0], buf, sizeof(buf));
if (n == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
printf("管道无数据,read() 立即返回 EAGAIN\n");
} else {
perror("read");
}
}
/* 写入数据 */
write(pipefd[1], "hello", 5);
/* 再次读取 */
n = read(pipefd[0], buf, sizeof(buf));
if (n > 0) {
buf[n] = '\0';
printf("读取到数据: '%s' (%zd 字节)\n", buf, n);
}
close(pipefd[0]);
close(pipefd[1]);
return 0;
}
6.4 select():POSIX 标准多路复用
6.4.1 select() 接口
int select(int nfds,
fd_set *readfds, /* 可读 fd 集合 */
fd_set *writefds, /* 可写 fd 集合 */
fd_set *exceptfds, /* 异常 fd 集合 */
struct timeval *timeout);
| 参数 | 说明 |
|---|
nfds | 最大 fd 值 + 1 |
readfds | 监控可读性(NULL 不监控) |
writefds | 监控可写性(NULL 不监控) |
exceptfds | 监控异常(NULL 不监控) |
timeout | 超时(NULL = 永久阻塞,0 = 立即返回) |
6.4.2 select() 示例
/*
* select_demo.c - 使用 select() 监控多个文件描述符
* 编译: gcc -Wall -o select_demo select_demo.c
*/
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/select.h>
#include <string.h>
int main(void)
{
int pipe1[2], pipe2[2];
pipe(pipe1);
pipe(pipe2);
/* 子进程 1:向 pipe1 写入 */
if (fork() == 0) {
close(pipe1[0]);
sleep(1);
write(pipe1[1], "Data from child 1\n", 18);
close(pipe1[1]);
_exit(0);
}
/* 子进程 2:向 pipe2 写入 */
if (fork() == 0) {
close(pipe2[0]);
sleep(2);
write(pipe2[1], "Data from child 2\n", 18);
close(pipe2[1]);
_exit(0);
}
close(pipe1[1]);
close(pipe2[1]);
/* 父进程:使用 select() 同时监控两个管道 */
fd_set rfds;
int maxfd = (pipe1[0] > pipe2[0] ? pipe1[0] : pipe2[0]) + 1;
int pipes_alive = 2;
while (pipes_alive > 0) {
FD_ZERO(&rfds);
FD_SET(pipe1[0], &rfds);
FD_SET(pipe2[0], &rfds);
struct timeval tv = { .tv_sec = 5, .tv_usec = 0 };
int ready = select(maxfd, &rfds, NULL, NULL, &tv);
if (ready == -1) {
perror("select");
break;
}
if (ready == 0) {
printf("超时\n");
break;
}
char buf[128];
if (FD_ISSET(pipe1[0], &rfds)) {
ssize_t n = read(pipe1[0], buf, sizeof(buf) - 1);
if (n > 0) { buf[n] = '\0'; printf("[pipe1] %s", buf); }
else { close(pipe1[0]); pipes_alive--; }
}
if (FD_ISSET(pipe2[0], &rfds)) {
ssize_t n = read(pipe2[0], buf, sizeof(buf) - 1);
if (n > 0) { buf[n] = '\0'; printf("[pipe2] %s", buf); }
else { close(pipe2[0]); pipes_alive--; }
}
}
close(pipe1[0]);
close(pipe2[0]);
return 0;
}
6.4.3 select() 的局限性
| 限制 | 说明 |
|---|
| fd 数量上限 | FD_SETSIZE(通常为 1024) |
| 每次调用需重置 | fd_set 会被内核修改,需要重新设置 |
| O(n) 扫描 | 需要遍历整个 fd_set 检查就绪 fd |
| 拷贝开销 | 每次调用需将 fd_set 拷贝到内核空间 |
6.5 poll():改进的多路复用
6.5.1 poll() 接口
int poll(struct pollfd fds[], nfds_t nfds, int timeout);
struct pollfd {
int fd; /* 文件描述符 */
short events; /* 关注的事件 */
short revents; /* 返回的事件 */
};
6.5.2 poll 事件标志
| 事件 | 说明 |
|---|
POLLIN | 有数据可读(含普通数据和优先数据) |
POLLOUT | 可以写入 |
POLLERR | 错误(仅 revents) |
POLLHUP | 挂断(仅 revents) |
POLLNVAL | 无效 fd(仅 revents) |
POLLPRI | 优先数据可读(如 TCP 带外数据) |
6.5.3 poll() 示例
/*
* poll_demo.c - 使用 poll() 监控多个管道
* 编译: gcc -Wall -o poll_demo poll_demo.c
*/
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <poll.h>
#include <string.h>
#define NUM_PIPES 3
int main(void)
{
int pipes[NUM_PIPES][2];
struct pollfd pfds[NUM_PIPES];
/* 创建管道并设置 poll 监控 */
for (int i = 0; i < NUM_PIPES; i++) {
pipe(pipes[i]);
pfds[i].fd = pipes[i][0]; /* 监控读端 */
pfds[i].events = POLLIN;
pfds[i].revents = 0;
}
/* 子进程:分别向管道写入数据 */
for (int i = 0; i < NUM_PIPES; i++) {
if (fork() == 0) {
close(pipes[i][0]); /* 关闭读端 */
sleep(i + 1);
char msg[64];
int len = snprintf(msg, sizeof(msg), "Data from pipe %d\n", i);
write(pipes[i][1], msg, len);
close(pipes[i][1]);
_exit(0);
}
close(pipes[i][1]); /* 父进程关闭写端 */
}
/* 父进程:使用 poll() 等待数据 */
int active = NUM_PIPES;
while (active > 0) {
int ready = poll(pfds, NUM_PIPES, 5000);
if (ready == -1) { perror("poll"); break; }
if (ready == 0) { printf("超时\n"); break; }
for (int i = 0; i < NUM_PIPES; i++) {
if (pfds[i].revents & POLLIN) {
char buf[128];
ssize_t n = read(pfds[i].fd, buf, sizeof(buf) - 1);
if (n > 0) {
buf[n] = '\0';
printf("[pipe %d] %s", i, buf);
}
if (n <= 0) {
close(pfds[i].fd);
pfds[i].fd = -1;
active--;
}
}
}
}
printf("所有管道数据读取完毕\n");
return 0;
}
6.5.4 select() vs poll() 对比
| 对比项 | select() | poll() |
|---|
| fd 数量限制 | FD_SETSIZE (1024) | 无硬限制 |
| 事件模型 | 可读/可写/异常分离集合 | 统一 events/revents |
| 精度 | 微秒(timeval) | 毫秒 |
| 参数修改 | 每次需重置 fd_set | events 不变,revents 由内核设置 |
| 效率 | O(n) | O(n) |
| 可移植性 | 更广泛 | POSIX.1-2001 |
6.6 epoll:Linux 高性能 I/O 多路复用
6.6.1 epoll 的优势
注意:epoll 是 Linux 专有扩展,不是 POSIX 标准。但在 Linux 高性能编程中至关重要。
| 特性 | select/poll | epoll |
|---|
| 复杂度 | O(n)(每次遍历所有 fd) | O(1)(就绪事件通知) |
| fd 数量 | 受限 | 支持百万级 |
| 内核数据结构 | 每次拷贝 fd_set | 红黑树 + 就绪链表 |
| 触发模式 | 水平触发 (LT) | LT 和边缘触发 (ET) |
6.6.2 epoll 使用模式
/*
* epoll_demo.c - 使用 epoll 监控管道读取
* 编译: gcc -Wall -o epoll_demo epoll_demo.c
* 注意: epoll 是 Linux 扩展,非 POSIX 标准
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <string.h>
#define MAX_EVENTS 10
#define NUM_PIPES 3
int main(void)
{
int pipes[NUM_PIPES][2];
/* 创建管道 */
for (int i = 0; i < NUM_PIPES; i++)
pipe(pipes[i]);
/* 创建 epoll 实例 */
int epfd = epoll_create1(0);
if (epfd == -1) { perror("epoll_create1"); return 1; }
/* 注册管道读端 */
struct epoll_event ev;
for (int i = 0; i < NUM_PIPES; i++) {
ev.events = EPOLLIN;
ev.data.fd = pipes[i][0];
epoll_ctl(epfd, EPOLL_CTL_ADD, pipes[i][0], &ev);
}
/* 子进程写入数据 */
for (int i = 0; i < NUM_PIPES; i++) {
if (fork() == 0) {
close(pipes[i][0]);
sleep(i + 1);
char msg[64];
int len = snprintf(msg, sizeof(msg), "Pipe %d data\n", i);
write(pipes[i][1], msg, len);
close(pipes[i][1]);
_exit(0);
}
close(pipes[i][1]);
}
/* 等待事件 */
struct epoll_event events[MAX_EVENTS];
int active = NUM_PIPES;
while (active > 0) {
int nfds = epoll_wait(epfd, events, MAX_EVENTS, 5000);
if (nfds == -1) { perror("epoll_wait"); break; }
if (nfds == 0) { printf("超时\n"); break; }
for (int i = 0; i < nfds; i++) {
char buf[128];
ssize_t n = read(events[i].data.fd, buf, sizeof(buf) - 1);
if (n > 0) {
buf[n] = '\0';
printf("[epoll] fd=%d: %s", events[i].data.fd, buf);
}
if (n <= 0) {
close(events[i].data.fd);
active--;
}
}
}
close(epfd);
printf("所有数据读取完毕\n");
return 0;
}
6.6.3 触发模式
| 模式 | 说明 | 特点 |
|---|
| LT (Level Triggered) | 水平触发(默认) | 只要 fd 就绪就持续通知,编程简单 |
| ET (Edge Triggered) | 边缘触发 | 仅在状态变化时通知,需一次性读完所有数据 |
/* 边缘触发模式:必须使用非阻塞 fd */
ev.events = EPOLLIN | EPOLLET; /* ET 模式 */
fcntl(fd, F_SETFL, O_NONBLOCK); /* 必须非阻塞 */
/* ET 模式下必须循环读取直到 EAGAIN */
while (1) {
ssize_t n = read(fd, buf, sizeof(buf));
if (n == -1) {
if (errno == EAGAIN) break; /* 数据读完 */
perror("read");
break;
}
if (n == 0) break; /* EOF */
process(buf, n);
}
6.7 标准 I/O (stdio) vs 文件描述符 I/O
| 特性 | 文件描述符 I/O | 标准 I/O (stdio) |
|---|
| 缓冲 | 无缓冲 | 有缓冲(行缓冲/全缓冲) |
| 函数 | read()/write() | fread()/fwrite() |
| 性能 | 系统调用频繁 | 减少系统调用次数 |
| 线程安全 | 需自行同步 | 内置锁(FILE*) |
| 适用场景 | 网络编程、管道、二进制 I/O | 文件处理、行文本处理 |
6.7.1 缓冲模式
| 模式 | 设置函数 | 触发条件 |
|---|
| 无缓冲 | setbuf(stream, NULL) | 每次 I/O 立即调用 read/write |
| 行缓冲 | 默认(终端) | 遇到 \n 时刷新 |
| 全缓冲 | 默认(文件) | 缓冲区满时刷新 |
/* 手动刷新缓冲区 */
fflush(stdout); /* 刷新 stdout 缓冲区 */
setvbuf(stdout, NULL, _IONBF, 0); /* 设置无缓冲 */
6.8 业务场景:并发回声服务器
/*
* echo_server.c - 使用 poll() 的并发回声服务器
* 编译: gcc -Wall -o echo_server echo_server.c
* 测试: echo "hello" | nc localhost 8080
*/
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <poll.h>
#include <errno.h>
#define PORT 8080
#define MAX_CLIENTS 128
int main(void)
{
/* 创建服务器 socket */
int server_fd = socket(AF_INET, SOCK_STREAM, 0);
int opt = 1;
setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
struct sockaddr_in addr = {
.sin_family = AF_INET,
.sin_port = htons(PORT),
.sin_addr.s_addr = INADDR_ANY,
};
bind(server_fd, (struct sockaddr *)&addr, sizeof(addr));
listen(server_fd, 128);
/* poll 数组 */
struct pollfd fds[MAX_CLIENTS];
int nfds = 1;
fds[0].fd = server_fd;
fds[0].events = POLLIN;
printf("回声服务器启动,端口 %d\n", PORT);
while (1) {
int ready = poll(fds, nfds, -1);
if (ready == -1) { if (errno == EINTR) continue; perror("poll"); break; }
/* 检查新连接 */
if (fds[0].revents & POLLIN) {
int client_fd = accept(server_fd, NULL, NULL);
if (client_fd >= 0 && nfds < MAX_CLIENTS) {
fds[nfds].fd = client_fd;
fds[nfds].events = POLLIN;
printf("新连接 fd=%d (总计 %d)\n", client_fd, nfds);
nfds++;
} else if (client_fd >= 0) {
close(client_fd);
}
}
/* 检查客户端数据 */
for (int i = 1; i < nfds; i++) {
if (fds[i].revents & POLLIN) {
char buf[1024];
ssize_t n = read(fds[i].fd, buf, sizeof(buf));
if (n <= 0) {
printf("断开 fd=%d\n", fds[i].fd);
close(fds[i].fd);
fds[i] = fds[nfds - 1];
nfds--;
i--;
} else {
/* 回声:将数据发回 */
write(fds[i].fd, buf, n);
}
}
}
}
close(server_fd);
return 0;
}
6.9 注意事项
⚠️ 部分读写:read()/write() 可能返回少于请求的字节数。循环调用直到读写完成或遇到错误。
⚠️ EINTR 处理:所有慢速系统调用都可能被信号中断并返回 EINTR。要么使用 SA_RESTART,要么手动重试。
⚠️ select() fd_set 限制:FD_SETSIZE 通常为 1024。超过此限制需要使用 poll() 或 epoll。
⚠️ epoll 不是 POSIX:epoll 是 Linux 专有扩展。跨平台程序应使用 poll() 或封装层(如 libevent、libuv)。
⚠️ 非阻塞 I/O + 边缘触发:ET 模式必须配合非阻塞 fd 使用,且必须循环读取直到 EAGAIN。
6.10 扩展阅读
man 2 read、man 2 write、man 2 poll、man 2 selectman 7 epoll — Linux epoll 详解- APUE 第 14 章:Advanced I/O
- TLPI 第 63 章:Alternative I/O Models
- 《Unix Network Programming》 — W. Richard Stevens 著,网络编程经典
- libevent / libuv:跨平台 I/O 多路复用库
6.11 本章小结
| 要点 | 说明 |
|---|
| 文件描述符 | POSIX I/O 的统一抽象 |
| read/write | 基本 I/O,可能部分读写,可能被信号中断 |
| select() | POSIX 标准,fd 上限 1024,O(n) 扫描 |
| poll() | POSIX 标准,无 fd 上限,仍为 O(n) |
| epoll | Linux 扩展,O(1) 通知,支持 LT/ET |
| 缓冲 I/O | stdio 缓冲减少系统调用,但需注意 fflush |