PHP 完全指南 / 第 14 章 — 生成器
第 14 章 — 生成器:yield、协程与大数据处理
14.1 生成器基础
生成器是使用 yield 关键字的特殊函数,它能暂停和恢复执行,每次产生一个值而不终止函数。
<?php
// Ordinary function: builds and returns the complete data set in one go.
/**
 * Eagerly build the first $count even numbers (0, 2, 4, ...).
 *
 * The whole list is materialised before it is returned, so memory use grows
 * linearly with $count.
 *
 * @param int $count Number of values to produce; zero or less gives [].
 *
 * @return list<int>
 */
function getNumbers(int $count): array
{
    $evens = [];
    $index = 0;

    while ($index < $count) {
        $evens[] = 2 * $index;
        ++$index;
    }

    return $evens; // every element is held in memory at this point
}
// Generator: hands back one value per iteration instead of a full array.
/**
 * Lazily produce the first $count even numbers (0, 2, 4, ...).
 *
 * Execution is suspended at each yield and resumed when the consumer asks
 * for the next value.
 *
 * @param int $count Number of values to produce; zero or less yields nothing.
 *
 * @return Generator<int, int>
 */
function generateNumbers(int $count): Generator
{
    $value = 0;

    for ($remaining = $count; $remaining > 0; --$remaining) {
        yield $value; // emit one value, then pause until the next request
        $value += 2;
    }
}

// A generator is consumed with foreach exactly like an array.
foreach (generateNumbers(10) as $number) {
    echo $number, ' ';
}
// Output: 0 2 4 6 8 10 12 14 16 18
内存对比
<?php
// Eager array: all one million integers are allocated up front. On PHP 7.4+/8.x
// this costs tens of megabytes (the exact figure depends on the PHP version);
// the often-quoted 100MB+ numbers date back to PHP 5.
$baseline = memory_get_usage();
$arr = range(1, 1_000_000);
echo memory_get_usage() - $baseline, PHP_EOL; // tens of MB

// Release the array first; otherwise it would still be counted in the
// generator measurement below and hide the difference.
unset($arr);

/**
 * Lazy counterpart of range() for ascending integer sequences.
 *
 * @param int $start First value to yield.
 * @param int $end   Last value to yield (inclusive); nothing is yielded when $end < $start.
 *
 * @return Generator<int, int>
 */
function xrange(int $start, int $end): Generator
{
    for ($i = $start; $i <= $end; $i++) {
        yield $i;
    }
}

// Generator: measured while the generator object is still alive, so the
// figure is the real cost of iterating lazily.
$baseline = memory_get_usage();
$numbers = xrange(1, 1_000_000);
foreach ($numbers as $n) {
    // only the current value exists at any moment
}
echo memory_get_usage() - $baseline, PHP_EOL; // well under 1 KB: just the generator object
14.2 yield 的键值对
<?php
/**
 * Endless Fibonacci sequence keyed by position: 0 => 0, 1 => 1, 2 => 1, 3 => 2, ...
 *
 * The generator never finishes on its own; the consumer decides when to stop.
 *
 * @return Generator<int, int|float>
 */
function fibonacci(): Generator
{
    $current = 0;
    $upcoming = 1;

    for ($position = 0; ; $position++) {
        yield $position => $current;

        $following = $current + $upcoming;
        $current = $upcoming;
        $upcoming = $following;
    }
}

// Take only the first 10 terms, then leave the infinite generator behind.
$count = 0;
foreach (fibonacci() as $index => $value) {
    echo 'F(', $index, ') = ', $value, "\n";
    $count++;
    if ($count >= 10) {
        break;
    }
}
// F(0) = 0, F(1) = 1, F(2) = 1, F(3) = 2, ...
14.3 yield from — 委托生成器
<?php
// `yield from` delegates iteration to another iterable: an array or a generator.
/**
 * Mix directly yielded values with values delegated to arrays and to
 * another generator.
 *
 * Delegated values keep the keys of their source, so keys repeat across the
 * sequence; pass false as the second argument of iterator_to_array() when
 * collecting the values.
 */
