Rust · #async#rust#tokio#future

Rust异步编程:从Future到Tokio

2025.08.06 Rust 8 min 3.0k
// 目录 · contents

前言

Rust的异步编程模型基于零成本抽象的Future trait。与Go的goroutine和Node.js的事件循环不同,Rust的async/await在编译期被转换为状态机,语言本身不内置运行时,也不会在手写状态机之外引入额外的抽象开销。本文将从Future trait开始,深入到Tokio运行时的使用实践。

异步编程模型对比

graph TB
    subgraph "Go: Goroutine"
        GR[goroutine<br>轻量级线程] --> GRT[Go Runtime<br>调度器]
        GRT --> OS1[OS Threads]
    end

    subgraph "Node.js: Event Loop"
        CB[Callbacks/Promises] --> EL[Event Loop<br>单线程]
        EL --> LIBUV[libuv<br>线程池]
    end

    subgraph "Rust: Future + Executor"
        FUT[Future<br>状态机] --> EXEC[Executor<br>如Tokio]
        EXEC --> OS2[OS Threads<br>多线程运行时]
    end

Rust的async模型特点:

- 零成本抽象:async函数编译为状态机,状态机本身无需堆分配
- 惰性求值:Future不poll就不执行
- 无运行时依赖:核心语言不包含运行时,由用户自行选择(Tokio/async-std)

Future Trait

1
2
3
4
5
6
7
8
9
10
11
// Definition of the Future trait (shown here as declared by the standard library).
pub trait Future {
// The type of value produced once the future completes.
type Output;

// Attempts to drive the future forward. Returns `Poll::Ready(value)` when it
// has finished, or `Poll::Pending` when it must be polled again later.
// `cx` gives access to the waker used to signal that another poll is worthwhile.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

// Outcome of a single call to `Future::poll`.
pub enum Poll<T> {
Ready(T), // finished; carries the result
Pending, // not finished yet; try again later
}
sequenceDiagram
    participant E as Executor
    participant F as Future
    participant W as Waker

    E->>F: poll()
    F-->>E: Poll::Pending
    Note over F: 注册Waker,等待I/O

    Note over F: I/O完成
    F->>W: wake()
    W->>E: 通知Future就绪

    E->>F: poll()
    F-->>E: Poll::Ready(value)

手动实现Future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};

/// A hand-written timer future that completes once `when` has passed.
///
/// A single background thread sleeps until the deadline and then wakes the
/// task. The waker is shared with that thread so that later polls can replace
/// it (for example when the future has been handed to a different task)
/// instead of spawning a new thread on every poll.
struct Delay {
    /// The instant at which the future becomes ready.
    when: Instant,
    /// Waker shared with the timer thread. `None` until the first poll that
    /// finds the deadline has not yet been reached.
    waker: Option<Arc<Mutex<Waker>>>,
}

impl Delay {
    /// Creates a timer that fires `duration` from now.
    fn new(duration: Duration) -> Self {
        Delay {
            when: Instant::now() + duration,
            waker: None,
        }
    }
}

impl Future for Delay {
    type Output = ();

    /// Returns `Poll::Ready(())` once the deadline has passed; otherwise
    /// registers the current waker with the timer thread and returns
    /// `Poll::Pending`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // `Delay` holds no self-references (it is `Unpin`), so it is fine to
        // obtain a plain mutable reference.
        let this = self.get_mut();

        if Instant::now() >= this.when {
            return Poll::Ready(());
        }

        match &this.waker {
            Some(shared) => {
                // The timer thread already exists: just make sure it will
                // wake the task that is polling us *now*.
                let mut stored = shared.lock().unwrap_or_else(|e| e.into_inner());
                if !stored.will_wake(cx.waker()) {
                    stored.clone_from(cx.waker());
                }
            }
            None => {
                // First pending poll: register the waker and start the one
                // and only timer thread.
                let when = this.when;
                let shared = Arc::new(Mutex::new(cx.waker().clone()));
                this.waker = Some(Arc::clone(&shared));

                std::thread::spawn(move || {
                    let now = Instant::now();
                    if now < when {
                        std::thread::sleep(when - now);
                    }
                    // Clone the waker out so the lock is released before waking.
                    let waker = shared.lock().unwrap_or_else(|e| e.into_inner()).clone();
                    waker.wake();
                });
            }
        }

