强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧。
文档目录

16 并发编程

第 16 章:并发编程

16.1 线程(Thread)

Nim 的线程通过 createThread 创建;线程之间默认不共享 GC 管理的堆数据,需要跨线程传递数据时可使用 Isolated 进行安全转移:

# NOTE: the threads API lives in std/typedthreads (there is no std/threads
# module); sleep comes from std/os and the &"…" interpolation from
# std/strformat. Threads require --threads:on (the default since Nim 2.0).
import std/[typedthreads, os, strformat]

proc worker(id: int) {.thread.} =
  ## Thread entry point: announces start, simulates 1 s of work, announces end.
  echo &"Worker {id} started"
  sleep(1000)
  echo &"Worker {id} done"

# Create the threads — each Thread[int] carries one int argument to `worker`.
var t1, t2: Thread[int]
createThread(t1, worker, 1)
createThread(t2, worker, 2)

# Block until both threads have finished.
joinThread(t1)
joinThread(t2)
echo "All workers done"

16.1.1 线程安全

# std/locks provides Lock/withLock; Thread/createThread/joinThread come from
# std/typedthreads (missing in the original, which would not compile).
import std/[locks, typedthreads]

var
  counter = 0   # shared mutable state — every access must hold `lock`
  lock: Lock

initLock(lock)

proc safeIncrement(n: int) {.thread.} =
  ## Adds n to the shared counter, taking the lock for each increment so
  ## concurrent updates are never lost.
  for i in 0..<n:
    withLock(lock):
      counter += 1

var threads: array[4, Thread[int]]
for i in 0..3:
  createThread(threads[i], safeIncrement, 10000)

for t in threads:
  joinThread(t)

echo counter  # 40000 — correct because the lock serializes the increments
deinitLock(lock)

16.2 Channel

Channel 是线程间通信的安全机制:

# Fixes vs. the original: std/threads is not a stdlib module
# (std/typedthreads is); tryRecv returns a tuple whose payload field is
# named `msg`, not `data`; and Channel has no `closed` accessor. Instead of
# polling, this version sends a sentinel message (id == 0) to signal "done"
# and uses blocking recv, which is both simpler and race-free.
import std/[typedthreads, os, strformat]

type
  Message = object
    id: int
    data: string

var channel: Channel[Message]
channel.open(maxItems = 10)   # bounded queue: send blocks once 10 items queue up

proc producer() {.thread.} =
  ## Sends five messages, then a sentinel (id == 0) meaning "no more data".
  for i in 1..5:
    channel.send(Message(id: i, data: &"Message {i}"))
    sleep(100)
  channel.send(Message(id: 0, data: ""))

proc consumer() {.thread.} =
  ## Receives until the sentinel arrives; recv blocks, so no busy-waiting.
  while true:
    let msg = channel.recv()
    if msg.id == 0:
      break
    echo &"Received: {msg.id} - {msg.data}"

var producerThread, consumerThread: Thread[void]
createThread(producerThread, producer)
createThread(consumerThread, consumer)
joinThread(producerThread)
joinThread(consumerThread)
channel.close()   # safe: both endpoints have finished

16.3 异步编程

16.3.1 asyncdispatch(标准库)

# std/strformat was missing from the original import list even though the
# code below uses &"…" interpolation.
import std/[asyncdispatch, httpclient, os, strformat]

proc fetchUrl(url: string): Future[string] {.async.} =
  ## Downloads the body of `url`; `defer` guarantees the client is closed
  ## whether the request succeeds or raises.
  let client = newAsyncHttpClient()
  defer: client.close()
  let response = await client.getContent(url)
  return response

proc main() {.async.} =
  ## Starts all requests first, then awaits them together — the downloads
  ## overlap on a single thread instead of running sequentially.
  let urls = @[
    "https://httpbin.org/get",
    "https://httpbin.org/ip",
    "https://httpbin.org/user-agent",
  ]

  var futures: seq[Future[string]]
  for url in urls:
    futures.add(fetchUrl(url))

  let results = await all(futures)
  for i, r in results:
    echo &"URL {i}: {r.len} bytes"

waitFor(main())

16.3.2 chronos(高性能异步框架)

# Requires: nimble install chronos
# Fixes vs. the original: chronos has no value-returning `all` — use
# `allFutures` to wait for completion, then `read` each finished Future.
# std/strformat is needed for &"…".
import chronos
import std/strformat

proc asyncWork(id: int): Future[string] {.async.} =
  ## Simulates 1 s of async work and returns a result string.
  await sleepAsync(1000.milliseconds)
  return &"Result from worker {id}"

proc main() {.async.} =
  var futures: seq[Future[string]]
  for i in 1..5:
    futures.add(asyncWork(i))

  await allFutures(futures)   # wait for every future to complete
  for f in futures:
    echo f.read()             # read() is safe: each future is finished

waitFor(main())

16.4 并行计算

# NOTE: std/threadpool is deprecated in Nim 2.x (consider taskpools/malebolgia
# for new code); kept here because the chapter demonstrates spawn/^.
# Fix vs. the original: `parallel:` is a statement macro, not an expression,
# so `let results = parallel: …` does not compile. Plain spawn + ^ is enough:
# ^ blocks until the FlowVar has a value.
import std/[threadpool, strformat]

proc heavyCompute(n: int): int =
  ## Sum of 1..n — an O(n) loop standing in for CPU-bound work.
  result = 0
  for i in 1..n:
    result += i

# Each spawn runs on a pool thread; the three computations proceed in parallel.
let a = spawn heavyCompute(1_000_000)
let b = spawn heavyCompute(2_000_000)
let c = spawn heavyCompute(3_000_000)

