Chapter 19: Concurrent Programming
"Concurrency is not parallelism: concurrency is about structure, parallelism is about execution." (Rob Pike)
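19.1 Concurrency Overview
19.1.1 Ruby's Concurrency Model
| Mechanism | Description | Best suited for |
|---|---|---|
| Thread | Native (OS-level) threads, serialized by the GVL | I/O-bound tasks |
| Fiber | Coroutine-style (cooperative) concurrency | Lightweight task switching |
| Ractor | Actor model (Ruby 3.0+) | CPU-bound parallelism |
| Async | Asynchronous programming (async gem) | Highly concurrent I/O |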
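19.1.2 GVL (Global VM Lock)
# CRuby (MRI) has a Global VM Lock (GVL, often called the GIL)
# This means:
# - Only one thread at a time can execute Ruby code
# - Blocking I/O (and sleep) releases the GVL
# - Threads help with I/O-bound work, not CPU-bound work
# Demonstrating the GVL
t1 = Thread.new { 100_000_000.times { 1 + 1 } }
t2 = Thread.new { 100_000_000.times { 1 + 1 } }
t1.join
t2.join
# Total time for both threads ≈ time to run them one after the other
# I/O operations release the GVL
require "net/http"
t1 = Thread.new { Net::HTTP.get(URI("https://example.com")) }
t2 = Thread.new { Net::HTTP.get(URI("https://example.com")) }
[t1, t2].each(&:join)
# Both requests finish at almost the same time (about one request's latency)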
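To see the GVL effect without depending on the network, the sketch below times threads that `sleep` (a blocking call that releases the GVL) against threads doing pure Ruby arithmetic. It assumes CRuby; `elapsed` is a hypothetical helper, and the exact CPU-bound timings depend on the machine, so only the sleep case is expected to show a clear speedup.

```ruby
# Hypothetical helper: wall-clock time of a block, using a monotonic clock.
def elapsed
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
end

# sleep releases the GVL, so four 0.2s sleeps overlap (about 0.2s in total).
sleep_time = elapsed do
  4.times.map { Thread.new { sleep 0.2 } }.each(&:join)
end

# Pure Ruby arithmetic holds the GVL, so threading it gives little or no speedup on MRI.
work = -> { 2_000_000.times { 1 + 1 } }
serial_time = elapsed { 4.times { work.call } }
threaded_time = elapsed { 4.times.map { Thread.new(&work) }.each(&:join) }

puts format("4 sleeping threads: %.2fs", sleep_time)
puts format("CPU-bound: serial %.2fs, threaded %.2fs", serial_time, threaded_time)
```

On MRI the two CPU-bound numbers typically come out close to each other, while the sleeping threads finish in roughly the time of a single sleep.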
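19.2 Threads (Thread)
19.2.1 Basic Usage
# Create a thread
thread = Thread.new do
  5.times do |i|
    puts "Thread: #{i}"
    sleep 0.1
  end
end
# The main thread keeps running in the meantime
5.times do |i|
  puts "Main: #{i}"
  sleep 0.1
end
# Wait for the thread to finish
thread.join
# Passing arguments to a thread
thread = Thread.new(1, 2) do |a, b|
  a + b
end
puts thread.value # => 3 (value joins the thread and returns its result)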
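`Thread#value` (like `join`) also re-raises an exception that killed the thread, which is how errors from a worker reach the caller. A minimal sketch; the `ArgumentError` and its message are only illustrative:

```ruby
worker = Thread.new do
  # Suppress the default stderr report (Ruby 2.5+); the caller handles the error instead.
  Thread.current.report_on_exception = false
  raise ArgumentError, "bad input"
end

caught = nil
begin
  worker.value # joins the thread and re-raises its exception here
rescue ArgumentError => e
  caught = e
  puts "Caught from thread: #{e.message}"
end

p worker.status # => nil (the thread terminated with an exception)
```

If nobody calls `join` or `value`, the exception is only reported (or silently dropped when reporting is off), so long-lived worker threads usually rescue errors inside their own loop.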
19.2.2 Thread Synchronization
# Mutex (mutual-exclusion lock)
mutex = Mutex.new
counter = 0
threads = 10.times.map do
  Thread.new do
    1000.times do
      mutex.synchronize do
        counter += 1
      end
    end
  end
end
threads.each(&:join)
puts counter # => 10000 (correct: the mutex prevents a race condition on counter)
# Condition variable
mutex = Mutex.new
resource = ConditionVariable.new
data = nil
producer = Thread.new do
  mutex.synchronize do
    data = "Hello from producer"
    resource.signal # wake up a waiting consumer
  end
end
consumer = Thread.new do
  mutex.synchronize do
    # Re-check in a loop: the signal may fire before the consumer starts waiting
    resource.wait(mutex) while data.nil?
    puts "Received: #{data}"
  end
end
[producer, consumer].each(&:join)
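Mutex and ConditionVariable combine into higher-level primitives. The sketch below builds a count-down latch (waiters block until N events have happened). `CountDownLatch` here is a hypothetical class written for illustration; concurrent-ruby ships a production-ready `Concurrent::CountDownLatch` with the same idea.

```ruby
class CountDownLatch
  def initialize(count)
    @count = count
    @mutex = Mutex.new
    @zero = ConditionVariable.new
  end

  def count_down
    @mutex.synchronize do
      @count -= 1 if @count > 0
      @zero.broadcast if @count.zero? # wake every waiter, not just one
    end
  end

  def wait
    @mutex.synchronize do
      @zero.wait(@mutex) while @count > 0 # loop guards against spurious wakeups
    end
  end
end

latch = CountDownLatch.new(3)
finished = Queue.new
workers = 3.times.map do |i|
  Thread.new do
    sleep 0.05 * i
    finished << i
    latch.count_down
  end
end
latch.wait # returns only after all three workers have counted down
puts "Finished: #{finished.size}"
workers.each(&:join)
```

`broadcast` is used instead of `signal` because several threads may be blocked in `wait`, and all of them should proceed once the count reaches zero.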
19.2.3 Thread Pool
class ThreadPool
  def initialize(size)
    @size = size
    @queue = Queue.new
    @threads = []
    @size.times do
      @threads << Thread.new do
        # Each worker pulls tasks until it receives the nil sentinel
        while (task = @queue.pop)
          task.call
        end
      end
    end
  end
  def schedule(&block)
    @queue << block
  end
  def shutdown
    @size.times { @queue << nil } # one nil per worker, so every worker exits
    @threads.each(&:join)
  end
end
# Usage
pool = ThreadPool.new(4)
10.times do |i|
  pool.schedule do
    puts "Task #{i} on thread #{Thread.current.object_id}"
    sleep(rand(0.1..0.5))
  end
end
pool.shutdown
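An alternative to nil sentinels is `Queue#close` (Ruby 2.3+): once a closed queue is drained, `pop` returns nil, so every worker stops without counting sentinels. The sketch below also collects results through a second queue; the squaring job is just a placeholder workload.

```ruby
jobs = Queue.new
results = Queue.new

(1..10).each { |n| jobs << n }
jobs.close # no more jobs; pop returns nil once the queue is drained

workers = 4.times.map do
  Thread.new do
    while (n = jobs.pop) # nil => queue closed and empty, so the worker exits
      results << [n, n * n]
    end
  end
end
workers.each(&:join)

# Workers finish in arbitrary order, so sort the [n, square] pairs before use.
squares = Array.new(results.size) { results.pop }.sort.to_h
p squares
```

Because `close` replaces the sentinel, tasks are free to be any value, including ones that might otherwise be confused with the stop marker.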
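19.3 Fiber
19.3.1 Fiber Basics
# A Fiber is a lightweight coroutine
fiber = Fiber.new do
  puts "Step 1"
  Fiber.yield # pause; this resume returns nil because no value is passed
  puts "Step 2"
  Fiber.yield
  puts "Step 3"
  "Done"
end
puts fiber.resume # runs to the first yield, returns nil
puts fiber.resume # runs to the second yield
puts fiber.resume # runs to the end, returns "Done"
# Passing values in and out of a Fiber
fiber = Fiber.new do |input|
  result = input * 2
  # Pause and hand result to the caller; the next resume's argument becomes received
  received = Fiber.yield(result)
  received + 10
end
puts fiber.resume(5)   # => 10 (input is 5, so yield hands back 10)
puts fiber.resume(100) # => 110 (received is 100, so the block returns 110)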
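Resuming a fiber repeatedly until it finishes turns it into a simple generator. This sketch assumes Ruby 3.1+, where `Fiber#alive?` is available without `require "fiber"`:

```ruby
counter = Fiber.new do
  3.times { |i| Fiber.yield(i) }
  :finished # the block's return value is what the final resume returns
end

values = []
values << counter.resume while counter.alive?
p values # => [0, 1, 2, :finished]

# Resuming a fiber that has already finished raises FiberError.
dead_error = nil
begin
  counter.resume
rescue FiberError => e
  dead_error = e
end
p dead_error.class # => FiberError
```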
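19.3.2 Fiber Scheduler (Ruby 3.0+)
# Ruby 3.0+ adds a Fiber scheduler interface (Fiber.set_scheduler): with a scheduler
# installed, non-blocking fibers yield automatically when they hit blocking I/O.
# The class below is only a simplified round-robin runner that illustrates the idea;
# it does not implement the Fiber::Scheduler hooks.
require "fiber" # older Rubies need this for Fiber#alive?; newer ones have it built in
# A custom (toy) scheduler
class SimpleScheduler
  def initialize
    @fibers = []
  end
  def fiber(&block)
    fiber = Fiber.new(blocking: false, &block)
    @fibers << fiber
    fiber
  end
  def run
    until @fibers.empty?
      fiber = @fibers.shift
      fiber.resume
      @fibers << fiber if fiber.alive? # re-queue fibers that paused with Fiber.yield
    end
  end
end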
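The core of cooperative scheduling is a ready queue: resume a fiber, and put it back at the end if it only paused. This self-contained sketch (the task names "a" and "b" are arbitrary, Ruby 3.1+ assumed) records the resulting interleaving:

```ruby
log = []
make_task = ->(name) do
  Fiber.new do
    2.times do |i|
      log << "#{name}#{i + 1}"
      Fiber.yield # hand control back to the loop below
    end
  end
end

ready = [make_task.("a"), make_task.("b")]
until ready.empty?
  fiber = ready.shift
  fiber.resume
  ready << fiber if fiber.alive? # re-queue fibers that only paused
end
p log # => ["a1", "b1", "a2", "b2"]
```

Nothing preempts a fiber: if one task never calls `Fiber.yield`, the others never run. A real Fiber scheduler removes the need for explicit yields by hooking blocking operations.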
19.3.3 Enumerator and Fiber
# Ruby's Enumerator uses a Fiber internally for external iteration (next)
fibonacci = Enumerator.new do |yielder|
  a, b = 0, 1
  loop do
    yielder << a
    a, b = b, a + b
  end
end
p fibonacci.take(10)
# => [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
# Driving iteration manually
enum = [1, 2, 3].each
puts enum.next # => 1
puts enum.next # => 2
puts enum.next # => 3
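When external iteration runs past the end, `next` raises `StopIteration`. `Kernel#loop` rescues it, which makes `loop` plus `next` a common idiom; `rewind` starts over. A short sketch:

```ruby
enum = [1, 2].each

collected = []
loop do
  collected << enum.next # Kernel#loop rescues StopIteration and ends the loop
end
p collected # => [1, 2]

exhausted = false
begin
  enum.next # the enumerator is now exhausted
rescue StopIteration
  exhausted = true
end

enum.rewind # restart external iteration from the beginning
first_again = enum.next
p [exhausted, first_again] # => [true, 1]
```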
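19.4 Ractor (Ruby 3.0+)
19.4.1 Ractor Basics
# Ractors execute truly in parallel
# Each Ractor has its own GVL, so Ractors are not serialized against each other
# Ractor is still experimental (Ruby 3.x prints a warning when the first one is created);
# these examples use the Ruby 3.x Ractor#take / Ractor.yield API
# A basic Ractor
ractor = Ractor.new do
  "Hello from Ractor"
end
puts ractor.take # => "Hello from Ractor"
# Passing arguments
ractor = Ractor.new(1, 2) do |a, b|
  a + b
end
puts ractor.take # => 3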
19.4.2 Ractor Communication
# Communicate with send/receive (push) and yield/take (pull)
ractor = Ractor.new do
  loop do
    msg = Ractor.receive
    puts "Received: #{msg}"
    Ractor.yield(msg.upcase)
  end
end
ractor.send("hello")
puts ractor.take # => "HELLO"
ractor.send("world")
puts ractor.take # => "WORLD"
19.4.3 Parallel Computation with Ractors
# Parallel computation (CPU-bound work)
def fibonacci(n)
  return n if n <= 1
  fibonacci(n - 1) + fibonacci(n - 2)
end
# Serial execution
start = Time.now
results = [35, 36, 37, 38].map { |n| fibonacci(n) }
puts "Serial: #{Time.now - start}s"
# Parallel execution with Ractors (one Ractor per input)
start = Time.now
ractors = [35, 36, 37, 38].map do |n|
  Ractor.new(n) { |num| fibonacci(num) }
end
results = ractors.map(&:take)
puts "Parallel: #{Time.now - start}s"
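19.4.4 Ractor Restrictions
# Ractors cannot share mutable objects
# They must communicate by passing messages
# ❌ Pitfall: expecting to share mutable state
shared_array = []
ractor = Ractor.new(shared_array) do |arr|
  arr << 1 # arr is a deep copy, so the caller's array is never modified
end
ractor.take
p shared_array # => []
# Capturing the outer variable directly is rejected outright:
# Ractor.new { shared_array << 1 } # => ArgumentError (the block accesses outer variables)
# ✅ Correct: pass messages
ractor = Ractor.new do
  msg = Ractor.receive
  msg + 1
end
ractor.send(10)
puts ractor.take # => 11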
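Immutable data is the exception to the no-sharing rule: `Ractor.make_shareable` deep-freezes an object graph so it can be passed between Ractors by reference instead of being copied, and constants that refer to shareable objects can be read from any Ractor. A minimal sketch (Ruby 3.0+; the `CONFIG` contents are made up):

```ruby
# Deep-freeze a nested structure so every Ractor may reference it directly.
CONFIG = Ractor.make_shareable({ retries: 3, hosts: ["a.example", "b.example"] })

p Ractor.shareable?(CONFIG)         # => true
p CONFIG[:hosts].first.frozen?      # => true (nested strings were frozen too)
p Ractor.shareable?({ retries: 3 }) # => false (an ordinary mutable Hash)
```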
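19.5 Asynchronous Programming
19.5.1 The async gem
# Gemfile
gem "async"
gem "async-http"
require "async"
require "async/http/internet"
# Asynchronous tasks
Async do |task|
  internet = Async::HTTP::Internet.new
  # Issue several requests concurrently
  first = task.async do
    response = internet.get("https://httpbin.org/delay/1")
    puts "Response 1: #{response.status}"
    response.read # consume the body so the connection can be reused
  end
  second = task.async do
    response = internet.get("https://httpbin.org/delay/1")
    puts "Response 2: #{response.status}"
    response.read
  end
  # Both requests finish at almost the same time (about 1s in total, not 2s)
  [first, second].each(&:wait)
  internet.close
end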
19.5.2 Concurrent Ruby
# Gemfile
gem "concurrent-ruby"
require "concurrent"
# Thread pool
pool = Concurrent::ThreadPoolExecutor.new(
  min_threads: 2,
  max_threads: 10,
  max_queue: 100
)
promises = 5.times.map do |i|
  Concurrent::Promise.execute(executor: pool) do
    sleep(rand(0.1..0.5))
    "Result #{i}"
  end
end
results = promises.map(&:value!) # value! blocks and re-raises if the block failed
puts results.inspect
pool.shutdown
pool.wait_for_termination
# Future
future = Concurrent::Future.execute do
  sleep 1
  42
end
puts "Doing other work..."
puts "Result: #{future.value}" # blocks until the result is ready
19.6 Real-World Scenarios
19.6.1 Parallel HTTP Requests
require "net/http"
require "uri"
def fetch_url(url)
  uri = URI(url)
  response = Net::HTTP.get_response(uri)
  { url: url, status: response.code, body: response.body.to_s[0..100] }
end
urls = [
  "https://example.com",
  "https://ruby-lang.org",
  "https://github.com"
]
# Serial
start = Time.now
results = urls.map { |url| fetch_url(url) }
puts "Serial: #{Time.now - start}s"
# Parallel (one thread per URL)
start = Time.now
threads = urls.map { |url| Thread.new { fetch_url(url) } }
results = threads.map(&:value) # value joins each thread and returns its result
puts "Parallel: #{Time.now - start}s"
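One thread per URL is fine for a handful of URLs, but for long lists it is common to cap the number of requests in flight. The sketch below uses a fixed set of worker threads reading from a closed queue; `fake_fetch` is a hypothetical stand-in that sleeps instead of doing network I/O, and the limit of 3 workers is arbitrary.

```ruby
# Hypothetical stand-in for fetch_url: simulates network latency with sleep.
def fake_fetch(url)
  sleep 0.05
  { url: url, status: "200" }
end

page_urls = (1..8).map { |i| "https://example.com/page/#{i}" }
max_workers = 3 # at most 3 requests in flight at once

jobs = Queue.new
page_urls.each_with_index { |url, i| jobs << [i, url] }
jobs.close # workers stop once every job has been taken

results = Queue.new
workers = Array.new(max_workers) do
  Thread.new do
    while (job = jobs.pop)
      index, url = job
      results << [index, fake_fetch(url)]
    end
  end
end
workers.each(&:join)

# Restore the original order, since workers finish in arbitrary order.
ordered = Array.new(results.size) { results.pop }.sort_by(&:first).map(&:last)
p ordered.map { |r| r[:url] } == page_urls # => true
```

Results travel through a `Queue` rather than being written into a shared Array, which keeps the sketch safe on Ruby implementations without a GVL as well.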
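19.6.2 Producer-Consumer Pattern
buffer_size = 10
# Bounded buffer: the producer blocks when it already holds buffer_size items
queue = SizedQueue.new(buffer_size)
# Producer
producer = Thread.new do
  20.times do |i|
    queue << "item_#{i}"
    puts "Produced: item_#{i} (size: #{queue.size})"
    sleep(rand(0.1..0.3))
  end
  queue.close # no more items; a single :done marker would stop only one of the consumers
end
# Multiple consumers
consumers = 3.times.map do |id|
  Thread.new do
    # pop returns nil once the queue is closed and drained, which ends the loop
    while (item = queue.pop)
      puts "Consumer #{id} processing: #{item}"
      sleep(rand(0.1..0.5))
    end
  end
end
producer.join
consumers.each(&:join)
puts "All done!"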
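A bounded `SizedQueue` is what provides backpressure in a producer-consumer pipeline. This sketch shows both behaviours when the buffer is full: a non-blocking push raises `ThreadError`, while a normal push waits until a consumer makes room (the capacity of 2 is arbitrary).

```ruby
buffer = SizedQueue.new(2) # holds at most 2 items
buffer << :a
buffer << :b

full_error = nil
begin
  buffer.push(:c, true) # non_block = true: raise ThreadError instead of waiting
rescue ThreadError => e
  full_error = e
  puts "Buffer full (#{e.class})"
end

# A normal push blocks until a consumer frees a slot.
pusher = Thread.new { buffer.push(:c) }
sleep 0.05              # give the pusher time to block on the full buffer
popped = buffer.pop     # removes :a and lets the blocked pusher continue
pusher.join
p [popped, buffer.size] # => [:a, 2]
```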
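19.7 Hands-On Exercises
- Parallel file processing
# Use multiple threads to process the files in a directory in parallel
def parallel_process(dir, &block)
  # Your code...
end
- Implement a rate limiter
class RateLimiter
  # Limit the number of requests per second
end
- Implement Map-Reduce with Ractors
# Use Ractors to implement map-reduce
def ractor_map_reduce(data, map_fn, reduce_fn)
  # Your code...
end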
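19.8 Chapter Summary
| Topic | Summary |
|---|---|
| GVL | The global VM lock lets only one thread run Ruby code at a time, preventing true parallelism |
| Thread | Native (OS-level) threads, suited to I/O concurrency |
| Fiber | Coroutine-style concurrency, lightweight |
| Ractor | Actor model, true parallelism (Ruby 3.0+) |
| async | Gem for asynchronous programming |
| Synchronization | Mutex and ConditionVariable protect shared state |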