function combined(): Generator
{
    yield 'A';

    $letters = ['B', 'C', 'D'];
    yield from $letters; // an array is unpacked element by element

    yield 'E';

    $digits = range(6, 8);
    yield from $digits; // range() returns an array, unpacked the same way

    $inner = innerGenerator();
    yield from $inner; // hand over to another generator until it is exhausted
}

/**
 * Two-element helper generator used as a delegation target by combined().
 */
function innerGenerator(): Generator
{
    yield from ['X', 'Y'];
}

foreach (combined() as $v) {
    echo $v, ' ';
}
// Output: A B C D E 6 7 8 X Y
递归生成器(树遍历)
<?php
/**
 * Flatten a nested tree into a lazy, pre-order sequence of node values.
 *
 * Every node is an array with a 'value' entry and an optional 'children'
 * list of nodes of the same shape. A parent's value is emitted before the
 * values of its descendants.
 *
 * Values delegated through `yield from` keep the keys of the nested
 * generator, so keys repeat; pass false as the second argument of
 * iterator_to_array() when collecting the result.
 *
 * @param array<int, array{value: mixed, children?: array}> $tree
 *
 * @return Generator<int, mixed>
 */
function flattenTree(array $tree): Generator
{
    foreach ($tree as $node) {
        // Emit the parent first: yielding it after the recursion would give
        // post-order (11 121 12 1 2) instead of the documented pre-order.
        yield $node['value'];

        if (isset($node['children'])) {
            yield from flattenTree($node['children']);
        }
    }
}

$tree = [
    ['value' => 1, 'children' => [
        ['value' => 11],
        ['value' => 12, 'children' => [
            ['value' => 121],
        ]],
    ]],
    ['value' => 2],
];

foreach (flattenTree($tree) as $v) {
    echo "{$v} ";
}
// Output: 1 11 12 121 2
14.4 生成器的双向通信
生成器不仅能产出值,还能接收外部传入的值。
14.4.1 send()
<?php
/**
 * Coroutine-style log sink: every value passed in through send() is echoed
 * as a line prefixed with the current time.
 *
 * The loop never ends by itself; the generator simply stays suspended at
 * the yield until the next send().
 */
function logger(): Generator
{
    for (;;) {
        $message = yield; // suspended here; resumes with the value given to send()
        printf("[LOG %s] %s\n", date('H:i:s'), $message);
    }
}

$log = logger();
$log->current(); // run up to the first yield so the generator is waiting for input
$log->send('User logged in');
$log->send('Order placed');
$log->send('Payment received');
14.4.2 getReturn()
<?php
/**
 * Yield the running total after each element, then return the grand total.
 *
 * The returned value is available through Generator::getReturn() once the
 * generator has been fully consumed.
 *
 * @param array<int|float> $numbers Values to add up; keys are ignored.
 *
 * @return Generator<int, int|float, mixed, int|float>
 */
function sum(array $numbers): Generator
{
    $running = 0;
    $values = array_values($numbers);

    for ($i = 0, $length = count($values); $i < $length; $i++) {
        yield ($running += $values[$i]); // partial sum after this element
    }

    return $running; // final result, read with getReturn()
}

$gen = sum([1, 2, 3, 4, 5]);
foreach ($gen as $partialSum) {
    echo $partialSum, ' ';
}
// Output: 1 3 6 10 15

// The return value can only be read after the generator has finished.
echo $gen->getReturn(); // 15
14.5 协程(Coroutines)
PHP 的生成器可以实现简单的协程模式,用于异步任务调度。
14.5.1 简单任务调度器
<?php
declare(strict_types=1);
/**
 * Minimal round-robin scheduler for generator-based coroutines.
 *
 * Each queued generator receives one time slice per round: it runs until its
 * next yield and is then moved to the back of the queue. Finished generators
 * are dropped.
 */
class SimpleScheduler
{
    /** @var SplQueue<Generator> Coroutines waiting for their next time slice. */
    private SplQueue $queue;

    /** @var SplObjectStorage<Generator, mixed> Coroutines that already received their first slice. */
    private SplObjectStorage $started;

