强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

第16章:并发编程

第16章:并发编程

16.1 创建线程

基本用法

use std::thread;
use std::time::Duration;

fn main() {
    // 创建新线程
    let handle = thread::spawn(|| {
        for i in 1..=5 {
            println!("子线程: {}", i);
            thread::sleep(Duration::from_millis(100));
        }
    });

    // 主线程继续执行
    for i in 1..=3 {
        println!("主线程: {}", i);
        thread::sleep(Duration::from_millis(150));
    }

    // 等待子线程完成
    handle.join().unwrap();
    println!("所有线程完成");
}

传递数据到线程

use std::thread;

fn main() {
    let data = vec![1, 2, 3, 4, 5];

    // move 闭包将所有权移动到线程
    let handle = thread::spawn(move || {
        println!("子线程收到: {:?}", data);
        let sum: i32 = data.iter().sum();
        sum // 返回值
    });

    // join 返回线程的返回值
    let result = handle.join().unwrap();
    println!("求和结果: {}", result);
}

线程数量和 ID

use std::thread;

fn main() {
    println!("可用CPU核心数: {}", thread::available_parallelism().unwrap().get());

    let handle = thread::spawn(|| {
        println!("线程ID: {:?}", thread::current().id());
    });

    handle.join().unwrap();
}

16.2 共享状态:Mutex

基本用法

use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    {
        // lock() 获取锁,返回 MutexGuard
        let mut num = m.lock().unwrap();
        *num = 6;
        println!("锁内的值: {}", num);
    } // MutexGuard 被 drop,自动释放锁

    println!("锁外的值: {:?}", m);
}

多线程共享:Arc<Mutex>

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("最终计数: {}", *counter.lock().unwrap()); // 10
}

16.3 RwLock

读写锁:允许多个读取者或一个写入者。

use std::sync::{Arc, RwLock};
use std::thread;

