强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

第17章:异步编程

第17章:异步编程

17.1 同步 vs 异步

特性同步(线程)异步(async/await)
切换开销较大(OS 线程切换)极小(用户态切换)
内存占用每线程 ~2MB 栈每任务 ~几KB
并发数数千(受限于 OS)数百万(受限于内存)
适用场景CPU 密集型I/O 密集型
编程模型阻塞非阻塞

17.2 基本 async/await

选择运行时

Rust 标准库不包含异步运行时,需要使用第三方库:

# Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }

第一个异步程序

use tokio::time::{sleep, Duration};

async fn fetch_data(id: u32) -> String {
    sleep(Duration::from_millis(100)).await;
    format!("数据#{}", id)
}

async fn process() {
    let data = fetch_data(1).await;
    println!("获取到: {}", data);
}

#[tokio::main]
async fn main() {
    process().await;
}

17.3 并发执行

tokio::join!

use tokio::time::{sleep, Duration};

async fn task_a() -> &'static str {
    sleep(Duration::from_millis(100)).await;
    "任务A完成"
}

async fn task_b() -> &'static str {
    sleep(Duration::from_millis(150)).await;
    "任务B完成"
}

#[tokio::main]
async fn main() {
    // 并发执行多个任务
    let (a, b) = tokio::join!(task_a(), task_b());
    println!("{}, {}", a, b);
}

tokio::spawn

use tokio::time::{sleep, Duration};

async fn background_task(id: u32) -> String {
    sleep(Duration::from_millis(50 * id as u64)).await;
    format!("任务{}完成", id)
}

#[tokio::main]
async fn main() {
    let mut handles = vec![];

    for i in 1..=5 {
        let handle = tokio::spawn(background_task(i));
        handles.push(handle);
    }

    for handle in handles {
        let result = handle.await.unwrap();
        println!("{}", result);
    }
}

17.4 select! 宏

use tokio::time::{sleep, Duration};
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();

    tokio::spawn(async move {
        sleep(Duration::from_millis(50)).await;
        let _ = tx.send("消息");
    });

    tokio::select! {
        msg = rx => {
            println!("收到: {}", msg.unwrap());
        }
        _ = sleep(Duration::from_millis(100)) => {
            println!("超时");
        }
    }
}

17.5 异步通道

mpsc 通道

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);

    tokio::spawn(async move {
        for i in 0..5 {
            tx.send(format!("消息{}", i)).await.unwrap();
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        }
    });

    while let Some(msg) = rx.recv().await {
        println!("收到: {}", msg);
    }
}

broadcast 通道

use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, _) = broadcast::channel(16);

    let mut rx1 = tx.subscribe();
    let mut rx2 = tx.subscribe();

    tokio::spawn(async move {
        while let Ok(msg) = rx1.recv().await {
            println!("接收者1: {}", msg);
        }
    });

    tokio::spawn(async move {
        while let Ok(msg) = rx2.recv().await {
            println!("接收者2: {}", msg);
        }
    });

    for i in 0..3 {
        tx.send(format!("广播{}", i)).unwrap();
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    }

    tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
}

17.6 Future Trait

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct CountDown {
    count: u32,
}

impl Future for CountDown {
    type Output = String;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.count == 0 {
            Poll::Ready("倒计时完成!".to_string())
        } else {
            println!("倒计时: {}", self.count);
            self.count -= 1;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

#[tokio::main]
async fn main() {
    let countdown = CountDown { count: 5 };
    let result = countdown.await;
    println!("{}", result);
}

17.7 Pin 与 Unpin

use std::pin::Pin;
use std::future::Future;

// Pin 确保值不会在内存中移动
// 对于自引用结构体很重要

async fn example() {
    let data = vec![1, 2, 3];
    let reference = &data; // 自引用
    tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    println!("{:?}", reference);
}

fn main() {
    // 大多数情况下不需要直接操作 Pin
    // 编译器和 async/await 语法会自动处理

    let fut = example();
    // tokio::pin!(fut); // 将 future 固定在栈上

    println!("Pin 大小: {} 字节", std::mem::size_of::<Pin<Box<dyn Future<Output = ()>>>>());
}

17.8 Stream

use tokio_stream::StreamExt;
use tokio::time::{sleep, Duration};

async fn number_stream() -> impl tokio_stream::Stream<Item = i32> {
    tokio_stream::iter(1..=10)
}

#[tokio::main]
async fn main() {
    let mut stream = number_stream().await;

    while let Some(value) = stream.next().await {
        print!("{} ", value);
    }
    println!();

    // 异步迭代器适配器
    let sum: i32 = tokio_stream::iter(1..=100)
        .filter(|x| x % 2 == 0)
        .fold(0, |acc, x| acc + x)
        .await;

    println!("偶数之和: {}", sum);
}

17.9 异步同步原语

use tokio::sync::{Mutex, RwLock, Semaphore};
use std::sync::Arc;

#[tokio::main]
async fn main() {
    // 异步 Mutex
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        handles.push(tokio::spawn(async move {
            let mut num = counter.lock().await;
            *num += 1;
        }));
    }

    for handle in handles {
        handle.await.unwrap();
    }
    println!("异步Mutex计数: {}", counter.lock().await);

    // 信号量
    let semaphore = Arc::new(Semaphore::new(3)); // 最多3个并发
    let mut handles = vec![];

    for i in 0..10 {
        let sem = Arc::clone(&semaphore);
        handles.push(tokio::spawn(async move {
            let _permit = sem.acquire().await.unwrap();
            println!("任务 {} 开始", i);
            sleep(Duration::from_millis(100)).await;
            println!("任务 {} 结束", i);
        }));
    }

    for handle in handles {
        handle.await.unwrap();
    }
}

17.10 业务场景示例

并发 HTTP 客户端

use tokio::time::{sleep, Duration};

async fn fetch_url(url: &str) -> Result<String, String> {
    // 模拟 HTTP 请求
    sleep(Duration::from_millis(100)).await;
    Ok(format!("来自 {} 的响应", url))
}

async fn fetch_all(urls: Vec<&str>) {
    let mut handles = vec![];

    for url in urls {
        handles.push(tokio::spawn(async move {
            match fetch_url(url).await {
                Ok(data) => println!("✅ {}: {} 字节", url, data.len()),
                Err(e) => println!("❌ {}: {}", url, e),
            }
        }));
    }

    for handle in handles {
        handle.await.unwrap();
    }
}

#[tokio::main]
async fn main() {
    let urls = vec![
        "https://api.example.com/users",
        "https://api.example.com/orders",
        "https://api.example.com/products",
        "https://api.example.com/stats",
    ];

    fetch_all(urls).await;
}

17.11 本章小结

要点说明
async/await异步编程核心语法
Tokio最流行的异步运行时
Future异步计算的抽象
Pin防止值在内存中移动
Stream异步迭代器
join!并发等待多个 future
select!等待第一个完成的 future
异步原语Mutex、Semaphore 等异步版本

扩展阅读

  1. Tokio 教程 — 官方异步教程
  2. Async Book — 异步编程详解
  3. Rust for Rustaceans — 深入异步机制