    public function __construct()
    {
        $this->queue = new SplQueue();
        $this->started = new SplObjectStorage();
    }

    /**
     * Queue a coroutine; it does not start running until run() is called.
     */
    public function addTask(Generator $coroutine): void
    {
        $this->queue->enqueue($coroutine);
    }

    /**
     * Run all queued coroutines in round-robin order until every one has finished.
     */
    public function run(): void
    {
        while (!$this->queue->isEmpty()) {
            $coroutine = $this->queue->dequeue();

            if (isset($this->started[$coroutine])) {
                // Resume from the yield where the coroutine was suspended.
                $coroutine->next();
            } else {
                // First slice: current() runs a fresh generator up to its
                // first yield. Calling current() again would NOT advance it,
                // and a first next() would run two steps at once, so the
                // two cases have to be told apart.
                $this->started[$coroutine] = true;
                $coroutine->current();
            }

            if ($coroutine->valid()) {
                $this->queue->enqueue($coroutine); // not finished yet, back of the queue
            } else {
                unset($this->started[$coroutine]);
            }
        }
    }
}
// A task
/**
 * Coroutine that prints one line per step and gives up control after each.
 *
 * @param string $name  Label printed in front of every line.
 * @param int    $steps Number of steps to perform before printing "Done!".
 *
 * @return Generator<int, null>
 */
function task(string $name, int $steps): Generator
{
    $step = 0;

    while (++$step <= $steps) {
        printf("[%s] Step %d\n", $name, $step);
        yield; // pause here and hand control back to whoever drives the generator
    }

    echo '[', $name, "] Done!\n";
}
// Register three tasks of different lengths, then let the scheduler drive them.
$scheduler = new SimpleScheduler();
foreach (['Task-A' => 3, 'Task-B' => 2, 'Task-C' => 4] as $label => $stepCount) {
    $scheduler->addTask(task($label, $stepCount));
}
$scheduler->run();
// Intended output interleaves the tasks: Task-A Step 1, Task-B Step 1, Task-C Step 1, Task-A Step 2, ...
14.6 管道模式
<?php
declare(strict_types=1);
/**
 * Lazily read a text file line by line, yielding each line with surrounding
 * whitespace (including the line break) trimmed.
 *
 * The body of a generator only runs once iteration starts, so the file is
 * opened, and a failure reported, on the first iteration rather than at call
 * time. The handle is closed in `finally`, which also runs when the consumer
 * stops early and the generator is destroyed.
 *
 * @param string $file Path (or stream URL) to read.
 *
 * @return Generator<int, string>
 *
 * @throws RuntimeException If the file cannot be opened.
 */
function readLines(string $file): Generator
{
    $handle = fopen($file, 'r');
    if ($handle === false) {
        // Without this check fgets(false) would fail with an unrelated TypeError.
        throw new RuntimeException("Unable to open file for reading: {$file}");
    }

    try {
        while (($line = fgets($handle)) !== false) {
            yield trim($line);
        }
    } finally {
        fclose($handle);
    }
}
/**
 * Pipeline stage: drop empty lines and lines starting with '#' or '//'.
 *
 * Accepts any iterable (array, generator or other Traversable), so the stage
 * can be fed from something other than a generator as well.
 *
 * @param iterable<string> $lines Lines to filter; they are matched as given,
 *                                so leading whitespace is not skipped.
 *
 * @return Generator<int, string>
 */
function filterComments(iterable $lines): Generator
{
    foreach ($lines as $line) {
        if ($line === '') {
            continue;
        }

        if (str_starts_with($line, '#') || str_starts_with($line, '//')) {
            continue;
        }

        yield $line;
    }
}
/**
 * Pipeline stage: parse each CSV line into a list of string fields.
 *
 * Accepts any iterable of lines, not only generators.
 *
 * @param iterable<string> $lines One CSV record per line.
 *
 * @return Generator<int, list<string|null>>
 */
