强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

15 - 同步原语:Mutex、RWMutex、WaitGroup、Once、Pool

15 - 同步原语

15.1 sync.Mutex(互斥锁)

package main

import (
    "fmt"
    "sync"
)

type SafeCounter struct {
    mu    sync.Mutex
    count int
}

func (c *SafeCounter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.count++
}

func (c *SafeCounter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.count
}

func main() {
    counter := &SafeCounter{}
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Increment()
        }()
    }

    wg.Wait()
    fmt.Println("计数:", counter.Value()) // 1000
}

⚠️ 注意

  • sync.Mutex 不可复制(复制后锁状态不一致)
  • 永远用 defer mu.Unlock() 确保解锁
  • 不要在持锁时调用可能阻塞的函数

15.2 sync.RWMutex(读写锁)

读写锁允许多个读操作并发,但写操作互斥。

package main

import (
    "fmt"
    "sync"
    "time"
)

type Cache struct {
    mu   sync.RWMutex
    data map[string]string
}

func NewCache() *Cache {
    return &Cache{data: make(map[string]string)}
}

func (c *Cache) Get(key string) (string, bool) {
    c.mu.RLock()         // 读锁
    defer c.mu.RUnlock()
    val, ok := c.data[key]
    return val, ok
}

func (c *Cache) Set(key, value string) {
    c.mu.Lock()          // 写锁
    defer c.mu.Unlock()
    c.data[key] = value
}

func (c *Cache) Delete(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    delete(c.data, key)
}

func (c *Cache) Len() int {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return len(c.data)
}

func main() {
    cache := NewCache()
    var wg sync.WaitGroup

    // 并发写入
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            cache.Set(fmt.Sprintf("key-%d", i), fmt.Sprintf("value-%d", i))
        }(i)
    }

    // 并发读取
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            if val, ok := cache.Get(fmt.Sprintf("key-%d", i)); ok {
                _ = val
            }
        }(i)
    }

    wg.Wait()
    fmt.Println("缓存大小:", cache.Len())
}
锁类型读操作写操作适用场景
Mutex互斥互斥通用
RWMutex并发互斥读多写少

15.3 sync.WaitGroup

import (
    "fmt"
    "sync"
    "time"
)

func processTask(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("任务 %d 开始\n", id)
    time.Sleep(time.Duration(id*100) * time.Millisecond)
    fmt.Printf("任务 %d 完成\n", id)
}

func main() {
    var wg sync.WaitGroup

    tasks := 5
    wg.Add(tasks) // 一次添加多个

    for i := 1; i <= tasks; i++ {
        go processTask(i, &wg)
    }

    wg.Wait()
    fmt.Println("所有任务完成")
}

💡 技巧wg.Add() 应在启动 goroutine 之前调用,避免竞态条件。

15.4 sync.Once

确保函数只执行一次(如单例、初始化)。

package main

import (
    "fmt"
    "sync"
)

type Database struct {
    Host string
    Port int
}

var (
    dbInstance *Database
    dbOnce     sync.Once
)

func GetDatabase() *Database {
    dbOnce.Do(func() {
        fmt.Println("初始化数据库连接(只执行一次)")
        dbInstance = &Database{
            Host: "localhost",
            Port: 5432,
        }
    })
    return dbInstance
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            db := GetDatabase()
            _ = db
        }()
    }
    wg.Wait()
    // 输出: "初始化数据库连接(只执行一次)" 只出现一次
}

15.5 sync.Pool

对象池,减少频繁的内存分配和 GC 压力。

package main

import (
    "bytes"
    "fmt"
    "sync"
)

// 创建 buffer 池
var bufferPool = sync.Pool{
    New: func() any {
        return new(bytes.Buffer)
    },
}

func processRequest(data string) string {
    // 从池中获取
    buf := bufferPool.Get().(*bytes.Buffer)
    defer func() {
        buf.Reset()           // 清空
        bufferPool.Put(buf)   // 归还
    }()

    buf.WriteString("处理: ")
    buf.WriteString(data)
    return buf.String()
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            result := processRequest(fmt.Sprintf("请求-%d", i))
            _ = result
        }(i)
    }
    wg.Wait()
    fmt.Println("完成")
}
特性说明
线程安全✅ 可在多 goroutine 中使用
自动清理GC 时可能清除池中对象
适用场景频繁创建和销毁的临时对象
不适用需要持久化的对象

15.6 sync.Map

// 第九章已详细介绍,此处简要对比
var m sync.Map

// 存储
m.Store("key", "value")

// 读取
v, ok := m.Load("key")

// 原子操作
v, loaded := m.LoadOrStore("key", "default")

15.7 sync.Cond

条件变量,用于 goroutine 间的等待/通知。

package main

import (
    "fmt"
    "sync"
    "time"
)

type Queue struct {
    items []int
    mu    sync.Mutex
    cond  *sync.Cond
}

func NewQueue() *Queue {
    q := &Queue{}
    q.cond = sync.NewCond(&q.mu)
    return q
}

func (q *Queue) Put(item int) {
    q.mu.Lock()
    defer q.mu.Unlock()
    q.items = append(q.items, item)
    q.cond.Signal() // 通知一个等待者
}

func (q *Queue) Get() int {
    q.mu.Lock()
    defer q.mu.Unlock()
    for len(q.items) == 0 {
        q.cond.Wait() // 等待通知
    }
    item := q.items[0]
    q.items = q.items[1:]
    return item
}

func main() {
    q := NewQueue()

    // 消费者
    go func() {
        for i := 0; i < 5; i++ {
            item := q.Get()
            fmt.Printf("消费: %d\n", item)
        }
    }()

    // 生产者
    for i := 1; i <= 5; i++ {
        time.Sleep(200 * time.Millisecond)
        fmt.Printf("生产: %d\n", i)
        q.Put(i)
    }

    time.Sleep(time.Second)
}

15.8 sync/atomic

原子操作,比锁更高效的并发安全操作。

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {
    // 原子计数器
    var counter int64
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            atomic.AddInt64(&counter, 1)
        }()
    }
    wg.Wait()
    fmt.Println("计数:", atomic.LoadInt64(&counter)) // 1000

    // 原子比较并交换(CAS)
    var value int64 = 100
    swapped := atomic.CompareAndSwapInt64(&value, 100, 200)
    fmt.Println(swapped, value) // true, 200

    // 原子值
    var config atomic.Value
    config.Store(map[string]string{"env": "dev"})
    
    go func() {
        time.Sleep(time.Second)
        config.Store(map[string]string{"env": "prod"})
    }()

    v := config.Load().(map[string]string)
    fmt.Println(v["env"])
}
函数说明
AddInt64原子加
LoadInt64原子读
StoreInt64原子写
CompareAndSwapInt64CAS 操作
SwapInt64原子交换
atomic.Value原子存储任意类型

🏢 业务场景

  1. 计数器/统计atomic 实现高性能请求计数
  2. 连接池sync.Pool 复用 HTTP 连接/Buffer
  3. 缓存RWMutex 保护读多写少的缓存
  4. 配置热更新atomic.Value 无锁读取配置
  5. 初始化sync.Once 确保单例只初始化一次

📖 扩展阅读