强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

第 19 章:并发编程

第 19 章:并发编程

“并发不是并行,并发是结构,并行是执行。” —— Rob Pike


19.1 并发概述

19.1.1 Ruby 的并发模型

机制说明适用场景
Thread纸巾级线程I/O 密集型任务
Fiber协程式并发轻量级任务切换
RactorActor 模型(Ruby 3.0+)CPU 密集型并行
Async异步编程(gem)高并发 I/O

19.1.2 GVL(Global VM Lock)

# Ruby MRI 有全局解释器锁(GVL/GIL)
# 这意味着:
# - 多线程不能同时执行 Ruby 代码
# - 但 I/O 操作会释放 GVL
# - 适合 I/O 密集型,不适合 CPU 密集型

# 证明 GVL 的存在
t1 = Thread.new { 100_000_000.times { 1 + 1 } }
t2 = Thread.new { 100_000_000.times { 1 + 1 } }
t1.join
t2.join
# 两个线程的总时间 ≈ 串行执行的时间

# I/O 操作释放 GVL
t1 = Thread.new { Net::HTTP.get(URI("https://example.com")) }
t2 = Thread.new { Net::HTTP.get(URI("https://example.com")) }
# 两个请求几乎同时完成

19.2 线程(Thread)

19.2.1 基础用法

# 创建线程
thread = Thread.new do
  5.times do |i|
    puts "Thread: #{i}"
    sleep 0.1
  end
end

# 主线程继续执行
5.times do |i|
  puts "Main: #{i}"
  sleep 0.1
end

# 等待线程完成
thread.join

# 带参数的线程
thread = Thread.new(1, 2) do |a, b|
  a + b
end
puts thread.value  # => 3(join 并获取返回值)

19.2.2 线程同步

# 互斥锁
mutex = Mutex.new
counter = 0

threads = 10.times.map do
  Thread.new do
    1000.times do
      mutex.synchronize do
        counter += 1
      end
    end
  end
end

threads.each(&:join)
puts counter  # => 10000(正确,没有竞争条件)

# 条件变量
mutex = Mutex.new
resource = ConditionVariable.new
data = nil

producer = Thread.new do
  mutex.synchronize do
    data = "Hello from producer"
    resource.signal  # 通知消费者
  end
end

consumer = Thread.new do
  mutex.synchronize do
    resource.wait(mutex) while data.nil?
    puts "Received: #{data}"
  end
end

[producer, consumer].each(&:join)

19.2.3 线程池

class ThreadPool
  def initialize(size)
    @size = size
    @queue = Queue.new
    @threads = []
    
    @size.times do
      @threads << Thread.new do
        while (task = @queue.pop)
          task.call
        end
      end
    end
  end

  def schedule(&block)
    @queue << block
  end

  def shutdown
    @size.times { @queue << nil }
    @threads.each(&:join)
  end
end

# 使用
pool = ThreadPool.new(4)

10.times do |i|
  pool.schedule do
    puts "Task #{i} on thread #{Thread.current.object_id}"
    sleep(rand(0.1..0.5))
  end
end

pool.shutdown

19.3 Fiber

19.3.1 基础 Fiber

# Fiber 是轻量级协程
fiber = Fiber.new do
  puts "Step 1"
  Fiber.yield  # 暂停,返回值
  
  puts "Step 2"
  Fiber.yield
  
  puts "Step 3"
  "Done"
end

puts fiber.resume  # 执行到第一个 yield,返回 nil
puts fiber.resume  # 执行到第二个 yield
puts fiber.resume  # 执行到最后,返回 "Done"

# 带值的 Fiber
fiber = Fiber.new do |input|
  result = input * 2
  received = Fiber.yield(result)  # 暂停,返回 result,接收 received
  received + 10
end

puts fiber.resume(5)     # => 10(输入 5,yield 返回 10)
puts fiber.resume(100)   # => 110(输入 100,返回 110)

19.3.2 Fiber 调度器(Ruby 3.0+)

# Ruby 3.0+ 引入 Fiber 调度器
require "fiber"

# 自定义调度器
class SimpleScheduler
  def initialize
    @fibers = []
    @waiting = {}
  end

  def fiber(&block)
    fiber = Fiber.new(blocking: false) do
      block.call
    end
    @fibers << fiber
    fiber
  end

  def run
    until @fibers.empty?
      fiber = @fibers.shift
      fiber.resume
    end
  end
end

19.3.3 Enumerator 和 Fiber

# Ruby 的 Enumerator 内部使用 Fiber
fibonacci = Enumerator.new do |yielder|
  a, b = 0, 1
  loop do
    yielder << a
    a, b = b, a + b
  end
end

puts fibonacci.take(10)
# => [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]

# 手动控制迭代
enum = [1, 2, 3].each
puts enum.next  # => 1
puts enum.next  # => 2
puts enum.next  # => 3

19.4 Ractor(Ruby 3.0+)

19.4.1 Ractor 基础

# Ractor 是真正的并行执行
# 每个 Ractor 有自己的 GVL

