第16章:并发编程
第16章:并发编程
16.1 创建线程
基本用法
use std::thread;
use std::time::Duration;
fn main() {
// 创建新线程
let handle = thread::spawn(|| {
for i in 1..=5 {
println!("子线程: {}", i);
thread::sleep(Duration::from_millis(100));
}
});
// 主线程继续执行
for i in 1..=3 {
println!("主线程: {}", i);
thread::sleep(Duration::from_millis(150));
}
// 等待子线程完成
handle.join().unwrap();
println!("所有线程完成");
}
传递数据到线程
use std::thread;
fn main() {
let data = vec![1, 2, 3, 4, 5];
// move 闭包将所有权移动到线程
let handle = thread::spawn(move || {
println!("子线程收到: {:?}", data);
let sum: i32 = data.iter().sum();
sum // 返回值
});
// join 返回线程的返回值
let result = handle.join().unwrap();
println!("求和结果: {}", result);
}
线程数量和 ID
use std::thread;
fn main() {
println!("可用CPU核心数: {}", thread::available_parallelism().unwrap().get());
let handle = thread::spawn(|| {
println!("线程ID: {:?}", thread::current().id());
});
handle.join().unwrap();
}
16.2 共享状态:Mutex
基本用法
use std::sync::Mutex;
fn main() {
let m = Mutex::new(5);
{
// lock() 获取锁,返回 MutexGuard
let mut num = m.lock().unwrap();
*num = 6;
println!("锁内的值: {}", num);
} // MutexGuard 被 drop,自动释放锁
println!("锁外的值: {:?}", m);
}
多线程共享:Arc<Mutex>
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("最终计数: {}", *counter.lock().unwrap()); // 10
}
16.3 RwLock
读写锁:允许多个读取者或一个写入者。
use std::sync::{Arc, RwLock};
use std::thread;
fn main() {
let data = Arc::new(RwLock::new(vec![1, 2, 3]));
let mut handles = vec![];
// 多个读取线程
for i in 0..3 {
let data = Arc::clone(&data);
handles.push(thread::spawn(move || {
let read = data.read().unwrap();
println!("读取者 {}: {:?}", i, *read);
}));
}
// 一个写入线程
{
let data = Arc::clone(&data);
handles.push(thread::spawn(move || {
let mut write = data.write().unwrap();
write.push(4);
println!("写入者完成: {:?}", *write);
}));
}
for handle in handles {
handle.join().unwrap();
}
}
Mutex vs RwLock
| 特性 | Mutex | RwLock |
|---|
| 读取 | 独占 | 多个并发读 |
| 写入 | 独占 | 独占 |
| 性能(读多写少) | 一般 | 更好 |
| 性能(写多) | 更好 | 一般 |
| 死锁风险 | 较低 | 较高 |
16.4 Channel 通信
多生产者,单消费者(mpsc)
use std::sync::mpsc;
use std::thread;
fn main() {
// 创建通道
let (tx, rx) = mpsc::channel();
// 生产者线程
thread::spawn(move || {
let messages = vec!["你好", "来自", "子线程"];
for msg in messages {
tx.send(msg.to_string()).unwrap();
thread::sleep(std::time::Duration::from_millis(200));
}
});
// 消费者(主线程)
for received in rx {
println!("收到: {}", received);
}
}
多生产者
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
// 克隆发送端给多个线程
for i in 0..3 {
let tx = tx.clone();
thread::spawn(move || {
for j in 0..3 {
let msg = format!("线程{}-消息{}", i, j);
tx.send(msg).unwrap();
thread::sleep(Duration::from_millis(100));
}
});
}
// 丢弃原始发送端
drop(tx);
// 接收所有消息
for received in rx {
println!("收到: {}", received);
}
}
同步通道
use std::sync::mpsc;
use std::thread;
fn main() {
// sync_channel 有缓冲区大小限制
let (tx, rx) = mpsc::sync_channel(2);
thread::spawn(move || {
for i in 0..5 {
println!("发送 {}...", i);
tx.send(i).unwrap();
println!("发送 {} 完成", i);
}
});
for received in rx {
println!("收到: {}", received);
thread::sleep(std::time::Duration::from_millis(500));
}
}
16.5 Send 和 Sync Trait
| Trait | 含义 | 说明 |
|---|
Send | 可以在线程间转移所有权 | 几乎所有类型都实现了 |
Sync | 可以在线程间共享引用 | &T 是 Send 则 T 是 Sync |
// Send: 可以安全地发送到另一个线程
// - i32, String, Vec<T>, Arc<T>, Mutex<T> 都是 Send
// - Rc<T>, *const T 不是 Send
// Sync: 可以安全地在多个线程中引用
// - i32, String, Arc<T>, Mutex<T> 都是 Sync
// - Rc<T>, RefCell<T>, Cell<T> 不是 Sync
use std::sync::Arc;
use std::thread;
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}
fn main() {
// 验证某些类型是 Send + Sync
assert_send::<i32>();
assert_send::<String>();
assert_send::<Arc<i32>>();
assert_sync::<i32>();
assert_sync::<String>();
assert_sync::<Arc<i32>>();
// 以下类型不是线程安全的:
// assert_send::<std::rc::Rc<i32>>(); // ❌ Rc 不是 Send
// assert_sync::<std::cell::RefCell<i32>>(); // ❌ RefCell 不是 Sync
println!("类型检查通过");
}
16.6 原子类型
use std::sync::atomic::{AtomicU64, AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
fn main() {
let counter = Arc::new(AtomicU64::new(0));
let flag = Arc::new(AtomicBool::new(false));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let flag = Arc::clone(&flag);
handles.push(thread::spawn(move || {
for _ in 0..1000 {
counter.fetch_add(1, Ordering::Relaxed);
}
flag.store(true, Ordering::Release);
}));
}
for handle in handles {
handle.join().unwrap();
}
println!("计数器: {}", counter.load(Ordering::Relaxed));
println!("标志位: {}", flag.load(Ordering::Acquire));
}
Ordering 级别
| Ordering | 说明 | 性能 |
|---|
Relaxed | 不保证顺序 | 最快 |
Release | 之前的写入对其他线程可见 | 中等 |
Acquire | 看到其他线程 Release 的写入 | 中等 |
AcqRel | Acquire + Release | 较慢 |
SeqCst | 全局顺序一致 | 最慢(默认推荐) |
16.7 Rayon 并行库
# Cargo.toml
[dependencies]
rayon = "1.10"
use rayon::prelude::*;
fn main() {
let numbers: Vec<i64> = (1..=1_000_000).collect();
// 串行
let sum_serial: i64 = numbers.iter().sum();
// 并行(只需改为 par_iter())
let sum_parallel: i64 = numbers.par_iter().sum();
println!("串行: {}", sum_serial);
println!("并行: {}", sum_parallel);
assert_eq!(sum_serial, sum_parallel);
// 并行排序
let mut data: Vec<i32> = (0..1000).rev().collect();
data.par_sort();
println!("排序后前10个: {:?}", &data[..10]);
}
16.8 业务场景示例
生产者-消费者模式
use std::sync::{Arc, Mutex, mpsc};
use std::thread;
use std::time::Duration;
struct Task {
id: u32,
data: String,
}
fn worker(id: usize, rx: Arc<Mutex<mpsc::Receiver<Task>>>) {
loop {
let task = {
let receiver = rx.lock().unwrap();
receiver.recv()
};
match task {
Ok(task) => {
println!("Worker {} 处理任务 {}: {}", id, task.id, task.data);
thread::sleep(Duration::from_millis(100));
}
Err(_) => {
println!("Worker {} 退出", id);
break;
}
}
}
}
fn main() {
let (tx, rx) = mpsc::channel();
let rx = Arc::new(Mutex::new(rx));
// 创建工作线程
let mut handles = vec![];
for i in 0..4 {
let rx = Arc::clone(&rx);
handles.push(thread::spawn(move || worker(i, rx)));
}
// 发送任务
for i in 0..10 {
tx.send(Task {
id: i,
data: format!("任务数据 {}", i),
}).unwrap();
}
// 关闭发送端
drop(tx);
// 等待所有工作线程完成
for handle in handles {
handle.join().unwrap();
}
println!("所有任务完成");
}
16.9 本章小结
| 要点 | 说明 |
|---|
| thread::spawn | 创建新线程 |
| Arc<Mutex> | 多线程共享可变状态 |
| RwLock | 读写锁,适合读多写少 |
| mpsc::channel | 消息传递通道 |
| Send/Sync | 线程安全 trait |
| 原子类型 | 无锁并发操作 |
| Rayon | 轻松实现数据并行 |
扩展阅读
- Rust Book - 并发 — 官方教程
- Rust Atomics and Locks — 深入并发
- Rayon 文档 — 并行计算库