fn main() {
    let data = Arc::new(RwLock::new(vec![1, 2, 3]));
    let mut handles = vec![];

    // 多个读取线程
    for i in 0..3 {
        let data = Arc::clone(&data);
        handles.push(thread::spawn(move || {
            let read = data.read().unwrap();
            println!("读取者 {}: {:?}", i, *read);
        }));
    }

    // 一个写入线程
    {
        let data = Arc::clone(&data);
        handles.push(thread::spawn(move || {
            let mut write = data.write().unwrap();
            write.push(4);
            println!("写入者完成: {:?}", *write);
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }
}

Mutex vs RwLock

特性MutexRwLock
读取独占多个并发读
写入独占独占
性能(读多写少)一般更好
性能(写多)更好一般
死锁风险较低较高

16.4 Channel 通信

多生产者,单消费者(mpsc)

use std::sync::mpsc;
use std::thread;

fn main() {
    // 创建通道
    let (tx, rx) = mpsc::channel();

    // 生产者线程
    thread::spawn(move || {
        let messages = vec!["你好", "来自", "子线程"];
        for msg in messages {
            tx.send(msg.to_string()).unwrap();
            thread::sleep(std::time::Duration::from_millis(200));
        }
    });

    // 消费者(主线程)
    for received in rx {
        println!("收到: {}", received);
    }
}

多生产者

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    // 克隆发送端给多个线程
    for i in 0..3 {
        let tx = tx.clone();
        thread::spawn(move || {
            for j in 0..3 {
                let msg = format!("线程{}-消息{}", i, j);
                tx.send(msg).unwrap();
                thread::sleep(Duration::from_millis(100));
            }
        });
    }

    // 丢弃原始发送端
    drop(tx);

    // 接收所有消息
    for received in rx {
        println!("收到: {}", received);
    }
}

同步通道

use std::sync::mpsc;
use std::thread;

fn main() {
    // sync_channel 有缓冲区大小限制
    let (tx, rx) = mpsc::sync_channel(2);

    thread::spawn(move || {
        for i in 0..5 {
            println!("发送 {}...", i);
            tx.send(i).unwrap();
            println!("发送 {} 完成", i);
        }
    });

    for received in rx {
        println!("收到: {}", received);
        thread::sleep(std::time::Duration::from_millis(500));
    }
}

16.5 Send 和 Sync Trait

Trait含义说明
Send可以在线程间转移所有权几乎所有类型都实现了
Sync可以在线程间共享引用&TSendTSync
// Send: 可以安全地发送到另一个线程
// - i32, String, Vec<T>, Arc<T>, Mutex<T> 都是 Send
// - Rc<T>, *const T 不是 Send

// Sync: 可以安全地在多个线程中引用
// - i32, String, Arc<T>, Mutex<T> 都是 Sync
// - Rc<T>, RefCell<T>, Cell<T> 不是 Sync

use std::sync::Arc;
use std::thread;

fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

fn main() {
    // 验证某些类型是 Send + Sync
    assert_send::<i32>();
    assert_send::<String>();
    assert_send::<Arc<i32>>();
    assert_sync::<i32>();
    assert_sync::<String>();
    assert_sync::<Arc<i32>>();

    // 以下类型不是线程安全的:
    // assert_send::<std::rc::Rc<i32>>(); // ❌ Rc 不是 Send
    // assert_sync::<std::cell::RefCell<i32>>(); // ❌ RefCell 不是 Sync

    println!("类型检查通过");
}

16.6 原子类型

use std::sync::atomic::{AtomicU64, AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let counter = Arc::new(AtomicU64::new(0));
    let flag = Arc::new(AtomicBool::new(false));

    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let flag = Arc::clone(&flag);
        handles.push(thread::spawn(move || {
            for _ in 0..1000 {
                counter.fetch_add(1, Ordering::Relaxed);
            }
            flag.store(true, Ordering::Release);
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("计数器: {}", counter.load(Ordering::Relaxed));
    println!("标志位: {}", flag.load(Ordering::Acquire));
}

Ordering 级别

Ordering说明性能
Relaxed不保证顺序最快
Release之前的写入对其他线程可见中等
Acquire看到其他线程 Release 的写入中等
AcqRelAcquire + Release较慢
SeqCst全局顺序一致最慢(默认推荐)

16.7 Rayon 并行库

# Cargo.toml
[dependencies]
rayon = "1.10"
use rayon::prelude::*;

fn main() {
    let numbers: Vec<i64> = (1..=1_000_000).collect();

    // 串行
    let sum_serial: i64 = numbers.iter().sum();

    // 并行(只需改为 par_iter())
    let sum_parallel: i64 = numbers.par_iter().sum();

    println!("串行: {}", sum_serial);
    println!("并行: {}", sum_parallel);
    assert_eq!(sum_serial, sum_parallel);

    // 并行排序
    let mut data: Vec<i32> = (0..1000).rev().collect();
    data.par_sort();
    println!("排序后前10个: {:?}", &data[..10]);
}

16.8 业务场景示例

生产者-消费者模式

use std::sync::{Arc, Mutex, mpsc};
use std::thread;
use std::time::Duration;

struct Task {
    id: u32,
    data: String,
}

fn worker(id: usize, rx: Arc<Mutex<mpsc::Receiver<Task>>>) {
    loop {
        let task = {
            let receiver = rx.lock().unwrap();
            receiver.recv()
        };

        match task {
            Ok(task) => {
                println!("Worker {} 处理任务 {}: {}", id, task.id, task.data);
                thread::sleep(Duration::from_millis(100));
            }
            Err(_) => {
                println!("Worker {} 退出", id);
                break;
            }
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let rx = Arc::new(Mutex::new(rx));

    // 创建工作线程
    let mut handles = vec![];
    for i in 0..4 {
        let rx = Arc::clone(&rx);
        handles.push(thread::spawn(move || worker(i, rx)));
    }

    // 发送任务
    for i in 0..10 {
        tx.send(Task {
            id: i,
            data: format!("任务数据 {}", i),
        }).unwrap();
    }

    // 关闭发送端
    drop(tx);

    // 等待所有工作线程完成
    for handle in handles {
        handle.join().unwrap();
    }

    println!("所有任务完成");
}

16.9 本章小结

要点说明
thread::spawn创建新线程
Arc<Mutex>多线程共享可变状态
RwLock读写锁,适合读多写少
mpsc::channel消息传递通道
Send/Sync线程安全 trait
原子类型无锁并发操作
Rayon轻松实现数据并行

扩展阅读

  1. Rust Book - 并发 — 官方教程
  2. Rust Atomics and Locks — 深入并发
  3. Rayon 文档 — 并行计算库