Chapter 19: Concurrency and Asynchronous Programming
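“Concurrency is an essential capability for modern applications.”
Perl supports several concurrency models: threads, processes, event-driven I/O, and coroutines. This chapter introduces each approach and the scenarios it suits.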
19.1 Overview of Concurrency Models
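| Model | Module | Characteristics | Typical use |
|---|---|---|---|
| Threads | threads | Each thread gets its own copy of the interpreter; data is shared only when declared through threads::shared, so threads are relatively heavyweight | CPU-bound work that needs shared state |
| Processes | fork | Separate memory, robust | Task isolation |
| Event-driven | IO::Async / AnyEvent | Single-threaded, non-blocking | I/O-bound work |
| Coroutines | Coro | Lightweight context switching | Many concurrent connections |
| Preforking | Parallel::Prefork | Process pool | Web servers |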
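The "separate memory" property listed for fork can be observed directly: a change made in the child never reaches the parent. The following is a minimal sketch using only core Perl; the helper name `value_after_child_change` and the variable `$value` are illustrative and not part of the chapter.

```perl
use strict;
use warnings;

# Flush STDOUT immediately so a forked child never inherits buffered output.
$| = 1;

# Illustrative helper: fork a child that modifies a variable, then
# return the value the parent still sees.
sub value_after_child_change {
    my $value = "parent";

    my $pid = fork();
    die "fork failed: $!\n" unless defined $pid;

    if ($pid == 0) {
        # Child: this assignment touches only the child's copy.
        $value = "child";
        exit(0);
    }

    waitpid($pid, 0);    # wait so the child has definitely run
    return $value;       # unchanged in the parent
}

print "parent sees: ", value_after_child_change(), "\n";
```

Because nothing flows back implicitly, a child has to hand results to the parent explicitly, for example through a pipe or its exit status.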
19.2 fork: Multiple Processes
use strict;
use warnings;
use POSIX ":sys_wait_h";

# Basic fork
my $pid = fork();
if (!defined $pid) {
    die "fork failed: $!\n";
} elsif ($pid == 0) {
    # Child process
    print "Child PID: $$\n";
    sleep 2;
    print "Child finished\n";
    exit(0);
} else {
    # Parent process
    print "Parent PID: $$, child PID: $pid\n";
    waitpid($pid, 0);
    print "Child has exited\n";
}
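The listing above loads POSIX's `:sys_wait_h` tag but then blocks in `waitpid($pid, 0)`. That tag provides `WNOHANG`, which lets the parent poll for the child and keep working in between. The sketch below assumes a hypothetical helper `run_and_poll`; the sleep intervals are arbitrary stand-ins for real work.

```perl
use strict;
use warnings;
use POSIX ":sys_wait_h";      # provides WNOHANG
use Time::HiRes qw(sleep);    # allows fractional sleep

$| = 1;    # avoid duplicated buffered output after fork

# Illustrative helper: start a child that exits with $code, poll it
# without blocking, and return (exit code, number of polls).
sub run_and_poll {
    my ($code) = @_;

    my $pid = fork();
    die "fork failed: $!\n" unless defined $pid;

    if ($pid == 0) {
        sleep 0.2;            # stand-in for real work
        exit($code);
    }

    my $polls = 0;
    while (1) {
        my $res = waitpid($pid, WNOHANG);
        if ($res == $pid) {
            # $? holds the wait status; the exit code is its high byte.
            return ($? >> 8, $polls);
        }
        die "waitpid failed: $!\n" if $res == -1;
        $polls++;             # child still running: parent may do other work here
        sleep 0.05;
    }
}

my ($exit_code, $polls) = run_and_poll(3);
print "child exit code: $exit_code after $polls polls\n";
```

With `WNOHANG`, `waitpid` returns 0 while the child is still running, the child's PID once it has been reaped, and -1 if there is no such child.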
Communicating Through a Pipe
use strict;
use warnings;

# Create the pipe
pipe(my $reader, my $writer) or die "pipe failed: $!\n";

my $pid = fork();
die "fork failed: $!\n" unless defined $pid;

if ($pid == 0) {
    # Child: write
    close $reader;
    print $writer "Hello from child!\n";
    close $writer;
    exit(0);
} else {
    # Parent: read
    close $writer;
    my $msg = <$reader>;
    chomp $msg;
    print "Received message: $msg\n";
    close $reader;
    waitpid($pid, 0);
}
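Pipes carry data; signals are another inter-process mechanism, suited to short notifications such as "please stop". The sketch below has the parent send `SIGTERM` to a child that installs a handler. The helper name `stop_child_with_sigterm` and the exit code 42 are arbitrary choices for illustration. A pipe is used only as a readiness handshake, so the signal cannot arrive before the handler is installed.

```perl
use strict;
use warnings;

$| = 1;    # avoid duplicated buffered output after fork

# Illustrative helper: fork a child that waits for SIGTERM, signal it
# from the parent, and return the child's exit code.
sub stop_child_with_sigterm {
    pipe(my $ready_r, my $ready_w) or die "pipe failed: $!\n";

    my $pid = fork();
    die "fork failed: $!\n" unless defined $pid;

    if ($pid == 0) {
        close $ready_r;
        # Install the handler first, then tell the parent we are ready.
        local $SIG{TERM} = sub { exit(42) };    # 42 is an arbitrary marker
        close $ready_w;                         # EOF on the pipe means "ready"
        sleep 1 while 1;                        # idle until the signal arrives
    }

    close $ready_w;
    my $eof = <$ready_r>;    # returns once the child has closed its end
    close $ready_r;

    kill 'TERM', $pid or die "kill failed: $!\n";
    waitpid($pid, 0);
    return $? >> 8;
}

print "child exited with code ", stop_child_with_sigterm(), "\n";
```

Perl runs signal handlers between operations, so calling `exit` inside the handler is an ordinary, orderly exit for the child.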
Processing Tasks in Parallel
use strict;
use warnings;

my @tasks = (1, 2, 3, 4, 5, 6, 7, 8);
my $max_workers = 4;
my @pids;

for my $task (@tasks) {
    # Wait for a free slot
    while (scalar @pids >= $max_workers) {
        my $finished = waitpid(-1, 0);
        @pids = grep { $_ != $finished } @pids;
    }

    my $pid = fork();
    # Without this check a failed fork (undef) would compare equal to 0
    # and the parent would run the child branch and exit.
    die "fork failed: $!\n" unless defined $pid;

    if ($pid == 0) {
        # Child handles the task
        print "Processing task $task (PID $$)\n";
        sleep(1 + int(rand(3)));
        print "Task $task finished\n";
        exit(0);
    } else {
        push @pids, $pid;
    }
}

# Wait for all remaining children; waitpid returns -1 when none are left
1 while waitpid(-1, 0) > 0;
print "All tasks finished\n";
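The workers above only print; they return nothing to the parent. One way to collect results is to give each child its own pipe, in the style of the pipe example earlier in this section. The sketch below assumes a hypothetical helper `parallel_map`, results that fit on a single line, and a task list small enough to start one child per task without a worker cap.

```perl
use strict;
use warnings;

$| = 1;    # avoid duplicated buffered output after fork

# Illustrative helper: run $work->($task) in one child per task and
# return a hash of task => result, read back over per-child pipes.
sub parallel_map {
    my ($work, @tasks) = @_;
    my %reader_for;    # pid => read end of that child's pipe
    my %task_for;      # pid => task handled by that child

    for my $task (@tasks) {
        pipe(my $reader, my $writer) or die "pipe failed: $!\n";
        my $pid = fork();
        die "fork failed: $!\n" unless defined $pid;

        if ($pid == 0) {
            close $reader;
            close $_ for values %reader_for;    # drop read ends inherited from earlier children
            print {$writer} $work->($task), "\n";
            close $writer;
            exit(0);
        }

        close $writer;
        $reader_for{$pid} = $reader;
        $task_for{$pid}   = $task;
    }

    my %result;
    for my $pid (keys %reader_for) {
        my $fh   = $reader_for{$pid};
        my $line = <$fh>;
        close $fh;
        waitpid($pid, 0);
        chomp $line if defined $line;
        $result{ $task_for{$pid} } = $line;
    }
    return %result;
}

my %squares = parallel_map(sub { $_[0] ** 2 }, 1 .. 4);
print "$_ squared is $squares{$_}\n" for sort { $a <=> $b } keys %squares;
```

For larger results or structured data, a serialization step would be needed on both ends of the pipe; that is outside this sketch.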
19.3 threads: Multithreading
use strict;
use warnings;
use threads;
use threads::shared;

# Shared variable
my $counter :shared = 0;

sub worker {
    my ($id) = @_;
    for (1..5) {
        {
            # The lock is released at the end of this block
            lock($counter);
            $counter++;
            print "Thread $id: counter = $counter\n";
        }
        threads->yield();
    }
}

# Create the threads
my @threads;
for my $i (1..4) {
    push @threads, threads->create(\&worker, $i);
}

# Wait for all threads
for my $t (@threads) {
    $t->join();
}
print "Final counter = $counter\n";
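The threads listing only runs on a perl built with thread support, which not every installation has. The sketch below checks the build configuration before loading `threads`, and also shows that a thread's return value comes back through `join`. The helper name `threads_available` is illustrative.

```perl
use strict;
use warnings;
use Config;

# Illustrative helper: true when this perl was built with ithreads.
sub threads_available {
    return $Config{useithreads} ? 1 : 0;
}

if (threads_available()) {
    require threads;
    # The thread's return value is delivered to whoever calls join().
    my $thr = threads->create(sub { return 6 * 7 });
    print "thread returned: ", $thr->join(), "\n";
} else {
    print "this perl has no thread support; consider fork instead\n";
}
```

Running `perl -V:useithreads` on the command line reports the same setting.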
Thread Safety
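use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;

# Use a queue for communication between threads
my $queue = Thread::Queue->new();

# Producer
my $producer = threads->create(sub {
    for my $i (1..10) {
        $queue->enqueue("task_$i");
        print "Produced: task_$i\n";
    }
    # end() (Thread::Queue 3.01 or later) makes dequeue() return undef in
    # every consumer once the queue is empty. A single enqueue(undef)
    # would stop only one of the three consumers and leave the others blocked.
    $queue->end();
});

# Consumers
my @consumers;
for my $id (1..3) {
    push @consumers, threads->create(sub {
        # Test with defined() so a false item such as "0" does not end the loop
        while (defined(my $item = $queue->dequeue())) {
            print "Consumer $id: processing $item\n";
            sleep 1;
        }
    });
}

$producer->join();
$_->join() for @consumers;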
19.4 AnyEvent: Event-Driven Programming
use strict;
use warnings;
use AnyEvent;
use AnyEvent::HTTP;

# Condition variable that drives the event loop
my $cv = AnyEvent->condvar;

# Timer: first fires after 1 second, then every 2 seconds
my $timer = AnyEvent->timer(
    after    => 1,
    interval => 2,
    cb       => sub { print "Runs every 2 seconds\n" },
);

# Concurrent HTTP requests
my @urls = (
    "https://httpbin.org/get",
    "https://httpbin.org/ip",
    "https://httpbin.org/uuid",
);

# Must be declared before the callbacks that close over it
my $count = scalar @urls;

for my $url (@urls) {
    http_get $url, sub {
        my ($body, $headers) = @_;
        print "Done: $url (Status: $headers->{Status})\n";
        $cv->send if --$count == 0;
    };
}

$cv->recv;
print "All requests finished\n";
19.5 IO::Async: A Modern Asynchronous Framework
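use strict;
use warnings;
use IO::Async::Loop;
use IO::Async::Timer::Periodic;
use IO::Async::Process;

my $loop = IO::Async::Loop->new;

# Periodic timer
my $timer = IO::Async::Timer::Periodic->new(
    interval => 3,
    on_tick  => sub { print "Timer tick: " . scalar(localtime) . "\n" },
);
$timer->start;
$loop->add($timer);

# Child process management
my $process = IO::Async::Process->new(
    command => ["ping", "-c", "3", "localhost"],
    stdout  => {
        on_read => sub {
            my ($stream, $buffref) = @_;
            # Consume complete lines from the read buffer
            while ($$buffref =~ s/^(.*\n)//) {
                print "PING: $1";
            }
            return 0;
        },
    },
    on_finish => sub {
        my ($self, $exitcode) = @_;
        # $exitcode is a wait status; the exit code is its high byte
        print "ping exited: " . ($exitcode >> 8) . "\n";
        $loop->stop;
    },
);
$loop->add($process);

$loop->run;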
19.6 Parallel::ForkManager: Process Pool
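use strict;
use warnings;
use Parallel::ForkManager;

my $pm = Parallel::ForkManager->new(4);    # at most 4 processes

# Callback run in the parent when a child finishes
$pm->run_on_finish(sub {
    my ($pid, $exit_code, $ident) = @_;
    print "Process $ident finished (exit: $exit_code)\n";
});

for my $task (1..20) {
    $pm->start($task) and next;    # parent continues with the next task

    # Child process
    print "Processing task $task (PID $$)\n";
    sleep int(rand(3)) + 1;
    # The first argument to finish() is the exit code, so pass 0 for
    # success rather than the task number
    $pm->finish(0);
}

$pm->wait_all_children;
print "All tasks finished\n";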
19.7 Practical Scenario: A Concurrent Web Crawler
#!/usr/bin/env perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::HTTP;
use Path::Tiny;

my @urls = (
    "https://httpbin.org/get",
    "https://httpbin.org/ip",
    "https://httpbin.org/user-agent",
    "https://httpbin.org/headers",
    "https://httpbin.org/uuid",
);

my $output_dir = path("crawl_results");
$output_dir->mkpath;

my $cv = AnyEvent->condvar;
my $completed = 0;
my $total = scalar @urls;

for my $url (@urls) {
    $cv->begin;    # one outstanding request more
    http_get $url, timeout => 10, sub {
        my ($body, $headers) = @_;
        $completed++;
        if ($headers->{Status} == 200) {
            my $filename = $url;
            $filename =~ s{.*/}{};
            # The body arrives as raw bytes, so write it without re-encoding
            $output_dir->child("$filename.json")->spew_raw($body);
            printf "[%d/%d] %s -> ok (%d bytes)\n",
                $completed, $total, $url, length($body);
        } else {
            printf "[%d/%d] %s -> failed (%s)\n",
                $completed, $total, $url, $headers->{Status};
        }
        $cv->end;    # recv returns once every begin has a matching end
    };
}

$cv->recv;
print "Crawl finished: $completed pages in total\n";
19.8 Choosing a Concurrency Model
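Need concurrency?
│
├── I/O-bound (network requests, file operations)
│   ├── Protocol level → IO::Async / AnyEvent
│   └── Application level → Mojo::IOLoop (built into Mojolicious)
│
├── CPU-bound (computation, data processing)
│   ├── Process isolation → Parallel::ForkManager
│   └── Shared memory → threads + Thread::Queue
│
└── Task queue
    ├── Simple → Parallel::ForkManager
    └── Complex → Minion (the Mojolicious job queue)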
Chapter Summary
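| Topic | Summary |
|---|---|
| fork | Multiple processes, separate memory, the Unix standard |
| threads | Multiple threads, shared variables, locking required |
| AnyEvent | Event-driven, lightweight |
| IO::Async | Modern asynchronous framework |
| Parallel::ForkManager | Process pool management |
| Thread::Queue | Thread-safe queue |
| Inter-process communication | Pipes, signals, shared memory |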
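Exercises
- Implement a parallel file downloader with fork
- Implement the producer-consumer model with threads + Thread::Queue
- Issue concurrent HTTP requests with AnyEvent
- Process 100 tasks with Parallel::ForkManager, limited to 8 concurrent processes
- Compare the performance of fork and threads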