function parseCSV(iterable $lines): Generator
{
    foreach ($lines as $line) {
        // Separator, enclosure and escape are spelled out with their default
        // values: relying on the implicit escape default is deprecated as of
        // PHP 8.4, and passing it explicitly does not change the result.
        yield str_getcsv($line, ',', '"', '\\');
    }
}
/**
 * Pipeline stage: map positional CSV rows to associative user records.
 *
 * Rows with fewer than three fields are skipped; fields beyond the third are
 * ignored. Accepts any iterable of rows, not only generators.
 *
 * @param iterable<list<string|null>> $rows Rows as [name, email, age, ...].
 *
 * @return Generator<int, array{name: string|null, email: string|null, age: int}>
 */
function transform(iterable $rows): Generator
{
    foreach ($rows as $row) {
        if (count($row) < 3) {
            continue; // incomplete record
        }

        yield [
            'name' => $row[0],
            'email' => $row[1],
            'age' => (int) $row[2],
        ];
    }
}
// Assemble the pipeline stage by stage. Generators are lazy, so nothing is
// read from the file until the foreach below starts pulling records.
$lines = readLines('/tmp/data.csv');
$content = filterComments($lines);
$rows = parseCSV($content);
$pipeline = transform($rows);

foreach ($pipeline as $user) {
    print_r($user);
}
14.7 大数据处理实战
14.7.1 流式处理大文件
<?php
/**
 * Stream a CSV file with a header row as chunks of associative rows.
 *
 * The first line supplies the column names. Each following record is
 * combined with it, and records are handed out in lists of at most
 * $chunkSize entries so that only one chunk is held in memory at a time.
 * Blank lines are skipped. A file without a header yields nothing.
 *
 * The file is opened on the first iteration and closed in `finally`, so the
 * handle is released even when the consumer stops early.
 *
 * @param string $file      Path to the CSV file.
 * @param int    $chunkSize Maximum number of rows per chunk; must be >= 1.
 *
 * @return Generator<int, list<array<string, string|null>>>
 *
 * @throws InvalidArgumentException If $chunkSize is smaller than 1.
 * @throws RuntimeException         If the file cannot be opened.
 * @throws ValueError               If a record's field count differs from the header's.
 */
function readLargeCSV(string $file, int $chunkSize = 1000): Generator
{
    if ($chunkSize < 1) {
        throw new InvalidArgumentException("Chunk size must be at least 1, got {$chunkSize}");
    }

    $handle = fopen($file, 'r');
    if ($handle === false) {
        throw new RuntimeException("Unable to open file for reading: {$file}");
    }

    try {
        // The escape character is passed explicitly (same value as the
        // default) because relying on the default is deprecated in PHP 8.4.
        $header = fgetcsv($handle, null, ',', '"', '\\');
        if ($header === false) {
            return; // empty file: no header, no rows
        }

        $chunk = [];
        while (($row = fgetcsv($handle, null, ',', '"', '\\')) !== false) {
            if ($row === [null]) {
                continue; // fgetcsv() reports a blank line as [null]
            }

            $chunk[] = array_combine($header, $row);

            if (count($chunk) === $chunkSize) {
                yield $chunk;
                $chunk = [];
            }
        }

        if ($chunk !== []) {
            yield $chunk; // trailing partial chunk
        }
    } finally {
        fclose($handle);
    }
}
// Stream the file into the database, one transaction per chunk.
// $pdo is expected to be an open PDO connection created earlier in the script.

// The statement is identical for every chunk, so prepare it once up front.
$stmt = $pdo->prepare('INSERT INTO users (name, email, age) VALUES (?, ?, ?)');

foreach (readLargeCSV('/tmp/huge-data.csv', 500) as $chunk) {
    $pdo->beginTransaction();
    try {
        foreach ($chunk as $row) {
            $stmt->execute([$row['name'], $row['email'], $row['age']]);
        }
        $pdo->commit();
    } catch (Throwable $e) {
        // Undo the partially inserted chunk, then let the failure surface.
        if ($pdo->inTransaction()) {
            $pdo->rollBack();
        }
        throw $e;
    }
    echo "Inserted " . count($chunk) . " rows\n";
}
14.7.2 流式 JSON 输出
<?php
/**
 * Stream the items of a generator to the client as one JSON array.
 *
 * Items are encoded and echoed one at a time, so the complete payload never
 * has to exist in memory. Output is pushed to the client every $flushEvery
 * items and once more at the end.
 *
 * @param Generator $data       Source of JSON-encodable items.
 * @param int       $flushEvery Flush after this many items; values below 1
 *                              disable the intermediate flushes.
 *
 * @throws JsonException If an item cannot be encoded. Part of the array may
 *                       already have been sent at that point.
 */
