强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

第7章:Go 协程 —— CSP 模型的典范

第7章:Go 协程 —— CSP 模型的典范

7.1 Go 的并发哲学

Go 语言的并发模型基于 CSP(Communicating Sequential Processes,通信顺序进程),由 Tony Hoare 在 1978 年提出。

Go 的格言“Don’t communicate by sharing memory; share memory by communicating.” (不要通过共享内存来通信,而要通过通信来共享内存。)

CSP vs Actor 模型

特性CSP (Go)Actor (Erlang)
通信方式Channel(通道)消息传递(Mailbox)
发送者必须知道 Channel直接发送到 PID
接收者从 Channel 读取从 Mailbox 读取
耦合度较低(通过 Channel 解耦)较低(通过 PID 解耦)
选择机制select 语句Pattern Matching

7.2 Goroutine 基础

创建 Goroutine

package main

import (
    "fmt"
    "time"
)

func sayHello(name string) {
    for i := 0; i < 3; i++ {
        fmt.Printf("[%s] 你好 %d\n", name, i)
        time.Sleep(100 * time.Millisecond)
    }
}

func main() {
    // 使用 go 关键字启动 goroutine
    go sayHello("协程A")
    go sayHello("协程B")
    go func() {
        fmt.Println("[匿名协程] 执行")
    }()

    time.Sleep(1 * time.Second) // 等待所有 goroutine 完成
}

Goroutine vs OS 线程

特性GoroutineOS 线程
初始栈大小2KB(可动态增长)2MB(固定)
创建开销~0.3μs~30μs
切换开销~100ns~1-10μs
最大数量数百万数千
调度Go 运行时(GMP)OS 内核

WaitGroup — 等待所有 Goroutine

import "sync"

func processItems(items []int) {
    var wg sync.WaitGroup

    for _, item := range items {
        wg.Add(1) // 重要:在 goroutine 外面 Add
        go func(item int) {
            defer wg.Done()
            process(item)
        }(item) // 重要:传入参数,避免闭包陷阱
    }

    wg.Wait() // 阻塞直到所有 goroutine 完成
}

闭包陷阱

// ❌ 常见错误:循环变量捕获
for i := 0; i < 5; i++ {
    go func() {
        fmt.Println(i) // 可能全部输出 5
    }()
}

// ✅ 正确:通过参数传递
for i := 0; i < 5; i++ {
    go func(n int) {
        fmt.Println(n)
    }(i)
}

// ✅ Go 1.22+:循环变量每次迭代都是新变量
// 不再需要额外处理

7.3 Channel — Go 的通信管道

基本用法

// 无缓冲 Channel(同步通信)
ch := make(chan int)

// 带缓冲 Channel(异步通信)
ch := make(chan int, 10)

// 发送
ch <- 42

// 接收
value := <-ch

// 关闭 Channel
close(ch)

Channel 类型对比

类型创建方式发送行为接收行为用途
无缓冲make(chan T)阻塞直到有接收者阻塞直到有发送者同步协调
有缓冲make(chan T, n)阻塞直到缓冲区满阻塞直到缓冲区有数据异步解耦
只发送chan<- T--类型安全
只接收<-chan T--类型安全

Channel 使用模式

生产者-消费者:

func producer(ch chan<- int) {
    for i := 0; i < 100; i++ {
        ch <- i
    }
    close(ch)
}

func consumer(ch <-chan int) {
    for v := range ch {
        fmt.Printf("处理: %d\n", v)
    }
    // range 会在 Channel 关闭后自动退出
}

func main() {
    ch := make(chan int, 10)
    go producer(ch)
    consumer(ch) // 在主 goroutine 中消费
}

Fan-out Fan-in(扇出扇入):

func fanOut(input <-chan int, workers int) []<-chan int {
    channels := make([]<-chan int, workers)
    for i := 0; i < workers; i++ {
        channels[i] = worker(input)
    }
    return channels
}

