强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧。
文档目录

PHP 完全指南 / 第 14 章 — 生成器

第 14 章 — 生成器:yield、协程与大数据处理

14.1 生成器基础

生成器是使用 yield 关键字的特殊函数,它能暂停和恢复执行,每次产生一个值而不终止函数。

<?php
// Regular function — builds and returns all the data in one go
/**
 * Returns the first $count even numbers (0, 2, 4, ...) as a list.
 *
 * @param int $count how many numbers to produce; zero or less gives an empty list
 * @return array the complete list, fully materialised
 */
function getNumbers(int $count): array
{
    $doubled = [];
    $i = 0;

    while ($i < $count) {
        $doubled[] = 2 * $i;
        $i++;
    }

    return $doubled;  // the whole list is held in memory
}

// Generator — hands back one value at a time
/**
 * Lazily produces the first $count even numbers (0, 2, 4, ...).
 *
 * @param int $count how many numbers to produce; zero or less produces nothing
 * @return Generator yields one int per iteration
 */
function generateNumbers(int $count): Generator
{
    $i = 0;

    while ($i < $count) {
        yield 2 * $i;  // emit one value, then pause until the next one is requested
        ++$i;
    }
}

// A generator is consumed exactly like an array
$numbers = generateNumbers(10);

foreach ($numbers as $number) {
    echo "{$number} ";
}
// 0 2 4 6 8 10 12 14 16 18

内存对比

<?php
// Eager array: one million integers are materialised at once.
// On 64-bit PHP 7/8 this costs tens of MB (the exact figure depends on the
// PHP version); the often-quoted ~120MB comes from PHP 5.
$arr = range(1, 1_000_000);
echo memory_get_usage(), "\n";

// Release the array first — otherwise it is still alive and would be counted
// in the second measurement below, hiding the generator's advantage.
unset($arr);

/**
 * Lazily yields every integer from $start to $end, both inclusive.
 *
 * Only the loop counter is kept between iterations, so memory use does not
 * grow with the size of the range. Nothing is yielded when $start > $end.
 *
 * @param int $start first value to yield
 * @param int $end   last value to yield
 * @return Generator yields one int per iteration
 */
function xrange(int $start, int $end): Generator
{
    for ($i = $start; $i <= $end; $i++) {
        yield $i;
    }
}

foreach (xrange(1, 1_000_000) as $n) {
    // only the current value is in memory at any time
}
echo memory_get_usage(), "\n";  // roughly the script's baseline again

14.2 yield 的键值对

<?php
/**
 * Infinite Fibonacci sequence.
 *
 * Each yield carries an explicit key: the zero-based position in the
 * sequence, paired with the Fibonacci number at that position.
 * The generator never finishes on its own; the consumer must stop iterating.
 *
 * @return Generator yields position => Fibonacci number
 */
function fibonacci(): Generator
{
    $current = 0;
    $upcoming = 1;

    for ($position = 0; ; $position++) {
        yield $position => $current;

        $following = $current + $upcoming;
        $current = $upcoming;
        $upcoming = $following;
    }
}

// Take only the first 10 terms — the generator itself never ends
$remaining = 10;
foreach (fibonacci() as $index => $value) {
    echo "F({$index}) = {$value}\n";

    $remaining--;
    if ($remaining <= 0) {
        break;
    }
}
// F(0) = 0, F(1) = 1, F(2) = 1, F(3) = 2, ...

14.3 yield from — 委托生成器

<?php
// yield from delegates generation to another iterable
/**
 * Demonstrates the three kinds of source `yield from` is used with here:
 * an array literal, the array returned by range(), and another generator.
 *
 * @return Generator yields 'A', 'B', 'C', 'D', 'E', 6, 7, 8, then whatever innerGenerator() yields
 */
function combined(): Generator
{
    yield 'A';
    yield from ['B', 'C', 'D'];  // expands an array
    yield 'E';
    yield from range(6, 8);       // expands the array returned by range()
    yield from innerGenerator();  // delegates to another generator
}