# 基本 Ractor
ractor = Ractor.new do
  "Hello from Ractor"
end

puts ractor.take  # => "Hello from Ractor"

# 传递参数
ractor = Ractor.new(1, 2) do |a, b|
  a + b
end

puts ractor.take  # => 3

19.4.2 Ractor 通信

# 使用 send/take 通信
ractor = Ractor.new do
  loop do
    msg = Ractor.receive
    puts "Received: #{msg}"
    Ractor.yield(msg.upcase)
  end
end

ractor.send("hello")
puts ractor.take  # => "HELLO"

ractor.send("world")
puts ractor.take  # => "WORLD"

19.4.3 Ractor 并行计算

# 并行计算(CPU 密集型任务)
def fibonacci(n)
  return n if n <= 1
  fibonacci(n - 1) + fibonacci(n - 2)
end

# 串行执行
start = Time.now
results = [35, 36, 37, 38].map { |n| fibonacci(n) }
puts "Serial: #{Time.now - start}s"

# Ractor 并行执行
start = Time.now
ractors = [35, 36, 37, 38].map do |n|
  Ractor.new(n) { |num| fibonacci(num) }
end
results = ractors.map(&:take)
puts "Parallel: #{Time.now - start}s"

19.4.4 Ractor 限制

# Ractor 之间不能共享可变对象
# 必须通过消息传递通信

# ❌ 错误:共享可变状态
shared_array = []
ractor = Ractor.new(shared_array) do |arr|
  arr << 1  # 可能出错
end

# ✅ 正确:通过消息传递
ractor = Ractor.new do
  msg = Ractor.receive
  msg + 1
end

ractor.send(10)
puts ractor.take  # => 11

19.5 异步编程

19.5.1 async gem

# Gemfile
gem "async"
gem "async-http"

require "async"
require "async/http/internet"

# 异步任务
Async do |task|
  internet = Async::HTTP::Internet.new
  
  # 并行发起多个请求
  task.async do
    response = internet.get("https://httpbin.org/delay/1")
    puts "Response 1: #{response.status}"
  end
  
  task.async do
    response = internet.get("https://httpbin.org/delay/1")
    puts "Response 2: #{response.status}"
  end
  
  # 两个请求几乎同时完成
end

19.5.2 Concurrent Ruby

# Gemfile
gem "concurrent-ruby"

require "concurrent"

# 线程池
pool = Concurrent::ThreadPoolExecutor.new(
  min_threads: 2,
  max_threads: 10,
  max_queue: 100
)

futures = 5.times.map do |i|
  Concurrent::Promise.execute(executor: pool) do
    sleep(rand(0.1..0.5))
    "Result #{i}"
  end
end

results = futures.map(&:value!)
puts results.inspect

# Future
future = Concurrent::Future.execute do
  sleep 1
  42
end

puts "Doing other work..."
puts "Result: #{future.value}"  # 等待结果

19.6 实际业务场景

19.6.1 并行 HTTP 请求

require "net/http"
require "uri"

def fetch_url(url)
  uri = URI(url)
  response = Net::HTTP.get_response(uri)
  { url: url, status: response.code, body: response.body[0..100] }
end

urls = [
  "https://example.com",
  "https://ruby-lang.org",
  "https://github.com"
]

# 串行
start = Time.now
results = urls.map { |url| fetch_url(url) }
puts "Serial: #{Time.now - start}s"

# 并行(使用线程)
start = Time.now
threads = urls.map { |url| Thread.new { fetch_url(url) } }
results = threads.map(&:value)
puts "Parallel: #{Time.now - start}s"

19.6.2 生产者-消费者模式

require "thread"

queue = Queue.new
buffer_size = 10

# 生产者
producer = Thread.new do
  20.times do |i|
    queue << "item_#{i}"
    puts "Produced: item_#{i} (size: #{queue.size})"
    sleep(rand(0.1..0.3))
  end
  queue << :done
end

# 多个消费者
consumers = 3.times.map do |id|
  Thread.new do
    loop do
      item = queue.pop
      break if item == :done
      puts "Consumer #{id} processing: #{item}"
      sleep(rand(0.1..0.5))
    end
  end
end

producer.join
consumers.each(&:join)
puts "All done!"

19.7 动手练习

  1. 并行文件处理
# 使用多线程并行处理目录中的文件
def parallel_process(dir, &block)
  # 你的代码...
end
  1. 实现限流器
class RateLimiter
  # 限制每秒请求数
end
  1. 实现 Ractor Map-Reduce
# 使用 Ractor 实现 map-reduce
def ractor_map_reduce(data, map_fn, reduce_fn)
  # 你的代码...
end

19.8 本章小结

要点说明
GVL全局解释器锁限制真并行
Thread纸巾级线程,适合 I/O 并发
Fiber协程式并发,轻量级
RactorActor 模型,真正的并行(Ruby 3.0+)
async异步编程 gem
同步Mutex、ConditionVariable 保护共享状态

📖 扩展阅读


上一章← 第 18 章:Sinatra 轻量 Web 下一章第 20 章:性能优化 →