function streamJsonResponse(Generator $data, int $flushEvery = 100): void
{
    $flushOutput = static function (): void {
        if (ob_get_level() > 0) {
            ob_flush();
        }
        flush();
    };

    header('Content-Type: application/json');
    echo '[';

    $sent = 0;
    foreach ($data as $item) {
        if ($sent > 0) {
            echo ',';
        }

        // Throw instead of echoing `false` (an empty string), which would
        // silently produce an invalid document.
        echo json_encode($item, JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR);
        $sent++;

        if ($flushEvery > 0 && $sent % $flushEvery === 0) {
            $flushOutput();
        }
    }

    echo ']';
    $flushOutput();
}
/**
 * Yield every row of the `users` table as an associative array.
 *
 * @param PDO|null $pdo Connection to query. When omitted, the script-level
 *                      $pdo variable is used so that existing zero-argument
 *                      calls keep working.
 *
 * @return Generator<int, array<string, mixed>>
 *
 * @throws RuntimeException If no PDO connection is available (raised on the
 *                          first iteration, when the generator body starts).
 */
function fetchUsers(?PDO $pdo = null): Generator
{
    // A variable named $pdo in the calling script is not visible inside a
    // function, so the connection must be passed in or fetched explicitly.
    $pdo ??= $GLOBALS['pdo'] ?? null;
    if (!$pdo instanceof PDO) {
        throw new RuntimeException('fetchUsers() requires a PDO connection.');
    }

    $stmt = $pdo->query('SELECT * FROM users');

    // PHP holds one fetched row at a time. NOTE(review): with pdo_mysql's
    // default buffered mode the driver still keeps the full result set on
    // the client; disable PDO::MYSQL_ATTR_USE_BUFFERED_QUERY for real streaming.
    while (($row = $stmt->fetch(PDO::FETCH_ASSOC)) !== false) {
        yield $row;
    }
}

// $pdo is expected to be an open PDO connection created earlier in the script.
streamJsonResponse(fetchUsers($pdo));
14.8 ArrayIterator 与生成器对比
| 特性 | Generator | ArrayIterator |
|---|---|---|
| 内存 | 按需产生,低 | 需要全部数据 |
| 只能遍历一次 | ✅ | ❌(可 rewind 后重复遍历) |
| count() | ❌ | ✅ |
| 随机访问 | ❌ | ✅ |
| 惰性求值 | ✅ | ❌ |
| 可缓存 | 需转换 | ✅ |
转换
<?php
// range() returns a plain array, not a generator, and an array cannot be
// called like a function. Wrap it in a closure that creates a fresh
// generator on every call.
$generator = static function (): Generator {
    yield from range(1, 10);
};

// Generator -> array. Passing false discards the generator's keys, which
// avoids overwriting entries when keys repeat (common with `yield from`).
$array = iterator_to_array($generator(), false);

// Counting consumes a generator, so a new one is created for it.
$count = iterator_count($generator());
14.9 业务场景:ETL 数据管道
<?php
declare(strict_types=1);
/**
 * Three-stage ETL pipeline (extract -> transform -> load) built from
 * generators, so rows flow through one at a time and only a single batch is
 * held in memory while loading.
 */