/**
 * Small generator used as a delegation target: yields 'X' and then 'Y'.
 *
 * @return Generator yields two strings
 */
function innerGenerator(): Generator
{
    foreach (['X', 'Y'] as $letter) {
        yield $letter;
    }
}

// Walk the combined sequence and print every element
$sequence = combined();

foreach ($sequence as $v) {
    echo "{$v} ";
}
// A B C D E 6 7 8 X Y

递归生成器(树遍历)

<?php
/**
 * Flattens a nested tree into a stream of node values in pre-order:
 * a node's own value is yielded first, then the values of its descendants.
 *
 * Keys are not unique across the stream, because `yield from` re-uses the
 * keys of the inner generator. Pass false as the second argument of
 * iterator_to_array() when collecting the result.
 *
 * @param array $tree list of nodes; each node has a 'value' entry and may have a 'children' list
 * @return Generator yields the node values
 */
function flattenTree(array $tree): Generator
{
    foreach ($tree as $node) {
        // Parent before children, so the output follows the nesting order
        yield $node['value'];

        if (isset($node['children'])) {
            yield from flattenTree($node['children']);
        }
    }
}

// Sample tree: every node has a 'value' and, optionally, a list of 'children'
$deepestNode = ['value' => 121];
$firstBranch = [
    'value' => 1,
    'children' => [
        ['value' => 11],
        ['value' => 12, 'children' => [$deepestNode]],
    ],
];
$secondBranch = ['value' => 2];

$tree = [$firstBranch, $secondBranch];

$flattened = flattenTree($tree);
foreach ($flattened as $v) {
    echo "{$v} ";
}
// 1 11 12 121 2

14.4 生成器的双向通信

生成器不仅能产出值,还能接收外部传入的值。

14.4.1 send()

<?php
/**
 * Endless log sink driven through Generator::send().
 *
 * Every value sent in is printed on its own line, prefixed with the
 * current time (H:i:s). The generator must be started (for example with
 * current()) before the first send(), so that it is paused at the yield.
 *
 * @return Generator receives log messages via send(); yields nothing useful
 */
function logger(): Generator
{
    for (;;) {
        // Pause here; the value passed to send() becomes the result of yield
        $message = yield;

        $timestamp = date('H:i:s');
        echo "[LOG " . $timestamp . "] {$message}\n";
    }
}

$log = logger();
$log->current();        // start the generator so it is paused at its first yield

$entries = ['User logged in', 'Order placed', 'Payment received'];
foreach ($entries as $entry) {
    $log->send($entry);
}

14.4.2 getReturn()

<?php
/**
 * Running total of a list of numbers.
 *
 * Yields the accumulated total after each element and, once the input is
 * exhausted, returns the final total (available through getReturn()).
 *
 * @param array $numbers values to add up
 * @return Generator yields each partial sum; its return value is the grand total
 */
function sum(array $numbers): Generator
{
    $running = 0;

    foreach ($numbers as $value) {
        $running = $running + $value;
        yield $running;  // expose the total accumulated so far
    }

    return $running;  // final result, read with getReturn()
}

$gen = sum([1, 2, 3, 4, 5]);

// Drive the generator by hand; this is what foreach does behind the scenes
for ($gen->rewind(); $gen->valid(); $gen->next()) {
    $partialSum = $gen->current();
    echo "{$partialSum} ";
}
// 1 3 6 10 15

// The return value can only be read once the generator has finished
echo $gen->getReturn();  // 15

14.5 协程(Coroutines)

PHP 的生成器可以实现简单的协程模式,用于异步任务调度。

14.5.1 简单任务调度器

<?php
declare(strict_types=1);

/**
 * Minimal round-robin scheduler for generator-based coroutines.
 *
 * Each queued coroutine gets one turn at a time: it runs until its next
 * yield and then goes to the back of the queue, until it finishes.
 */
class SimpleScheduler
{
    /** Coroutines waiting for their next turn, in FIFO order. */
    private SplQueue $queue;

