第7章:Go 协程 —— CSP 模型的典范
第7章:Go 协程 —— CSP 模型的典范
7.1 Go 的并发哲学
Go 语言的并发模型基于 CSP(Communicating Sequential Processes,通信顺序进程),由 Tony Hoare 在 1978 年提出。
Go 的格言:“Don’t communicate by sharing memory; share memory by communicating.” (不要通过共享内存来通信,而要通过通信来共享内存。)
CSP vs Actor 模型
| 特性 | CSP (Go) | Actor (Erlang) |
|---|---|---|
| 通信方式 | Channel(通道) | 消息传递(Mailbox) |
| 发送者 | 必须知道 Channel | 直接发送到 PID |
| 接收者 | 从 Channel 读取 | 从 Mailbox 读取 |
| 耦合度 | 较低(通过 Channel 解耦) | 较低(通过 PID 解耦) |
| 选择机制 | select 语句 | Pattern Matching |
7.2 Goroutine 基础
创建 Goroutine
package main
import (
"fmt"
"time"
)
func sayHello(name string) {
for i := 0; i < 3; i++ {
fmt.Printf("[%s] 你好 %d\n", name, i)
time.Sleep(100 * time.Millisecond)
}
}
func main() {
// 使用 go 关键字启动 goroutine
go sayHello("协程A")
go sayHello("协程B")
go func() {
fmt.Println("[匿名协程] 执行")
}()
time.Sleep(1 * time.Second) // 等待所有 goroutine 完成
}
Goroutine vs OS 线程
| 特性 | Goroutine | OS 线程 |
|---|---|---|
| 初始栈大小 | 2KB(可动态增长) | 2MB(固定) |
| 创建开销 | ~0.3μs | ~30μs |
| 切换开销 | ~100ns | ~1-10μs |
| 最大数量 | 数百万 | 数千 |
| 调度 | Go 运行时(GMP) | OS 内核 |
WaitGroup — 等待所有 Goroutine
import "sync"
func processItems(items []int) {
var wg sync.WaitGroup
for _, item := range items {
wg.Add(1) // 重要:在 goroutine 外面 Add
go func(item int) {
defer wg.Done()
process(item)
}(item) // 重要:传入参数,避免闭包陷阱
}
wg.Wait() // 阻塞直到所有 goroutine 完成
}
闭包陷阱
// ❌ 常见错误:循环变量捕获
for i := 0; i < 5; i++ {
go func() {
fmt.Println(i) // 可能全部输出 5
}()
}
// ✅ 正确:通过参数传递
for i := 0; i < 5; i++ {
go func(n int) {
fmt.Println(n)
}(i)
}
// ✅ Go 1.22+:循环变量每次迭代都是新变量
// 不再需要额外处理
7.3 Channel — Go 的通信管道
基本用法
// 无缓冲 Channel(同步通信)
ch := make(chan int)
// 带缓冲 Channel(异步通信)
ch := make(chan int, 10)
// 发送
ch <- 42
// 接收
value := <-ch
// 关闭 Channel
close(ch)
Channel 类型对比
| 类型 | 创建方式 | 发送行为 | 接收行为 | 用途 |
|---|---|---|---|---|
| 无缓冲 | make(chan T) | 阻塞直到有接收者 | 阻塞直到有发送者 | 同步协调 |
| 有缓冲 | make(chan T, n) | 阻塞直到缓冲区满 | 阻塞直到缓冲区有数据 | 异步解耦 |
| 只发送 | chan<- T | - | - | 类型安全 |
| 只接收 | <-chan T | - | - | 类型安全 |
Channel 使用模式
生产者-消费者:
func producer(ch chan<- int) {
for i := 0; i < 100; i++ {
ch <- i
}
close(ch)
}
func consumer(ch <-chan int) {
for v := range ch {
fmt.Printf("处理: %d\n", v)
}
// range 会在 Channel 关闭后自动退出
}
func main() {
ch := make(chan int, 10)
go producer(ch)
consumer(ch) // 在主 goroutine 中消费
}
Fan-out Fan-in(扇出扇入):
func fanOut(input <-chan int, workers int) []<-chan int {
channels := make([]<-chan int, workers)
for i := 0; i < workers; i++ {
channels[i] = worker(input)
}
return channels
}
func fanIn(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
merged := make(chan int)
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for v := range c {
merged <- v
}
}(ch)
}
go func() {
wg.Wait()
close(merged)
}()
return merged
}
7.4 Select 语句
select 语句用于同时等待多个 Channel 操作。
基本用法
select {
case msg := <-ch1:
fmt.Println("从 ch1 收到:", msg)
case msg := <-ch2:
fmt.Println("从 ch2 收到:", msg)
case <-time.After(3 * time.Second):
fmt.Println("超时")
default:
fmt.Println("没有就绪的 Channel")
}
Select 的行为规则
| 规则 | 说明 |
|---|---|
| 随机选择 | 多个 case 同时就绪时,随机选择一个执行 |
| 阻塞等待 | 如果没有 default 且没有 case 就绪,select 阻塞 |
| 非阻塞 | 有 default 时,没有 case 就绪就执行 default |
| 空 select | select{} 永远阻塞 |
实用模式
超时控制:
func fetchWithTimeout(url string, timeout time.Duration) ([]byte, error) {
ch := make(chan []byte, 1)
errCh := make(chan error, 1)
go func() {
resp, err := http.Get(url)
if err != nil {
errCh <- err
return
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
ch <- body
}()
select {
case body := <-ch:
return body, nil
case err := <-errCh:
return nil, err
case <-time.After(timeout):
return nil, fmt.Errorf("请求超时 (%v)", timeout)
}
}
心跳机制:
func worker(done chan<- struct{}) {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
heartbeat := time.NewTicker(2 * time.Second)
defer heartbeat.Stop()
for {
select {
case <-ticker.C:
// 执行工作
doWork()
case <-heartbeat.C:
// 发送心跳
reportHealth()
case <-done:
// 收到停止信号
return
}
}
}
非阻塞操作:
select {
case ch <- value:
fmt.Println("发送成功")
default:
fmt.Println("Channel 已满,丢弃")
}
7.5 Context — 取消与超时传播
func fetchUserData(ctx context.Context, userID string) (*User, error) {
// 创建子 context,5 秒超时
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
// 传递 context 给下游调用
userCh := make(chan *User, 1)
errCh := make(chan error, 1)
go func() {
user, err := db.QueryUser(ctx, userID)
if err != nil {
errCh <- err
return
}
userCh <- user
}()
select {
case user := <-userCh:
return user, nil
case err := <-errCh:
return nil, err
case <-ctx.Done():
return nil, ctx.Err() // context.Canceled 或 context.DeadlineExceeded
}
}
Context 类型
| 函数 | 用途 | 取消原因 |
|---|---|---|
context.Background() | 根 context | 永不取消 |
context.TODO() | 占位符 | - |
context.WithCancel(parent) | 手动取消 | 调用 cancel() |
context.WithTimeout(parent, d) | 超时取消 | 超时或调用 cancel() |
context.WithDeadline(parent, t) | 截止时间取消 | 到期或调用 cancel() |
context.WithValue(parent, k, v) | 传递值 | 继承父 context |
7.6 GMP 调度模型深度剖析
组件定义
| 组件 | 全称 | 说明 |
|---|---|---|
| G | Goroutine | 用户级协程,包含栈、状态、PC |
| M | Machine | OS 线程,执行 Go 代码 |
| P | Processor | 逻辑处理器,持有本地运行队列 |
调度流程
全局运行队列
┌──────────────┐
│ G G G G │
└──────┬───────┘
│
┌─────────────┼─────────────┐
│ │ │
▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐
│ P0 │ │ P1 │ │ P2 │
│[G][G] │ │[G] │ │[G][G] │
│本地队列 │ │本地队列 │ │本地队列 │
└───┬────┘ └───┬────┘ └───┬────┘
│ │ │
▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐
│ M0 │ │ M1 │ │ M2 │
│(线程) │ │(线程) │ │(线程) │
└────────┘ └────────┘ └────────┘
调度策略
| 策略 | 描述 |
|---|---|
| 本地优先 | P 优先从本地队列取 G |
| 窃取机制 | 空闲的 P 从其他 P 的本地队列"偷"一半 G |
| 全局队列 | 定期检查全局队列,防止饥饿 |
| 网络轮询 | G 进入 I/O 时,M 绑定到 netpoller 而非阻塞 |
| 异步抢占 | Go 1.14+,基于信号的抢占式调度 |
工作窃取(Work Stealing)
P0 空闲了,从 P1 偷一半任务:
Before:
P0: [空] P1: [G1, G2, G3, G4]
After:
P0: [G3, G4] P1: [G1, G2]
7.7 业务场景:并发爬虫
func crawl(startURL string, maxDepth int) []string {
visited := &sync.Map{}
results := make(chan string, 100)
var wg sync.WaitGroup
var crawlFunc func(url string, depth int)
crawlFunc = func(url string, depth int) {
defer wg.Done()
if depth > maxDepth {
return
}
if _, loaded := visited.LoadOrStore(url, true); loaded {
return // 已访问
}
links, err := fetchLinks(url)
if err != nil {
return
}
results <- url
for _, link := range links {
wg.Add(1)
go crawlFunc(link, depth+1)
}
}
wg.Add(1)
go crawlFunc(startURL, 0)
// 等待完成后关闭 results
go func() {
wg.Wait()
close(results)
}()
// 收集结果
var urls []string
for url := range results {
urls = append(urls, url)
}
return urls
}
7.8 本章小结
| 要点 | 说明 |
|---|---|
| CSP 模型 | “通过通信共享内存"而非"通过共享内存通信” |
| goroutine | 2KB 初始栈,数百万级并发 |
| Channel | 类型安全的通信管道,支持缓冲 |
| select | 多路复用,支持超时和非阻塞 |
| Context | 取消传播和超时控制 |
| GMP 模型 | N:M 调度,工作窃取,异步抢占 |
下一章预告:Python 的 asyncio 是如何从生成器演进到原生协程的?
扩展阅读
- Go 官方文档 — Goroutines
- Go 官方文档 — Channels
- Scheduling In Go — William Kennedy
- Go scheduler — Daniel Morsing
- Communicating Sequential Processes — Tony Hoare