16 并发编程
第 16 章:并发编程
16.1 线程(Thread)
Nim 的线程通过 createThread 创建。线程共享同一地址空间,但在传统 GC 下每个线程拥有独立的 GC 堆,跨线程传递 GC 管理的数据应使用 Channel 或 std/isolation 的 Isolated:
# 线程 API 在 Nim 2.x 位于 std/typedthreads(没有 std/threads 这个模块)
# 编译需开启线程支持:nim c --threads:on app.nim(Nim 2.x 默认开启)
import std/[os, strformat, typedthreads]

proc worker(id: int) {.thread.} =
  ## 线程入口过程,必须标注 {.thread.}
  echo &"Worker {id} started"
  sleep(1000)          # 模拟耗时工作
  echo &"Worker {id} done"

# 创建线程:Thread[T] 的 T 是入口参数类型
var t1, t2: Thread[int]
createThread(t1, worker, 1)
createThread(t2, worker, 2)

# 等待线程完成
joinThread(t1)
joinThread(t2)
echo "All workers done"
16.1.1 线程安全
import std/[locks, typedthreads]

var
  counter = 0    # 共享计数器,所有读写都必须在 lock 保护下进行
  lock: Lock

initLock(lock)

proc safeIncrement(n: int) {.thread.} =
  ## 对共享 counter 递增 n 次;withLock 保证每次 +1 互斥执行
  for i in 0..<n:
    withLock(lock):
      counter += 1

var threads: array[4, Thread[int]]
for i in 0..3:
  createThread(threads[i], safeIncrement, 10000)
for t in threads:
  joinThread(t)

echo counter # 40000(正确,因为有锁保护)
deinitLock(lock)
16.2 Channel
Channel 是线程间通信的安全机制:
import std/[os, strformat, typedthreads]

type
  Message = object
    id: int
    data: string

var channel: Channel[Message]
channel.open(maxItems = 10)  # 有界队列:满时 send 阻塞

proc producer() {.thread.} =
  for i in 1..5:
    channel.send(Message(id: i, data: &"Message {i}"))
    sleep(100)

proc consumer() {.thread.} =
  ## tryRecv 返回元组 (dataAvailable: bool, msg: T),
  ## 数据字段名是 msg(原版的 msg.data.id 无法编译)。
  ## Channel 没有 closed 查询接口,这里按已知的消息数量退出。
  var received = 0
  while received < 5:
    let (ok, msg) = channel.tryRecv()
    if ok:
      echo &"Received: {msg.id} - {msg.data}"
      inc received
    else:
      sleep(50)  # 暂无消息,稍后重试

var producerThread, consumerThread: Thread[void]
createThread(producerThread, producer)
createThread(consumerThread, consumer)
joinThread(producerThread)
joinThread(consumerThread)
channel.close()  # 两端都结束后再关闭,避免向已关闭的通道收发
16.3 异步编程
16.3.1 asyncdispatch(标准库)
import std/[asyncdispatch, httpclient, os, strformat]

proc fetchUrl(url: string): Future[string] {.async.} =
  ## 异步抓取 url,返回响应体;defer 保证连接总会被关闭
  let client = newAsyncHttpClient()
  defer: client.close()
  let response = await client.getContent(url)
  return response

proc main() {.async.} =
  let urls = @[
    "https://httpbin.org/get",
    "https://httpbin.org/ip",
    "https://httpbin.org/user-agent",
  ]
  var futures: seq[Future[string]]
  for url in urls:
    futures.add(fetchUrl(url))  # 立即发起请求,不在此处等待
  let results = await all(futures)  # 并发等待全部完成
  for i, r in results:
    echo &"URL {i}: {r.len} bytes"

waitFor(main())
16.3.2 chronos(高性能异步框架)
# 需要安装: nimble install chronos
import std/strformat
import chronos

proc asyncWork(id: int): Future[string] {.async.} =
  await sleepAsync(1000.milliseconds)
  return &"Result from worker {id}"

proc main() {.async.} =
  var futures: seq[Future[string]]
  for i in 1..5:
    futures.add(asyncWork(i))
  # chronos 没有返回值版本的 all:先用 allFutures 等待全部完成,
  # 再用 read 逐个取出结果(此时 Future 已完成,read 不会阻塞)
  await allFutures(futures)
  for f in futures:
    echo f.read()

waitFor(main())
16.4 并行计算
# 注意:std/threadpool 在 Nim 2.x 已标记弃用,新项目可考虑 taskpools 等库
import std/[strformat, threadpool]

proc heavyCompute(n: int): int =
  ## 计算 1..n 的和(演示用的 CPU 密集任务)
  for i in 1..n:
    result += i

# parallel 是语句块而非表达式,不能写 let results = parallel: ...
# 普通用法:spawn 返回 FlowVar,^ 读取结果(必要时阻塞等待)
let a = spawn heavyCompute(1_000_000)
let b = spawn heavyCompute(2_000_000)
let c = spawn heavyCompute(3_000_000)
sync()  # 等待线程池中所有已 spawn 的任务完成
echo &"Results: {^a}, {^b}, {^c}"
16.5 原子操作
import std/[atomics, typedthreads]

