强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

异步与协程精讲 / 第7章:Go 协程 —— CSP 模型的典范

第7章:Go 协程 —— CSP 模型的典范

7.1 Go 的并发哲学

Go 语言的并发模型基于 CSP(Communicating Sequential Processes,通信顺序进程),由 Tony Hoare 在 1978 年提出。

Go 的格言“Don’t communicate by sharing memory; share memory by communicating.” (不要通过共享内存来通信,而要通过通信来共享内存。)

CSP vs Actor 模型

特性 CSP (Go) Actor (Erlang)
通信方式 Channel(通道) 消息传递(Mailbox)
发送者 必须知道 Channel 直接发送到 PID
接收者 从 Channel 读取 从 Mailbox 读取
耦合度 较低(通过 Channel 解耦) 较低(通过 PID 解耦)
选择机制 select 语句 Pattern Matching

7.2 Goroutine 基础

创建 Goroutine

package main

import (
    "fmt"
    "time"
)

func sayHello(name string) {
    for i := 0; i < 3; i++ {
        fmt.Printf("[%s] 你好 %d\n", name, i)
        time.Sleep(100 * time.Millisecond)
    }
}

func main() {
    // 使用 go 关键字启动 goroutine
    go sayHello("协程A")
    go sayHello("协程B")
    go func() {
        fmt.Println("[匿名协程] 执行")
    }()

    time.Sleep(1 * time.Second) // 等待所有 goroutine 完成
}

Goroutine vs OS 线程

特性 Goroutine OS 线程
初始栈大小 2KB(可动态增长) 2MB(固定)
创建开销 ~0.3μs ~30μs
切换开销 ~100ns ~1-10μs
最大数量 数百万 数千
调度 Go 运行时(GMP) OS 内核

WaitGroup — 等待所有 Goroutine

import "sync"

func processItems(items []int) {
    var wg sync.WaitGroup

    for _, item := range items {
        wg.Add(1) // 重要:在 goroutine 外面 Add
        go func(item int) {
            defer wg.Done()
            process(item)
        }(item) // 重要:传入参数,避免闭包陷阱
    }

    wg.Wait() // 阻塞直到所有 goroutine 完成
}

闭包陷阱

// ❌ 常见错误:循环变量捕获
for i := 0; i < 5; i++ {
    go func() {
        fmt.Println(i) // 可能全部输出 5
    }()
}

// ✅ 正确:通过参数传递
for i := 0; i < 5; i++ {
    go func(n int) {
        fmt.Println(n)
    }(i)
}

// ✅ Go 1.22+:循环变量每次迭代都是新变量
// 不再需要额外处理

7.3 Channel — Go 的通信管道

基本用法

// 无缓冲 Channel(同步通信)
ch := make(chan int)

// 带缓冲 Channel(异步通信)
ch := make(chan int, 10)

// 发送
ch <- 42

// 接收
value := <-ch

// 关闭 Channel
close(ch)

Channel 类型对比

类型 创建方式 发送行为 接收行为 用途
无缓冲 make(chan T) 阻塞直到有接收者 阻塞直到有发送者 同步协调
有缓冲 make(chan T, n) 阻塞直到缓冲区满 阻塞直到缓冲区有数据 异步解耦
只发送 chan<- T - - 类型安全
只接收 <-chan T - - 类型安全

Channel 使用模式

生产者-消费者:

func producer(ch chan<- int) {
    for i := 0; i < 100; i++ {
        ch <- i
    }
    close(ch)
}

func consumer(ch <-chan int) {
    for v := range ch {
        fmt.Printf("处理: %d\n", v)
    }
    // range 会在 Channel 关闭后自动退出
}

func main() {
    ch := make(chan int, 10)
    go producer(ch)
    consumer(ch) // 在主 goroutine 中消费
}

Fan-out Fan-in(扇出扇入):

func fanOut(input <-chan int, workers int) []<-chan int {
    channels := make([]<-chan int, workers)
    for i := 0; i < workers; i++ {
        channels[i] = worker(input)
    }
    return channels
}