# ^ waits for and unwraps each FlowVar.
let results = (^a, ^b, ^c)

echo &"Results: {results[0]}, {results[1]}, {results[2]}"

16.5 原子操作

# std/typedthreads was missing in the original — Thread/createThread/
# joinThread are not provided by std/atomics.
import std/[atomics, typedthreads]

var counter: Atomic[int]
counter.store(0)

proc increment(n: int) {.thread.} =
  ## Performs n lock-free increments; Atomic guarantees no update is lost
  ## even with four threads racing.
  for i in 0..<n:
    counter.atomicInc()

var threads: array[4, Thread[int]]
for i in 0..3:
  createThread(threads[i], increment, 100000)

for t in threads:
  joinThread(t)

echo counter.load()  # 400000

16.6 实战示例

🏢 场景:并发 Web 爬虫

# std/strformat added (the code uses &"…"); bare `except:` replaced with
# `except CatchableError` so programmer errors (Defects) are not swallowed.
import std/[asyncdispatch, httpclient, strutils, strformat, os, re]

type Crawler = object
  ## Placeholder for crawler state (visited set, concurrency limit);
  ## not yet wired into the procs below.
  visited: seq[string]
  maxConcurrent: int

proc fetchPage(url: string): Future[string] {.async.} =
  ## Downloads a page body; returns "" on any recoverable failure
  ## (DNS error, HTTP error, timeout) instead of raising.
  try:
    let client = newAsyncHttpClient()
    defer: client.close()
    return await client.getContent(url)
  except CatchableError:
    return ""

proc extractLinks(html: string): seq[string] =
  ## Extracts absolute http(s) URLs from href="…" attributes.
  result = @[]
  let linkRe = re"""href="(https?://[^"]+)""""
  for match in html.findAll(linkRe):
    # each match is the full `href="URL"` text — strip the 6-char
    # `href="` prefix and the closing quote
    result.add(match[6..^2])

proc crawl(url: string, depth: int): Future[seq[string]] {.async.} =
  ## Recursive crawl: fetches `url`, then up to five of its links,
  ## down to `depth` levels. Returns every URL successfully visited.
  if depth <= 0:
    return @[]

  let html = await fetchPage(url)
  if html.len == 0:
    return @[]

  result = @[url]
  let links = extractLinks(html)

  # Launch the child crawls concurrently, then gather their results.
  var futures: seq[Future[seq[string]]]
  for link in links[0..<min(links.len, 5)]:
    futures.add(crawl(link, depth - 1))

  let results = await all(futures)
  for r in results:
    result.add(r)

proc main() {.async.} =
  let urls = await crawl("https://example.com", 2)
  echo &"Found {urls.len} URLs"
  for url in urls[0..<min(urls.len, 10)]:
    echo url

waitFor(main())

🏢 场景:工作队列

# Fixes vs. the original: std/threads → std/typedthreads; std/strformat added
# for &"…"; and — critically — the worker loop never terminated, so the
# joinThread calls below deadlocked forever. Since every task is enqueued
# BEFORE the workers start, a worker can safely exit when it finds the
# queue empty.
import std/[typedthreads, deques, locks, os, strformat]

type
  Task = object
    id: int
    data: string

  WorkQueue = object
    tasks: Deque[Task]   # pending work, guarded by `lock`
    results: seq[string] # finished work, guarded by `resultLock`
    lock: Lock
    resultLock: Lock

proc newWorkQueue(): WorkQueue =
  ## Initializes an empty queue and both of its locks.
  result.tasks = initDeque[Task]()
  result.results = @[]
  initLock(result.lock)
  initLock(result.resultLock)

proc addTask(wq: var WorkQueue, task: Task) =
  ## Thread-safe enqueue.
  withLock(wq.lock):
    wq.tasks.addLast(task)

proc worker(wq: ptr WorkQueue) {.thread.} =
  ## Pops and processes tasks until the queue is empty, then exits so the
  ## main thread's joinThread can return. (Valid only because all tasks are
  ## added before the workers start — an empty queue means no more work.)
  while true:
    var task: Task
    var gotTask = false

    withLock(wq[].lock):
      if wq[].tasks.len > 0:
        task = wq[].tasks.popFirst()
        gotTask = true

    if not gotTask:
      break   # queue drained — terminate

    let processed = &"Processed: {task.id} - {task.data}"
    withLock(wq[].resultLock):
      wq[].results.add(processed)
    sleep(100)   # simulate per-task work

var wq = newWorkQueue()

# Enqueue every task BEFORE starting the workers — the workers treat an
# empty queue as "all work done".
for i in 1..20:
  wq.addTask(Task(id: i, data: &"Task data {i}"))

# Start four workers; each receives a pointer to the shared queue.
var workers: array[4, Thread[ptr WorkQueue]]
for i in 0..3:
  createThread(workers[i], worker, addr wq)

for w in workers:
  joinThread(w)

echo &"Processed {wq.results.len} tasks"
for r in wq.results[0..<min(wq.results.len, 5)]:
  echo r

本章小结

机制           | 用途       | 特点
Thread         | OS 线程    | 最基础,手动管理
Channel        | 线程间通信 | 安全的消息传递
async/await    | 异步 I/O   | 单线程,高并发
parallel/spawn | 并行计算   | 计算密集型
Lock           | 互斥锁     | 保护共享数据
Atomic         | 原子操作   | 无锁同步

练习

  1. 使用 Channel 实现生产者-消费者模式
  2. 用 async/await 实现并发 HTTP 请求
  3. 实现一个线程安全的缓存

扩展阅读


上一章:文件与 I/O | 下一章:外部函数接口