    /**
     * Object ids of the coroutines that have already had their first turn.
     *
     * @var array<int, true>
     */
    private array $started = [];

    public function __construct()
    {
        $this->queue = new SplQueue();
    }

    /**
     * Adds a coroutine to the back of the queue. It is not run until run() is called.
     */
    public function addTask(Generator $coroutine): void
    {
        $this->queue->enqueue($coroutine);
    }

    /**
     * Runs all queued coroutines in turn until every one of them has finished.
     */
    public function run(): void
    {
        while (!$this->queue->isEmpty()) {
            $coroutine = $this->queue->dequeue();
            $id = spl_object_id($coroutine);

            if (isset($this->started[$id])) {
                // Resume from the yield the coroutine is paused at.
                // current() would NOT do this: on a started generator it only
                // re-reads the current value, so the task would never progress.
                $coroutine->next();
            } else {
                // First turn: current() starts the generator and runs it to its
                // first yield. Using next() here would skip that first step.
                $this->started[$id] = true;
                $coroutine->current();
            }

            if ($coroutine->valid()) {
                $this->queue->enqueue($coroutine);  // not finished yet, back of the queue
            } else {
                unset($this->started[$id]);
            }
        }
    }
}

// A task
/**
 * Cooperative task: prints one line per step and yields after each step,
 * then prints a final "Done!" line once all steps have run.
 *
 * @param string $name  label used in the printed lines
 * @param int    $steps number of steps to perform
 * @return Generator yields (no value) after every step
 */
function task(string $name, int $steps): Generator
{
    $i = 1;

    while ($i <= $steps) {
        echo "[{$name}] Step {$i}\n";
        yield;  // pause and hand control back to the caller
        $i++;
    }

    echo "[{$name}] Done!\n";
}

$scheduler = new SimpleScheduler();

// Task name => number of steps
$plan = ['Task-A' => 3, 'Task-B' => 2, 'Task-C' => 4];
foreach ($plan as $taskName => $stepCount) {
    $scheduler->addTask(task($taskName, $stepCount));
}

$scheduler->run();
// Interleaved execution: Task-A Step 1, Task-B Step 1, Task-C Step 1, Task-A Step 2, ...

14.6 管道模式

<?php
declare(strict_types=1);

/**
 * Lazily reads a text file line by line.
 *
 * Each line is yielded with surrounding whitespace (including the line
 * ending) trimmed. The file handle is closed when the generator finishes
 * or is destroyed early.
 *
 * Because this is a generator, the file is opened on first iteration — not
 * when the function is called — so the exception below surfaces at that point.
 *
 * @param string $file path of the file to read
 * @return Generator yields one trimmed line (string) per iteration
 * @throws RuntimeException when the file cannot be opened
 */
function readLines(string $file): Generator
{
    $handle = fopen($file, 'r');
    if ($handle === false) {
        // Without this check the failure would only show up as a TypeError
        // from fgets()/fclose() being handed `false`
        throw new RuntimeException("Unable to open file for reading: {$file}");
    }

    try {
        while (($line = fgets($handle)) !== false) {
            yield trim($line);
        }
    } finally {
        fclose($handle);
    }
}

/**
 * Drops empty lines and comment lines from a stream of lines.
 *
 * A line is treated as a comment when it starts with '#' or '//'.
 * Any iterable is accepted (array, generator, other Traversable).
 *
 * @param iterable $lines lines to filter; each item is expected to be a string
 * @return Generator yields the lines that are neither empty nor comments
 */
function filterComments(iterable $lines): Generator
{
    foreach ($lines as $line) {
        if ($line === '') {
            continue;
        }

        if (str_starts_with($line, '#') || str_starts_with($line, '//')) {
            continue;
        }

        yield $line;
    }
}

/**
 * Parses each incoming line as one CSV record.
 *
 * Any iterable is accepted (array, generator, other Traversable).
 *
 * @param iterable $lines CSV lines; each item is expected to be a string
 * @return Generator yields one list of fields per line
 */