        // The deadline may have passed while the waker was being stored, in
        // which case the timer thread could have woken a stale waker.
        // Re-check so that such a wake-up cannot be lost.
        if Instant::now() >= this.when {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

async/await

async fn返回一个实现了Future的匿名类型。.await会挂起当前任务(而不是阻塞线程),直到被等待的Future完成。

1
2
3
4
5
6
7
8
9
10
11
// An async function: calling it only builds a future; the body runs when awaited.
//
// Downloads `url` and returns the response body as text. Fails with
// `reqwest::Error` if either sending the request or reading the body fails.
async fn fetch_data(url: &str) -> Result<String, reqwest::Error> {
    reqwest::get(url).await?.text().await
}

// 等价于(编译器生成的状态机,简化)
// fn fetch_data(url: &str) -> impl Future<Output = Result<String, reqwest::Error>> {
// // 状态机,包含每个await点的状态
// }

async块

1
2
3
4
5
6
// An async block evaluates to an anonymous Future; nothing inside it runs
// until that future is awaited or polled.
let future = async {
let data = fetch_data("https://example.com").await?;
process(data).await?;
// The turbofish spells out the error type, which `?` alone cannot pin down
// inside an async block.
Ok::<(), anyhow::Error>(())
};

async状态机

stateDiagram-v2
    [*] --> State0: 初始状态
    State0 --> State1: await fetch_data()
    State1 --> State2: await process()
    State2 --> [*]: 完成

    note right of State0: 局部变量存储在状态中
    note right of State1: 跨await点的变量保存在状态机中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// Conceptual sketch of the state machine the compiler derives from an async fn
// (illustrative only, not actual compiler output).
enum FetchAndProcess {
// Initial state: nothing has run yet; holds the input.
State0 { url: String },
// Suspended at the first await, waiting on `fetch_future`.
State1 { fetch_future: FetchFuture, url: String },
// Suspended at the second await, waiting on `process_future`.
State2 { process_future: ProcessFuture },
// Terminal state: the result has already been handed out.
Complete,
}

// NOTE(review): this impl is pseudo-code. `self.state` treats the enum as if it
// were wrapped in a struct with a `state` field, and the variants are named
// without their `FetchAndProcess::` prefix. Read it for the control flow only.
impl Future for FetchAndProcess {
type Output = Result<(), Error>;

// Advances the state machine as far as possible in one call.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Keep looping so that a state transition is followed immediately by an
// attempt to make progress in the new state.
loop {
match self.state {
// Not started: create the first inner future and move to State1.
State0 { ref url } => {
let fetch_future = fetch_data(url);
self.state = State1 { fetch_future, url: url.clone() };
}
// First await point: poll the fetch future.
State1 { ref mut fetch_future, .. } => {
match Pin::new(fetch_future).poll(cx) {
Poll::Ready(data) => {
// Fetch finished: start the second step and move to State2.
let process_future = process(data);
self.state = State2 { process_future };
}
// Inner future is not ready, so neither are we.
Poll::Pending => return Poll::Pending,
}
}
// Second await point: poll the process future.
State2 { ref mut process_future } => {
match Pin::new(process_future).poll(cx) {
Poll::Ready(result) => {
// Everything is done: record completion and return the result.
self.state = Complete;
return Poll::Ready(result);
}
Poll::Pending => return Poll::Pending,
}
}
// Polling again after `Ready` was returned is treated as a caller bug.
Complete => panic!("polled after completion"),
}
}
}
}

Pin

Pin保证被固定的值(只要其类型没有实现Unpin)在被销毁之前不会再被移动到其他内存位置,这对自引用的Future(跨await点持有指向自身内部数据的引用)至关重要。

graph TB
    subgraph "自引用结构问题"
        S1[Future State] --> |field: data| D1[String data]
        S1 --> |field: ref_to_data| REF1[&data]
        REF1 -.->|指向| D1

