14 - 并发基础:goroutine、Channel、select、sync 包
14 - 并发基础
14.1 并发 vs 并行
| 概念 | 说明 | Go 实现 |
|---|---|---|
| 并发(Concurrency) | 同一时间段内处理多个任务 | goroutine |
| 并行(Parallelism) | 同一时刻执行多个任务 | GOMAXPROCS 多核 |
“并发不是并行,但并发使并行成为可能。” — Rob Pike
14.2 goroutine
goroutine 是 Go 的轻量级线程,由 Go 运行时管理,初始栈仅约 2KB。
package main
import (
"fmt"
"time"
)
func sayHello(name string) {
for i := 0; i < 3; i++ {
fmt.Printf("[%s] Hello %d\n", name, i)
time.Sleep(100 * time.Millisecond)
}
}
func main() {
// 启动 goroutine
go sayHello("goroutine-1")
go sayHello("goroutine-2")
go sayHello("goroutine-3")
// 匿名函数 goroutine
go func() {
fmt.Println("匿名 goroutine 执行")
}()
// 带参数的匿名函数
go func(name string) {
fmt.Printf("Hello, %s!\n", name)
}("World")
// 等待 goroutine 完成(不推荐用 Sleep)
time.Sleep(500 * time.Millisecond)
fmt.Println("main 结束")
}
goroutine 的特点
| 特性 | 说明 |
|---|---|
| 初始栈大小 | ~2KB(可动态增长到 1GB) |
| 创建开销 | 极低(vs 线程 ~1MB) |
| 调度方式 | 协作式 + 抢占式(Go 调度器) |
| 数量限制 | 取决于内存(可轻松创建百万个) |
| 生命周期 | 从创建到函数返回或 main 结束 |
WaitGroup
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done() // 完成时通知 WaitGroup
fmt.Printf("Worker %d 开始\n", id)
time.Sleep(time.Duration(id*100) * time.Millisecond)
fmt.Printf("Worker %d 完成\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1) // 增加计数
go worker(i, &wg)
}
wg.Wait() // 等待所有 worker 完成
fmt.Println("所有 worker 完成")
}
14.3 Channel
Channel 是 goroutine 之间的通信管道,遵循 CSP(Communicating Sequential Processes)模型。
“不要通过共享内存来通信,而要通过通信来共享内存。”
无缓冲 Channel
package main
import "fmt"
func main() {
// 创建无缓冲 channel
ch := make(chan string)
// 发送方
go func() {
ch <- "Hello from goroutine"
}()
// 接收方(阻塞直到有数据)
msg := <-ch
fmt.Println(msg)
}
有缓冲 Channel
func main() {
// 创建容量为 3 的缓冲 channel
ch := make(chan int, 3)
// 发送(不会阻塞,因为有空间)
ch <- 1
ch <- 2
ch <- 3
// ch <- 4 // 阻塞!缓冲区已满
fmt.Println(<-ch) // 1
fmt.Println(<-ch) // 2
fmt.Println(<-ch) // 3
}
Channel 方向
// 双向 channel
ch := make(chan int)
// 只写 channel(用于函数参数限制)
func producer(ch chan<- int) {
ch <- 42
close(ch)
}
// 只读 channel
func consumer(ch <-chan int) {
for v := range ch {
fmt.Println(v)
}
}
func main() {
ch := make(chan int)
go producer(ch)
consumer(ch)
}
关闭 Channel
func main() {
ch := make(chan int, 5)
// 发送方
go func() {
for i := 0; i < 5; i++ {
ch <- i
}
close(ch) // 发送完毕后关闭
}()
// 接收方:range 会自动检测关闭
for v := range ch {
fmt.Println(v)
}
// 接收方:手动检查
ch2 := make(chan int, 3)
ch2 <- 10
ch2 <- 20
close(ch2)
// 接收已关闭 channel 的值
v1, ok1 := <-ch2 // 10, true
v2, ok2 := <-ch2 // 20, true
v3, ok3 := <-ch2 // 0, false(channel 已关闭且无数据)
fmt.Println(v1, ok1, v2, ok2, v3, ok3)
}
⚠️ 注意:
- 只有发送方应该关闭 channel
- 关闭已关闭的 channel 会 panic
- 向已关闭的 channel 发送数据会 panic
- 从已关闭的 channel 接收数据返回零值
14.4 Channel 模式
生产者-消费者
package main
import (
"fmt"
"sync"
)
func producer(id int, ch chan<- int, count int) {
for i := 0; i < count; i++ {
ch <- id*100 + i
}
}
func consumer(id int, ch <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for v := range ch {
fmt.Printf("Consumer %d 处理: %d\n", id, v)
}
}
func main() {
ch := make(chan int, 10)
var wg sync.WaitGroup
// 3 个生产者
for i := 1; i <= 3; i++ {
go producer(i, ch, 5)
}
// 2 个消费者
for i := 1; i <= 2; i++ {
wg.Add(1)
go consumer(i, ch, &wg)
}
// 等待所有生产者完成后关闭 channel
// 注意:实际生产中需要更优雅的关闭机制
go func() {
// 简化处理:sleep 后关闭
// 实际应使用 WaitGroup 追踪生产者
for {
if len(ch) == 0 {
close(ch)
return
}
}
}()
wg.Wait()
}
管道(Pipeline)
package main
import "fmt"
// 阶段 1:生成数据
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// 阶段 2:处理数据(平方)
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
// 阶段 3:过滤数据
func filter(in <-chan int, predicate func(int) bool) <-chan int {
out := make(chan int)
go func() {
for n := range in {
if predicate(n) {
out <- n
}
}
close(out)
}()
return out
}
func main() {
// 组合管道
nums := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
squares := square(nums)
evens := filter(squares, func(n int) bool { return n%2 == 0 })
for v := range evens {
fmt.Println(v) // 4, 16, 36, 64, 100
}
}
扇入/扇出(Fan-in/Fan-out)
func fanIn(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
merged := make(chan int)
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for v := range c {
merged <- v
}
}(ch)
}
go func() {
wg.Wait()
close(merged)
}()
return merged
}
超时和退出
func main() {
ch := make(chan string, 1)
go func() {
time.Sleep(2 * time.Second)
ch <- "result"
}()
// 使用 time.After 实现超时
select {
case result := <-ch:
fmt.Println("收到:", result)
case <-time.After(1 * time.Second):
fmt.Println("超时!")
}
}
14.5 GOMAXPROCS
import "runtime"
func main() {
// 查看当前 GOMAXPROCS
fmt.Println("GOMAXPROCS:", runtime.GOMAXPROCS(0))
// 设置 GOMAXPROCS(默认为 CPU 核心数)
runtime.GOMAXPROCS(4)
// 查看 CPU 核心数
fmt.Println("CPU:", runtime.NumCPU())
}
14.6 并发陷阱
// ❌ 陷阱 1:数据竞争
func raceCondition() {
counter := 0
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++ // 数据竞争!
}()
}
wg.Wait()
fmt.Println(counter) // 结果不确定
}
// ✅ 正确:使用 Channel 或 Mutex
func safeCounter() {
counter := 0
var mu sync.Mutex
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
counter++
mu.Unlock()
}()
}
wg.Wait()
fmt.Println(counter) // 1000
}
// ❌ 陷阱 2:goroutine 泄漏
func leakyFunction() <-chan int {
ch := make(chan int)
go func() {
// 如果没有人读取 ch,这个 goroutine 永远不会退出
ch <- 42
}()
return ch
}
// ✅ 正确:使用 context 取消
func nonLeaky(ctx context.Context) <-chan int {
ch := make(chan int)
go func() {
select {
case ch <- 42:
case <-ctx.Done():
return
}
}()
return ch
}
🏢 业务场景
- 并发下载器:多个 goroutine 并发下载文件
- 日志收集器:channel 实现生产者-消费者日志管道
- 任务调度器:WaitGroup + channel 管理并发任务
- 超时控制:select + time.After 控制请求超时
- 事件系统:channel 实现发布-订阅模式