var counter: Atomic[int]
counter.store(0)

proc increment(n: int) {.thread.} =
  ## 无锁递增:atomicInc 保证每次 +1 原子完成,多线程下不丢更新
  for i in 0..<n:
    counter.atomicInc()

var threads: array[4, Thread[int]]
for i in 0..3:
  createThread(threads[i], increment, 100000)
for t in threads:
  joinThread(t)

echo counter.load() # 400000
16.6 实战示例
🏢 场景:并发 Web 爬虫
import std/[asyncdispatch, httpclient, strutils, os, re, strformat]

type Crawler = object
  visited: seq[string]
  maxConcurrent: int

proc fetchPage(url: string): Future[string] {.async.} =
  ## 抓取页面;失败时返回空串(尽力而为的爬虫语义)。
  ## 只捕获 CatchableError,不要用裸 except 吞掉 Defect。
  try:
    let client = newAsyncHttpClient()
    defer: client.close()
    return await client.getContent(url)
  except CatchableError:
    return ""

proc extractLinks(html: string): seq[string] =
  ## 从 html 中提取所有 http(s) 绝对链接
  result = @[]
  let regex = re"""href="(https?://[^"]+)""""
  for match in html.findAll(regex):
    # findAll 返回整段匹配文本 href="URL",去掉 6 字符前缀和末尾引号
    let url = match[6..^2]
    result.add(url)

proc crawl(url: string, depth: int): Future[seq[string]] {.async.} =
  ## 递归爬取:返回以 url 为根、深度 depth 内访问到的所有 URL
  if depth <= 0:
    return @[]
  let html = await fetchPage(url)
  if html.len == 0:
    return @[]
  result = @[url]
  let links = extractLinks(html)
  var futures: seq[Future[seq[string]]]
  for link in links[0..<min(links.len, 5)]:  # 每页最多并发跟进 5 个链接
    futures.add(crawl(link, depth - 1))
  let results = await all(futures)
  for r in results:
    result.add(r)

proc main() {.async.} =
  let urls = await crawl("https://example.com", 2)
  echo &"Found {urls.len} URLs"
  for url in urls[0..<min(urls.len, 10)]:
    echo url

waitFor(main())
🏢 场景:工作队列
import std/[deques, locks, os, strformat, typedthreads]

type
  Task = object
    id: int
    data: string
  WorkQueue = object
    tasks: Deque[Task]
    results: seq[string]
    lock: Lock        # 保护 tasks
    resultLock: Lock  # 保护 results

proc newWorkQueue(): WorkQueue =
  result.tasks = initDeque[Task]()
  result.results = @[]
  initLock(result.lock)
  initLock(result.resultLock)

proc addTask(wq: var WorkQueue, task: Task) =
  withLock(wq.lock):
    wq.tasks.addLast(task)

proc worker(wq: ptr WorkQueue) {.thread.} =
  ## 循环取任务处理。队列在线程启动前已填满,取空即退出;
  ## 原版在取空时 sleep + continue 永不返回,导致 joinThread 死锁。
  while true:
    var task: Task
    var hasTask = false
    withLock(wq[].lock):
      if wq[].tasks.len > 0:
        task = wq[].tasks.popFirst()
        hasTask = true
    if not hasTask:
      break  # 没有剩余任务,线程正常退出
    # 处理任务(res 避免与隐式 result 同名)
    let res = &"Processed: {task.id} - {task.data}"
    withLock(wq[].resultLock):
      wq[].results.add(res)
    sleep(100)  # 模拟耗时处理

var wq = newWorkQueue()

# 添加任务:必须在启动线程之前完成,worker 以"取空"为退出条件
for i in 1..20:
  wq.addTask(Task(id: i, data: &"Task data {i}"))

# 启动工作线程,以 ptr 共享同一个队列
var workers: array[4, Thread[ptr WorkQueue]]
for i in 0..3:
  createThread(workers[i], worker, addr wq)
for w in workers:
  joinThread(w)

echo &"Processed {wq.results.len} tasks"
for r in wq.results[0..<min(wq.results.len, 5)]:
  echo r
本章小结
| 机制 | 用途 | 特点 |
|---|---|---|
| Thread | OS 线程 | 最基础,手动管理 |
| Channel | 线程间通信 | 安全的消息传递 |
| async/await | 异步 I/O | 单线程,高并发 |
| parallel/spawn | 并行计算 | 计算密集型(std/threadpool 在 Nim 2.x 已弃用,可用 taskpools 等替代) |
| Lock | 互斥锁 | 保护共享数据 |
| Atomic | 原子操作 | 无锁同步 |
练习
- 使用 Channel 实现生产者-消费者模式
- 用 async/await 实现并发 HTTP 请求
- 实现一个线程安全的缓存
扩展阅读
← 上一章:文件与 I/O | 下一章:外部函数接口 →