        NOTE1[如果Future被移动<br>ref_to_data变成悬垂引用]
    end

    subgraph "Pin的保证"
        PIN[Pin<&mut Future>] --> S2[Future State]
        S2 --> D2[String data]
        S2 --> REF2[&data]
        REF2 -.->|安全指向| D2

        NOTE2[Pin保证Future不会被移动<br>自引用始终有效]
    end
1
2
3
4
5
6
7
8
9
10
11
12
use std::pin::Pin;

// Basic Pin usage: `Box::pin` moves the future to the heap and pins it there.
let mut future = Box::pin(async {
let data = String::from("hello");
// The borrow of `data` is held across the await point
some_async_fn(&data).await;
println!("{}", data);
});

// In most cases you do not need to work with Pin directly;
// async/await and tokio::spawn take care of it for you.

Tokio运行时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// Cargo.toml
// [dependencies]
// tokio = { version = "1", features = ["full"] }

// Multi-threaded runtime (the default flavor of `#[tokio::main]`)
#[tokio::main]
async fn main() {
println!("Hello from Tokio!");
}

// Equivalent to the `#[tokio::main]` form: build a multi-threaded runtime by
// hand, then drive the async body to completion on it.
fn main() {
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();

    runtime.block_on(async {
        println!("Hello from Tokio!");
    });
}

// Single-threaded runtime (for workloads that do not need multiple threads)
#[tokio::main(flavor = "current_thread")]
async fn main() {
println!("Single-threaded Tokio!");
}

// Custom runtime configuration
fn main() {
    // The builder's setters take `&mut self`, so it can be configured in place.
    let mut builder = tokio::runtime::Builder::new_multi_thread();
    builder
        .worker_threads(4) // number of worker threads
        .max_blocking_threads(128) // upper bound on blocking-pool threads
        .enable_io() // enable the I/O driver
        .enable_time() // enable the timer driver
        .thread_name("my-worker"); // name given to runtime threads
    let runtime = builder.build().unwrap();

    runtime.block_on(async {
        // async code goes here
    });
}

任务调度(Task Spawning)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
use tokio::task;

#[tokio::main]
async fn main() {
    // spawn: start a new task (comparable to a goroutine)
    let handle = tokio::spawn(async {
        // Runs on the Tokio worker pool
        expensive_computation().await
    });

    // Wait for the task to finish. The outer `Result` is the join result,
    // so `unwrap()` panics here if the task itself panicked or was cancelled.
    let result = handle.await.unwrap();

    // Run several futures concurrently within this task and wait for all of them
    let (r1, r2, r3) = tokio::join!(
        fetch_url("https://api1.example.com"),
        fetch_url("https://api2.example.com"),
        fetch_url("https://api3.example.com"),
    );

    // spawn_blocking: run blocking code on a dedicated thread
    let result = task::spawn_blocking(|| {
        // Blocking here does not stall the async worker threads.
        // `Duration` is fully qualified because this snippet only imports `tokio::task`.
        std::thread::sleep(std::time::Duration::from_secs(1));
        heavy_cpu_work()
    })
    .await
    .unwrap();
}
graph TB
    subgraph "Tokio Multi-Thread Runtime"
        SCHEDULER[Scheduler] --> W1[Worker Thread 1]
        SCHEDULER --> W2[Worker Thread 2]
        SCHEDULER --> W3[Worker Thread 3]
        SCHEDULER --> W4[Worker Thread 4]

        W1 --> T1[Task A]
        W1 --> T2[Task B]
        W2 --> T3[Task C]
        W3 --> T4[Task D]
        W4 --> T5[Task E]