func fanIn(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    merged := make(chan int)

    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                merged <- v
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

7.4 Select 语句

select 语句用于同时等待多个 Channel 操作。

基本用法

select {
case msg := <-ch1:
    fmt.Println("从 ch1 收到:", msg)
case msg := <-ch2:
    fmt.Println("从 ch2 收到:", msg)
case <-time.After(3 * time.Second):
    fmt.Println("超时")
default:
    fmt.Println("没有就绪的 Channel")
}

Select 的行为规则

规则 说明
随机选择 多个 case 同时就绪时,随机选择一个执行
阻塞等待 如果没有 default 且没有 case 就绪,select 阻塞
非阻塞 default 时,没有 case 就绪就执行 default
空 select select{} 永远阻塞

实用模式

超时控制:

func fetchWithTimeout(url string, timeout time.Duration) ([]byte, error) {
    ch := make(chan []byte, 1)
    errCh := make(chan error, 1)

    go func() {
        resp, err := http.Get(url)
        if err != nil {
            errCh <- err
            return
        }
        defer resp.Body.Close()
        body, _ := io.ReadAll(resp.Body)
        ch <- body
    }()

    select {
    case body := <-ch:
        return body, nil
    case err := <-errCh:
        return nil, err
    case <-time.After(timeout):
        return nil, fmt.Errorf("请求超时 (%v)", timeout)
    }
}

心跳机制:

func worker(done chan<- struct{}) {
    ticker := time.NewTicker(500 * time.Millisecond)
    defer ticker.Stop()

    heartbeat := time.NewTicker(2 * time.Second)
    defer heartbeat.Stop()

    for {
        select {
        case <-ticker.C:
            // 执行工作
            doWork()
        case <-heartbeat.C:
            // 发送心跳
            reportHealth()
        case <-done:
            // 收到停止信号
            return
        }
    }
}

非阻塞操作:

select {
case ch <- value:
    fmt.Println("发送成功")
default:
    fmt.Println("Channel 已满,丢弃")
}

7.5 Context — 取消与超时传播

func fetchUserData(ctx context.Context, userID string) (*User, error) {
    // 创建子 context,5 秒超时
    ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
    defer cancel()

    // 传递 context 给下游调用
    userCh := make(chan *User, 1)
    errCh := make(chan error, 1)

    go func() {
        user, err := db.QueryUser(ctx, userID)
        if err != nil {
            errCh <- err
            return
        }
        userCh <- user
    }()

    select {
    case user := <-userCh:
        return user, nil
    case err := <-errCh:
        return nil, err
    case <-ctx.Done():
        return nil, ctx.Err() // context.Canceled 或 context.DeadlineExceeded
    }
}

Context 类型

函数 用途 取消原因
context.Background() 根 context 永不取消
context.TODO() 占位符 -
context.WithCancel(parent) 手动取消 调用 cancel()
context.WithTimeout(parent, d) 超时取消 超时或调用 cancel()
context.WithDeadline(parent, t) 截止时间取消 到期或调用 cancel()
context.WithValue(parent, k, v) 传递值 继承父 context

7.6 GMP 调度模型深度剖析

组件定义

组件 全称 说明
G Goroutine 用户级协程,包含栈、状态、PC
M Machine OS 线程,执行 Go 代码
P Processor 逻辑处理器,持有本地运行队列

调度流程

              全局运行队列
           ┌──────────────┐
           │ G  G  G  G   │
           └──────┬───────┘
                  │
    ┌─────────────┼─────────────┐
    │             │             │
    ▼             ▼             ▼
┌────────┐  ┌────────┐  ┌────────┐
│  P0    │  │  P1    │  │  P2    │
│[G][G]  │  │[G]     │  │[G][G]  │
│本地队列 │  │本地队列 │  │本地队列 │
└───┬────┘  └───┬────┘  └───┬────┘
    │           │           │
    ▼           ▼           ▼
┌────────┐  ┌────────┐  ┌────────┐
│  M0    │  │  M1    │  │  M2    │
│(线程)   │  │(线程)   │  │(线程)   │
└────────┘  └────────┘  └────────┘

调度策略

策略 描述
本地优先 P 优先从本地队列取 G
窃取机制 空闲的 P 从其他 P 的本地队列"偷"一半 G
全局队列 定期检查全局队列,防止饥饿
网络轮询 G 进入 I/O 时,M 绑定到 netpoller 而非阻塞
异步抢占 Go 1.14+,基于信号的抢占式调度

工作窃取(Work Stealing)

P0 空闲了,从 P1 偷一半任务:

Before:
P0: [空]          P1: [G1, G2, G3, G4]

After:
P0: [G3, G4]     P1: [G1, G2]

7.7 业务场景:并发爬虫

func crawl(startURL string, maxDepth int) []string {
    visited := &sync.Map{}
    results := make(chan string, 100)
    var wg sync.WaitGroup

    var crawlFunc func(url string, depth int)
    crawlFunc = func(url string, depth int) {
        defer wg.Done()
        if depth > maxDepth {
            return
        }
        if _, loaded := visited.LoadOrStore(url, true); loaded {
            return // 已访问
        }

        links, err := fetchLinks(url)
        if err != nil {
            return
        }
        results <- url

        for _, link := range links {
            wg.Add(1)
            go crawlFunc(link, depth+1)
        }
    }

    wg.Add(1)
    go crawlFunc(startURL, 0)

    // 等待完成后关闭 results
    go func() {
        wg.Wait()
        close(results)
    }()

    // 收集结果
    var urls []string
    for url := range results {
        urls = append(urls, url)
    }
    return urls
}

7.8 本章小结

要点 说明
CSP 模型 “通过通信共享内存"而非"通过共享内存通信”
goroutine 2KB 初始栈,数百万级并发
Channel 类型安全的通信管道,支持缓冲
select 多路复用,支持超时和非阻塞
Context 取消传播和超时控制
GMP 模型 N:M 调度,工作窃取,异步抢占

下一章预告:Python 的 asyncio 是如何从生成器演进到原生协程的?


扩展阅读