class ETLPipeline
{
    /**
     * Extract: stream a CSV file with a header row as associative rows.
     *
     * Blank lines are skipped and a file without a header yields nothing.
     * The file is opened on the first iteration and closed in `finally`, so
     * the handle is released even if a later stage stops early or throws.
     *
     * @param string $source Path to the CSV file.
     *
     * @return Generator<int, array<string, string|null>>
     *
     * @throws RuntimeException If the file cannot be opened.
     * @throws ValueError       If a record's field count differs from the header's.
     */
    public static function extract(string $source): Generator
    {
        $handle = fopen($source, 'r');
        if ($handle === false) {
            throw new RuntimeException("Unable to open source for reading: {$source}");
        }

        try {
            // The escape character is passed explicitly (same value as the
            // default) because relying on the default is deprecated in PHP 8.4.
            $header = fgetcsv($handle, null, ',', '"', '\\');
            if ($header === false) {
                return;
            }

            while (($row = fgetcsv($handle, null, ',', '"', '\\')) !== false) {
                if ($row === [null]) {
                    continue; // fgetcsv() reports a blank line as [null]
                }

                yield array_combine($header, $row);
            }
        } finally {
            fclose($handle);
        }
    }

    /**
     * Transform: clean each row, drop rows with an invalid e-mail address,
     * cast the age to int and stamp the row with the current time.
     *
     * The key of each source row is kept for the rows that pass.
     *
     * @param iterable<array<string, string|null>> $rows Rows with 'name', 'email' and 'age'.
     *
     * @return Generator<int|string, array<string, mixed>>
     */
    public static function transform(iterable $rows): Generator
    {
        foreach ($rows as $i => $row) {
            // Clean-up
            $row['email'] = strtolower(trim($row['email']));
            // NOTE(review): treats the incoming name as GBK-encoded — confirm against the data source.
            $row['name'] = mb_convert_encoding($row['name'], 'UTF-8', 'GBK');

            // Drop invalid records
            if (!filter_var($row['email'], FILTER_VALIDATE_EMAIL)) {
                continue;
            }

            // Type conversion
            $row['age'] = (int) $row['age'];
            $row['created_at'] = date('Y-m-d H:i:s');

            yield $i => $row;
        }
    }

    /**
     * Load: insert rows in batches and yield the running total after each batch.
     *
     * @param iterable<array<string, mixed>> $rows      Rows produced by transform().
     * @param PDO                            $db        Target connection.
     * @param int                            $batchSize Rows per transaction.
     *
     * @return Generator<int, int> Number of rows inserted so far.
     */
    public static function load(iterable $rows, PDO $db, int $batchSize = 500): Generator
    {
        $batch = [];
        $total = 0;

        foreach ($rows as $row) {
            $batch[] = $row;

            if (count($batch) >= $batchSize) {
                self::insertBatch($db, $batch);
                $total += count($batch);
                yield $total;
                $batch = [];
            }
        }

        // Flush the trailing partial batch.
        if ($batch !== []) {
            self::insertBatch($db, $batch);
            $total += count($batch);
            yield $total;
        }
    }

    /**
     * Insert one batch inside a single transaction; roll back if any row fails.
     *
     * @param list<array<string, mixed>> $batch
     */
    private static function insertBatch(PDO $db, array $batch): void
    {
        $db->beginTransaction();

        try {
            $stmt = $db->prepare(
                'INSERT INTO users (name, email, age, created_at) VALUES (?, ?, ?, ?)'
            );

            foreach ($batch as $row) {
                $stmt->execute([
                    $row['name'],
                    $row['email'],
                    $row['age'],
                    $row['created_at'],
                ]);
            }

            $db->commit();
        } catch (Throwable $e) {
            // Do not leave a half-written batch or an open transaction behind.
            if ($db->inTransaction()) {
                $db->rollBack();
            }
            throw $e;
        }
    }
}
// Usage: connect the three stages, then drive the pipeline with foreach.
// Each stage is a lazy generator, so no row is read or inserted until the
// loop starts pulling progress totals.
$db = new PDO('mysql:host=localhost;dbname=test', 'root', '');

$extracted = ETLPipeline::extract('/tmp/users.csv');
$cleaned = ETLPipeline::transform($extracted);
$pipeline = ETLPipeline::load($cleaned, $db, 1000);

foreach ($pipeline as $totalProcessed) {
    echo 'Processed: ', $totalProcessed, "\n";
}
14.10 扩展阅读
上一章:第 13 章 — Attributes 下一章:第 15 章 — Composer