        BP[Blocking Pool] --> BT1[Blocking Thread]
        BP --> BT2[Blocking Thread]
    end

    style BP fill:#f57c00,color:#fff

Channels

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
use tokio::sync::{mpsc, oneshot, broadcast, watch};

#[tokio::main]
async fn main() {
    // mpsc: multiple producers, single consumer
    let (tx, mut rx) = mpsc::channel::<String>(32); // buffer capacity of 32

    let tx2 = tx.clone(); // clone the sending half for a second producer

    tokio::spawn(async move {
        tx.send("Hello from task 1".to_string()).await.unwrap();
    });

    tokio::spawn(async move {
        tx2.send("Hello from task 2".to_string()).await.unwrap();
    });

    // `recv()` yields `None` once every sender has been dropped, which ends the
    // loop; both senders were moved into the tasks above and drop when they finish.
    while let Some(msg) = rx.recv().await {
        println!("Received: {}", msg);
    }

    // oneshot: a single value, sent exactly once
    let (tx, rx) = oneshot::channel::<i32>();
    tokio::spawn(async move {
        tx.send(42).unwrap();
    });
    let value = rx.await.unwrap();
    println!("Oneshot value: {}", value);

    // broadcast: multiple producers, multiple consumers; every receiver that
    // subscribed before the send gets its own copy of the message
    let (tx, _) = broadcast::channel::<String>(16);
    let mut rx1 = tx.subscribe();
    let mut rx2 = tx.subscribe();

    tx.send("broadcast message".to_string()).unwrap();
    println!("rx1 received: {}", rx1.recv().await.unwrap());
    println!("rx2 received: {}", rx2.recv().await.unwrap());

    // watch: single producer, multiple consumers, only the latest value is kept
    let (tx, rx) = watch::channel("initial".to_string());
    tx.send("updated".to_string()).unwrap();
    println!("Current: {}", *rx.borrow());
}

select!宏

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
use tokio::time::{sleep, Duration};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel::<String>(32);

// select!: 等待多个Future,第一个完成的获胜
tokio::select! {
msg = rx.recv() => {
match msg {
Some(m) => println!("Received: {}", m),
None => println!("Channel closed"),
}
}
_ = sleep(Duration::from_secs(5)) => {
println!("Timeout!");
}
}

// 循环中使用select
loop {
tokio::select! {
Some(msg) = rx.recv() => {
println!("Got: {}", msg);
}
_ = sleep(Duration::from_secs(1)) => {
println!("Tick");
}
// biased; // 添加biased使select按顺序检查,而非随机
}
}
}

异步错误处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
use anyhow::{Context, Result};
use thiserror::Error;

