强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

14 - 并发基础:goroutine、Channel、select、sync 包

14 - 并发基础

14.1 并发 vs 并行

概念说明Go 实现
并发(Concurrency)同一时间段内处理多个任务goroutine
并行(Parallelism)同一时刻执行多个任务GOMAXPROCS 多核

“并发不是并行,但并发使并行成为可能。” — Rob Pike

14.2 goroutine

goroutine 是 Go 的轻量级线程,由 Go 运行时管理,初始栈仅约 2KB。

package main

import (
    "fmt"
    "time"
)

func sayHello(name string) {
    for i := 0; i < 3; i++ {
        fmt.Printf("[%s] Hello %d\n", name, i)
        time.Sleep(100 * time.Millisecond)
    }
}

func main() {
    // 启动 goroutine
    go sayHello("goroutine-1")
    go sayHello("goroutine-2")
    go sayHello("goroutine-3")

    // 匿名函数 goroutine
    go func() {
        fmt.Println("匿名 goroutine 执行")
    }()

    // 带参数的匿名函数
    go func(name string) {
        fmt.Printf("Hello, %s!\n", name)
    }("World")

    // 等待 goroutine 完成(不推荐用 Sleep)
    time.Sleep(500 * time.Millisecond)
    fmt.Println("main 结束")
}

goroutine 的特点

特性说明
初始栈大小~2KB(可动态增长到 1GB)
创建开销极低(vs 线程 ~1MB)
调度方式协作式 + 抢占式(Go 调度器)
数量限制取决于内存(可轻松创建百万个)
生命周期从创建到函数返回或 main 结束

WaitGroup

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done() // 完成时通知 WaitGroup

    fmt.Printf("Worker %d 开始\n", id)
    time.Sleep(time.Duration(id*100) * time.Millisecond)
    fmt.Printf("Worker %d 完成\n", id)
}

func main() {
    var wg sync.WaitGroup

    for i := 1; i <= 5; i++ {
        wg.Add(1) // 增加计数
        go worker(i, &wg)
    }

    wg.Wait() // 等待所有 worker 完成
    fmt.Println("所有 worker 完成")
}

14.3 Channel

Channel 是 goroutine 之间的通信管道,遵循 CSP(Communicating Sequential Processes)模型。

“不要通过共享内存来通信,而要通过通信来共享内存。”

无缓冲 Channel

package main

import "fmt"

func main() {
    // 创建无缓冲 channel
    ch := make(chan string)

    // 发送方
    go func() {
        ch <- "Hello from goroutine"
    }()

    // 接收方(阻塞直到有数据)
    msg := <-ch
    fmt.Println(msg)
}

有缓冲 Channel

func main() {
    // 创建容量为 3 的缓冲 channel
    ch := make(chan int, 3)

    // 发送(不会阻塞,因为有空间)
    ch <- 1
    ch <- 2
    ch <- 3
    // ch <- 4 // 阻塞!缓冲区已满

    fmt.Println(<-ch) // 1
    fmt.Println(<-ch) // 2
    fmt.Println(<-ch) // 3
}

Channel 方向

// 双向 channel
ch := make(chan int)

// 只写 channel(用于函数参数限制)
func producer(ch chan<- int) {
    ch <- 42
    close(ch)
}

// 只读 channel
func consumer(ch <-chan int) {
    for v := range ch {
        fmt.Println(v)
    }
}

func main() {
    ch := make(chan int)
    go producer(ch)
    consumer(ch)
}

关闭 Channel

func main() {
    ch := make(chan int, 5)

    // 发送方
    go func() {
        for i := 0; i < 5; i++ {
            ch <- i
        }
        close(ch) // 发送完毕后关闭
    }()

    // 接收方:range 会自动检测关闭
    for v := range ch {
        fmt.Println(v)
    }

    // 接收方:手动检查
    ch2 := make(chan int, 3)
    ch2 <- 10
    ch2 <- 20
    close(ch2)

    // 接收已关闭 channel 的值
    v1, ok1 := <-ch2 // 10, true
    v2, ok2 := <-ch2 // 20, true
    v3, ok3 := <-ch2 // 0, false(channel 已关闭且无数据)
    fmt.Println(v1, ok1, v2, ok2, v3, ok3)
}