function parseCSV(iterable $lines): Generator
{
    foreach ($lines as $line) {
        // The escape character is passed explicitly (same value as the old
        // default): relying on the implicit default is deprecated in PHP 8.4.
        yield str_getcsv($line, escape: '\\');
    }
}

/**
 * Maps raw CSV rows to user records.
 *
 * Rows with fewer than three fields are skipped. Extra fields beyond the
 * third are ignored. Any iterable is accepted (array, generator, other Traversable).
 *
 * @param iterable $rows rows as lists of fields: [name, email, age, ...]
 * @return Generator yields ['name' => ..., 'email' => ..., 'age' => int] per valid row
 */
function transform(iterable $rows): Generator
{
    foreach ($rows as $row) {
        if (count($row) < 3) {
            continue;  // incomplete record
        }

        yield [
            'name'  => $row[0],
            'email' => $row[1],
            'age'   => (int) $row[2],
        ];
    }
}

// Assemble the pipeline stage by stage; nothing is read until the loop below pulls from it
$rawLines = readLines('/tmp/data.csv');
$dataLines = filterComments($rawLines);
$records = parseCSV($dataLines);
$pipeline = transform($records);

foreach ($pipeline as $user) {
    print_r($user);
}

14.7 大数据处理实战

14.7.1 流式处理大文件

<?php
/**
 * Streams a CSV file in chunks of associative rows.
 *
 * The first line is used as the header; every following row is combined
 * with it into an array keyed by column name. Blank lines are skipped.
 * The last chunk may hold fewer than $chunkSize rows. An empty file yields nothing.
 *
 * Because this is a generator, validation and file opening happen on first
 * iteration, not when the function is called.
 *
 * @param string $file      path of the CSV file
 * @param int    $chunkSize maximum number of rows per yielded chunk (at least 1)
 * @return Generator yields lists of rows; each row maps column name => field value
 * @throws InvalidArgumentException when $chunkSize is smaller than 1
 * @throws RuntimeException         when the file cannot be opened
 * @throws ValueError               from array_combine() when a row's field count differs from the header's
 */
function readLargeCSV(string $file, int $chunkSize = 1000): Generator
{
    if ($chunkSize < 1) {
        throw new InvalidArgumentException("Chunk size must be at least 1, got {$chunkSize}");
    }

    $handle = fopen($file, 'r');
    if ($handle === false) {
        throw new RuntimeException("Unable to open CSV file: {$file}");
    }

    try {
        // The escape character is passed explicitly (same value as the old
        // default): relying on the implicit default is deprecated in PHP 8.4.
        $header = fgetcsv($handle, escape: '\\');  // header row
        if ($header === false) {
            return;  // empty file: nothing to yield
        }

        $chunk = [];

        while (($row = fgetcsv($handle, escape: '\\')) !== false) {
            if ($row === [null]) {
                continue;  // fgetcsv() reports a blank line as a single null field
            }

            $chunk[] = array_combine($header, $row);

            if (count($chunk) === $chunkSize) {
                yield $chunk;
                $chunk = [];
            }
        }

        if ($chunk !== []) {
            yield $chunk;  // remaining rows that did not fill a whole chunk
        }
    } finally {
        // Also runs when the consumer stops iterating early
        fclose($handle);
    }
}

// Stream the file into the database, one transaction per chunk.
// $pdo is expected to be an open PDO connection created earlier in the script.

// Prepare once: the statement is the same for every chunk and every row
$stmt = $pdo->prepare('INSERT INTO users (name, email, age) VALUES (?, ?, ?)');

foreach (readLargeCSV('/tmp/huge-data.csv', 500) as $chunk) {
    $pdo->beginTransaction();

    try {
        foreach ($chunk as $row) {
            $stmt->execute([$row['name'], $row['email'], $row['age']]);
        }

        $pdo->commit();
    } catch (Throwable $e) {
        // Undo the partially written chunk so the connection is not left
        // inside an open transaction, then let the error propagate
        $pdo->rollBack();
        throw $e;
    }

    echo "Inserted " . count($chunk) . " rows\n";
}