// Custom error type. The `#[error(...)]` attributes supply the display message
// for each variant, and `#[from]` lets `?` convert the wrapped source error
// into the corresponding variant.
#[derive(Error, Debug)]
enum AppError {
// Wraps any error reported by reqwest.
#[error("HTTP request failed: {0}")]
HttpError(#[from] reqwest::Error),

// Wraps a serde_json (de)serialization error.
#[error("JSON parse failed: {0}")]
ParseError(#[from] serde_json::Error),

// Wraps an sqlx error.
#[error("Database error: {0}")]
DbError(#[from] sqlx::Error),

// The requested entity does not exist; the payload describes what was looked up.
#[error("Not found: {0}")]
NotFound(String),
}

// 使用?操作符传播错误
async fn fetch_user(id: u64) -> Result<User, AppError> {
let url = format!("https://api.example.com/users/{}", id);
let response = reqwest::get(&url).await?; // HttpError

if response.status() == 404 {
return Err(AppError::NotFound(format!("User {}", id)));
}

let user: User = response.json().await?; // HttpError or ParseError
Ok(user)
}

// Simplified error handling with anyhow: `.context(...)` attaches a
// human-readable message to whatever error bubbles up through `?`.
async fn process() -> anyhow::Result<()> {
let user = fetch_user(1).await
.context("Failed to fetch user")?;

save_to_db(&user).await
.context("Failed to save user to database")?;

Ok(())
}

实战示例:并发HTTP爬虫

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
use tokio::sync::Semaphore;
use std::sync::Arc;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let urls = vec![
"https://example.com/page1",
"https://example.com/page2",
"https://example.com/page3",
// ... 更多URL
];

// 使用信号量限制并发数
let semaphore = Arc::new(Semaphore::new(10)); // 最多10个并发请求
let client = reqwest::Client::new();

let mut handles = Vec::new();

for url in urls {
let sem = semaphore.clone();
let client = client.clone();
let url = url.to_string();

let handle = tokio::spawn(async move {
let _permit = sem.acquire().await.unwrap(); // 获取许可
// 许可在_permit被drop时自动释放

match client.get(&url).send().await {
Ok(resp) => {
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
println!("{}: {} ({} bytes)", url, status, body.len());
Ok((url, body))
}
Err(e) => {
eprintln!("Failed {}: {}", url, e);
Err(e)
}
}
});

handles.push(handle);
}

// 收集所有结果
let mut results = Vec::new();
for handle in handles {
if let Ok(Ok(result)) = handle.await {
results.push(result);
}
}

println!("Successfully fetched {} pages", results.len());
Ok(())
}

常见陷阱

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 1. Using blocking standard-library calls inside async code
// Wrong: these block a Tokio worker thread
async fn bad() {
std::thread::sleep(Duration::from_secs(1)); // blocks!
std::fs::read_to_string("file.txt"); // blocks!
}

// Right: use Tokio's async equivalents
async fn good() {
    tokio::time::sleep(Duration::from_secs(1)).await;
    // The read can fail; report the error rather than silently discarding the `Result`.
    if let Err(e) = tokio::fs::read_to_string("file.txt").await {
        eprintln!("failed to read file.txt: {}", e);
    }
}

// 2. Holding a Mutex guard across an await point
// Wrong: a std `MutexGuard` kept alive across `.await` can deadlock (another
// task on the same thread may block trying to take the lock), and it makes the
// future `!Send`, so it cannot be passed to `tokio::spawn`.
async fn bad_mutex() {
    let data = std::sync::Mutex::new(vec![]);
    // `mut` is required for `push`; without it the example would fail to
    // compile for a reason unrelated to the pitfall being shown.
    let mut guard = data.lock().unwrap();
    some_async_fn().await; // the lock is still held across this await!
    guard.push(1);
}

// Right: use tokio::sync::Mutex, or keep the lock's lifetime short
async fn good_mutex() {
    let data = tokio::sync::Mutex::new(Vec::new());

    let mut guard = data.lock().await;
    guard.push(1);
    drop(guard); // release the lock before reaching the await below

    some_async_fn().await;
}

// 3. Forgetting `.await`
async fn forgotten_await() {
let future = fetch_data(); // this only creates the Future; nothing has run yet!
// What was needed: let result = fetch_data().await;
}

总结

Rust异步编程的核心要点:

  1. Future是惰性的:必须被poll(通过await或运行时)才会执行
  2. async/await是语法糖:编译器将async函数转换为状态机
  3. Pin保证内存安全:防止自引用的Future被移动
  4. Tokio是最流行的运行时:提供多线程调度器、I/O和定时器
  5. select!用于多路复用:等待多个Future中的第一个完成
  6. channel用于任务间通信:mpsc、oneshot、broadcast、watch各有用途
  7. 避免阻塞操作:在async上下文中使用tokio的异步API或spawn_blocking

Rust的异步模型虽然学习曲线较陡,但它提供了极致的性能和安全保证,非常适合构建高性能网络服务。

作者 · author: zt
发布 · date: 2025-08-06
篇幅 · length: 3.0k 字 · 8 min
许可 · license: CC BY-SA 4.0
$ echo "comments" · 评论