func fanIn(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    merged := make(chan int)

    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                merged <- v
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

7.4 Select 语句

select 语句用于同时等待多个 Channel 操作。

基本用法

select {
case msg := <-ch1:
    fmt.Println("从 ch1 收到:", msg)
case msg := <-ch2:
    fmt.Println("从 ch2 收到:", msg)
case <-time.After(3 * time.Second):
    fmt.Println("超时")
default:
    fmt.Println("没有就绪的 Channel")
}

Select 的行为规则

规则说明
随机选择多个 case 同时就绪时,随机选择一个执行
阻塞等待如果没有 default 且没有 case 就绪,select 阻塞
非阻塞default 时,没有 case 就绪就执行 default
空 selectselect{} 永远阻塞

实用模式

超时控制:

func fetchWithTimeout(url string, timeout time.Duration) ([]byte, error) {
    ch := make(chan []byte, 1)
    errCh := make(chan error, 1)

    go func() {
        resp, err := http.Get(url)
        if err != nil {
            errCh <- err
            return
        }
        defer resp.Body.Close()
        body, _ := io.ReadAll(resp.Body)
        ch <- body
    }()

    select {
    case body := <-ch:
        return body, nil
    case err := <-errCh:
        return nil, err
    case <-time.After(timeout):
        return nil, fmt.Errorf("请求超时 (%v)", timeout)
    }
}

心跳机制:

func worker(done chan<- struct{}) {
    ticker := time.NewTicker(500 * time.Millisecond)
    defer ticker.Stop()

    heartbeat := time.NewTicker(2 * time.Second)
    defer heartbeat.Stop()

    for {
        select {
        case <-ticker.C:
            // 执行工作
            doWork()
        case <-heartbeat.C:
            // 发送心跳
            reportHealth()
        case <-done:
            // 收到停止信号
            return
        }
    }
}

非阻塞操作:

select {
case ch <- value:
    fmt.Println("发送成功")
default:
    fmt.Println("Channel 已满,丢弃")
}

7.5 Context — 取消与超时传播

func fetchUserData(ctx context.Context, userID string) (*User, error) {
    // 创建子 context,5 秒超时
    ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
    defer cancel()

    // 传递 context 给下游调用
    userCh := make(chan *User, 1)
    errCh := make(chan error, 1)

    go func() {
        user, err := db.QueryUser(ctx, userID)
        if err != nil {
            errCh <- err
            return
        }
        userCh <- user
    }()

    select {
    case user := <-userCh:
        return user, nil
    case err := <-errCh:
        return nil, err
    case <-ctx.Done():
        return nil, ctx.Err() // context.Canceled 或 context.DeadlineExceeded
    }
}

Context 类型

函数用途取消原因
context.Background()根 context永不取消
context.TODO()占位符-
context.WithCancel(parent)手动取消调用 cancel()
context.WithTimeout(parent, d)超时取消超时或调用 cancel()
context.WithDeadline(parent, t)截止时间取消到期或调用 cancel()
context.WithValue(parent, k, v)传递值继承父 context

7.6 GMP 调度模型深度剖析

组件定义

组件全称说明
GGoroutine用户级协程,包含栈、状态、PC
MMachineOS 线程,执行 Go 代码
PProcessor逻辑处理器,持有本地运行队列

调度流程

              全局运行队列
           ┌──────────────┐
           │ G  G  G  G   │
           └──────┬───────┘
                  │
    ┌─────────────┼─────────────┐
    │             │             │
    ▼             ▼             ▼
┌────────┐  ┌────────┐  ┌────────┐
│  P0    │  │  P1    │  │  P2    │
│[G][G]  │  │[G]     │  │[G][G]  │
│本地队列 │  │本地队列 │  │本地队列 │
└───┬────┘  └───┬────┘  └───┬────┘
    │           │           │
    ▼           ▼           ▼
┌────────┐  ┌────────┐  ┌────────┐
│  M0    │  │  M1    │  │  M2    │
│(线程)   │  │(线程)   │  │(线程)   │
└────────┘  └────────┘  └────────┘

调度策略

策略描述
本地优先P 优先从本地队列取 G
窃取机制空闲的 P 从其他 P 的本地队列"偷"一半 G
全局队列定期检查全局队列,防止饥饿
网络轮询G 进入 I/O 时,M 绑定到 netpoller 而非阻塞
异步抢占Go 1.14+,基于信号的抢占式调度

工作窃取(Work Stealing)

P0 空闲了,从 P1 偷一半任务:

Before:
P0: [空]          P1: [G1, G2, G3, G4]

After:
P0: [G3, G4]     P1: [G1, G2]

7.7 业务场景:并发爬虫

func crawl(startURL string, maxDepth int) []string {
    visited := &sync.Map{}
    results := make(chan string, 100)
    var wg sync.WaitGroup

    var crawlFunc func(url string, depth int)
    crawlFunc = func(url string, depth int) {
        defer wg.Done()
        if depth > maxDepth {
            return
        }
        if _, loaded := visited.LoadOrStore(url, true); loaded {
            return // 已访问
        }

        links, err := fetchLinks(url)
        if err != nil {
            return
        }
        results <- url

        for _, link := range links {
            wg.Add(1)
            go crawlFunc(link, depth+1)
        }
    }

    wg.Add(1)
    go crawlFunc(startURL, 0)

    // 等待完成后关闭 results
    go func() {
        wg.Wait()
        close(results)
    }()

    // 收集结果
    var urls []string
    for url := range results {
        urls = append(urls, url)
    }
    return urls
}

7.8 本章小结

要点说明
CSP 模型“通过通信共享内存"而非"通过共享内存通信”
goroutine2KB 初始栈,数百万级并发
Channel类型安全的通信管道,支持缓冲
select多路复用,支持超时和非阻塞
Context取消传播和超时控制
GMP 模型N:M 调度,工作窃取,异步抢占

下一章预告:Python 的 asyncio 是如何从生成器演进到原生协程的?


扩展阅读