⚠️ 注意

  • 只有发送方应该关闭 channel
  • 关闭已关闭的 channel 会 panic
  • 向已关闭的 channel 发送数据会 panic
  • 从已关闭的 channel 接收数据返回零值

14.4 Channel 模式

生产者-消费者

package main

import (
    "fmt"
    "sync"
)

func producer(id int, ch chan<- int, count int) {
    for i := 0; i < count; i++ {
        ch <- id*100 + i
    }
}

func consumer(id int, ch <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for v := range ch {
        fmt.Printf("Consumer %d 处理: %d\n", id, v)
    }
}

func main() {
    ch := make(chan int, 10)
    var wg sync.WaitGroup

    // 3 个生产者
    for i := 1; i <= 3; i++ {
        go producer(i, ch, 5)
    }

    // 2 个消费者
    for i := 1; i <= 2; i++ {
        wg.Add(1)
        go consumer(i, ch, &wg)
    }

    // 等待所有生产者完成后关闭 channel
    // 注意:实际生产中需要更优雅的关闭机制
    go func() {
        // 简化处理:sleep 后关闭
        // 实际应使用 WaitGroup 追踪生产者
        for {
            if len(ch) == 0 {
                close(ch)
                return
            }
        }
    }()

    wg.Wait()
}

管道(Pipeline)

package main

import "fmt"

// 阶段 1:生成数据
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

// 阶段 2:处理数据(平方)
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

// 阶段 3:过滤数据
func filter(in <-chan int, predicate func(int) bool) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            if predicate(n) {
                out <- n
            }
        }
        close(out)
    }()
    return out
}

func main() {
    // 组合管道
    nums := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    squares := square(nums)
    evens := filter(squares, func(n int) bool { return n%2 == 0 })

    for v := range evens {
        fmt.Println(v) // 4, 16, 36, 64, 100
    }
}

扇入/扇出(Fan-in/Fan-out)

func fanIn(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    merged := make(chan int)

    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                merged <- v
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

超时和退出

func main() {
    ch := make(chan string, 1)

    go func() {
        time.Sleep(2 * time.Second)
        ch <- "result"
    }()

    // 使用 time.After 实现超时
    select {
    case result := <-ch:
        fmt.Println("收到:", result)
    case <-time.After(1 * time.Second):
        fmt.Println("超时!")
    }
}

14.5 GOMAXPROCS

import "runtime"

func main() {
    // 查看当前 GOMAXPROCS
    fmt.Println("GOMAXPROCS:", runtime.GOMAXPROCS(0))

    // 设置 GOMAXPROCS(默认为 CPU 核心数)
    runtime.GOMAXPROCS(4)

    // 查看 CPU 核心数
    fmt.Println("CPU:", runtime.NumCPU())
}

14.6 并发陷阱

// ❌ 陷阱 1:数据竞争
func raceCondition() {
    counter := 0
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter++ // 数据竞争!
        }()
    }
    wg.Wait()
    fmt.Println(counter) // 结果不确定
}

// ✅ 正确:使用 Channel 或 Mutex
func safeCounter() {
    counter := 0
    var mu sync.Mutex
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock()
            counter++
            mu.Unlock()
        }()
    }
    wg.Wait()
    fmt.Println(counter) // 1000
}

// ❌ 陷阱 2:goroutine 泄漏
func leakyFunction() <-chan int {
    ch := make(chan int)
    go func() {
        // 如果没有人读取 ch,这个 goroutine 永远不会退出
        ch <- 42
    }()
    return ch
}

// ✅ 正确:使用 context 取消
func nonLeaky(ctx context.Context) <-chan int {
    ch := make(chan int)
    go func() {
        select {
        case ch <- 42:
        case <-ctx.Done():
            return
        }
    }()
    return ch
}

🏢 业务场景

  1. 并发下载器:多个 goroutine 并发下载文件
  2. 日志收集器:channel 实现生产者-消费者日志管道
  3. 任务调度器:WaitGroup + channel 管理并发任务
  4. 超时控制:select + time.After 控制请求超时
  5. 事件系统:channel 实现发布-订阅模式

📖 扩展阅读