14.7.2 流式 JSON 输出

<?php
/**
 * Streams the items of a generator to the client as one JSON array.
 *
 * Items are encoded and echoed one by one, so the full result never has to
 * be held in memory. Output is flushed after every $flushEvery items and
 * once more after the closing bracket.
 *
 * @param Generator $data       source of items; each item must be JSON-encodable
 * @param int       $flushEvery number of items between flushes (values below 1 are treated as 1)
 * @throws JsonException when an item cannot be encoded; the response is then left incomplete
 */
function streamJsonResponse(Generator $data, int $flushEvery = 100): void
{
    $flushEvery = max(1, $flushEvery);

    // Push whatever has been echoed so far towards the client
    $flushOutput = static function (): void {
        if (ob_get_level()) {
            ob_flush();
        }
        flush();
    };

    header('Content-Type: application/json');
    echo '[';

    $sent = 0;

    foreach ($data as $item) {
        if ($sent > 0) {
            echo ',';
        }

        // Throw on failure: json_encode() would otherwise return false, which
        // echoes as nothing and silently corrupts the array
        echo json_encode($item, JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR);
        $sent++;

        if ($sent % $flushEvery === 0) {
            $flushOutput();
        }
    }

    echo ']';
    $flushOutput();
}

/**
 * Streams the rows of the users table one at a time.
 *
 * The connection is passed in explicitly: a function body cannot see the
 * script's global $pdo variable.
 *
 * NOTE: PHP only holds one row at a time, but a driver may still buffer the
 * whole result set on the client side (pdo_mysql does so by default; set
 * PDO::MYSQL_ATTR_USE_BUFFERED_QUERY to false for true streaming).
 *
 * @param PDO $pdo open database connection
 * @return Generator yields one associative array per row
 */
function fetchUsers(PDO $pdo): Generator
{
    $stmt = $pdo->query('SELECT * FROM users');

    // fetch() returns false once there are no more rows
    while (($row = $stmt->fetch(PDO::FETCH_ASSOC)) !== false) {
        yield $row;
    }
}

// $pdo is expected to be an open PDO connection created earlier in the script
streamJsonResponse(fetchUsers($pdo));

14.8 ArrayIterator 与生成器对比

| 特性 | Generator | ArrayIterator |
| --- | --- | --- |
| 内存 | 按需产生,低 | 需要全部数据 |
| 只遍历一次 | ✅ | ❌ |
| count() | ❌ | ✅ |
| 随机访问 | ❌ | ✅ |
| 惰性求值 | ✅ | ❌ |
| 可缓存 | 需转换 | ✅ |

转换

<?php
// A generator factory: every call returns a fresh Generator.
// (range() on its own returns a plain array, which is neither callable nor a generator.)
$generator = static function (): Generator {
    yield from range(1, 10);
};

// Generator to array.
// With the default preserve_keys = true, items that share a key overwrite
// each other; pass false as the second argument when keys may repeat.
$array = iterator_to_array($generator());

// Counting consumes the generator, so a fresh one is created for it
$count = iterator_count($generator());

14.9 业务场景:ETL 数据管道

<?php
declare(strict_types=1);

/**
 * Three-stage ETL pipeline built from generators: extract rows from a CSV
 * file, clean them, and load them into the users table in batches.
 *
 * All stages are lazy: nothing is read or written until the generator
 * returned by load() is iterated.
 */
class ETLPipeline
{
    /**
     * Extract: streams a CSV file as associative rows keyed by the header line.
     *
     * Blank lines are skipped; an empty file yields nothing.
     *
     * @param string $source path of the CSV file
     * @return Generator yields one row (column name => field value) per record
     * @throws RuntimeException when the file cannot be opened
     * @throws ValueError       from array_combine() when a row's field count differs from the header's
     */
    public static function extract(string $source): Generator
    {
        $handle = fopen($source, 'r');
        if ($handle === false) {
            throw new RuntimeException("Unable to open source file: {$source}");
        }

        try {
            // The escape character is passed explicitly (same value as the old
            // default): relying on the implicit default is deprecated in PHP 8.4.
            $header = fgetcsv($handle, escape: '\\');
            if ($header === false) {
                return;  // empty file
            }

            while (($row = fgetcsv($handle, escape: '\\')) !== false) {
                if ($row === [null]) {
                    continue;  // fgetcsv() reports a blank line as a single null field
                }

                yield array_combine($header, $row);
            }
        } finally {
            // Also runs when the consumer stops iterating early
            fclose($handle);
        }
    }

    /**
     * Transform: cleans each row and drops rows with an invalid e-mail address.
     *
     * The e-mail is trimmed and lower-cased, the name is converted from GBK
     * to UTF-8, the age is cast to int, and a created_at timestamp is added.
     * Keys of the incoming rows are preserved.
     *
     * @param iterable $rows rows with at least 'name', 'email' and 'age' entries
     * @return Generator yields the cleaned rows under their original keys
     */
    public static function transform(iterable $rows): Generator
    {
        foreach ($rows as $i => $row) {
            // Data cleaning
            $row['email'] = strtolower(trim($row['email']));
            $row['name']  = mb_convert_encoding($row['name'], 'UTF-8', 'GBK');

            // Drop invalid records
            if (!filter_var($row['email'], FILTER_VALIDATE_EMAIL)) {
                continue;
            }

            // Type conversion
            $row['age'] = (int) $row['age'];
            $row['created_at'] = date('Y-m-d H:i:s');

            yield $i => $row;
        }
    }

    /**
     * Load: inserts the rows in batches and reports progress.
     *
     * @param iterable $rows      cleaned rows, as produced by transform()
     * @param PDO      $db        open database connection
     * @param int      $batchSize number of rows per insert transaction
     * @return Generator yields the running total of inserted rows after every batch
     */
    public static function load(iterable $rows, PDO $db, int $batchSize = 500): Generator
    {
        $batch = [];
        $total = 0;

        foreach ($rows as $row) {
            $batch[] = $row;

            if (count($batch) >= $batchSize) {
                self::insertBatch($db, $batch);
                $total += count($batch);
                yield $total;
                $batch = [];
            }
        }

        // Rows left over after the last full batch
        if ($batch !== []) {
            self::insertBatch($db, $batch);
            $total += count($batch);
            yield $total;
        }
    }

    /**
     * Inserts one batch of rows inside a single transaction.
     *
     * If any insert fails, the whole batch is rolled back and the error is rethrown.
     *
     * @param PDO   $db    open database connection
     * @param array $batch rows with 'name', 'email', 'age' and 'created_at' entries
     */
    private static function insertBatch(PDO $db, array $batch): void
    {
        $db->beginTransaction();

        try {
            $stmt = $db->prepare(
                'INSERT INTO users (name, email, age, created_at) VALUES (?, ?, ?, ?)'
            );

            foreach ($batch as $row) {
                $stmt->execute([
                    $row['name'],
                    $row['email'],
                    $row['age'],
                    $row['created_at'],
                ]);
            }

            $db->commit();
        } catch (Throwable $e) {
            // Do not leave a half-written batch or an open transaction behind
            $db->rollBack();
            throw $e;
        }
    }
}

// Usage
$db = new PDO('mysql:host=localhost;dbname=test', 'root', '');

// Wire the three stages together; nothing runs until the loop below pulls from the pipeline
$extracted = ETLPipeline::extract('/tmp/users.csv');
$cleaned = ETLPipeline::transform($extracted);
$pipeline = ETLPipeline::load($cleaned, $db, 1000);

foreach ($pipeline as $totalProcessed) {
    echo "Processed: {$totalProcessed}\n";
}

14.10 扩展阅读


上一章:第 13 章 — Attributes | 下一章:第 15 章 — Composer