Asynchronous Programming in Rust: From Future to Tokio
2025.08.06
Rust
8 min
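3.0k words
// Contents
Introduction · Comparing Asynchronous Programming Models · Future Trait · Implementing a Future by Hand · async/await · async Blocks · The async State Machine · Pin · The Tokio Runtime · Task Spawning · Channels · The select! Macro · Async Error Handling · Worked Example: A Concurrent HTTP Crawler · Common Pitfalls · Summary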
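Introduction
Rust's asynchronous programming model is built on the Future trait, a zero-cost abstraction. Unlike Go's goroutines or Node.js's event loop, Rust compiles async/await into state machines at compile time, so the language needs no built-in runtime and the abstraction itself adds essentially no overhead. This article starts from the Future trait and works up to practical use of the Tokio runtime.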
Comparing Asynchronous Programming Models
```mermaid
graph TB
subgraph "Go: Goroutine"
GR[goroutine<br>lightweight thread] --> GRT[Go Runtime<br>scheduler]
GRT --> OS1[OS Threads]
end
subgraph "Node.js: Event Loop"
CB[Callbacks/Promises] --> EL[Event Loop<br>single-threaded]
EL --> LIBUV[libuv<br>thread pool]
end
subgraph "Rust: Future + Executor"
FUT[Future<br>state machine] --> EXEC[Executor<br>e.g. Tokio]
EXEC --> OS2[OS Threads<br>multi-threaded runtime]
end
```
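Characteristics of Rust's async model:
- Zero-cost abstraction: an async function compiles to a state machine, and the future itself needs no heap allocation
- Lazy evaluation: a Future does nothing until it is polled
- No bundled runtime: the core language ships without a runtime, so you choose an executor (Tokio, async-std)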
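The lazy-evaluation point can be checked with the standard library alone. The sketch below is only an illustration: `NoopWake` and `laziness_demo` are hypothetical names introduced here, and polling by hand with a do-nothing waker works only because this particular future never waits on anything.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough for a future that never waits.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns (body ran before the first poll, body ran after the first poll).
fn laziness_demo() -> (bool, bool) {
    let ran = Cell::new(false);

    // Creating the future runs none of its body.
    let fut = async {
        ran.set(true);
    };
    let before = ran.get();

    // Polling once drives the body to completion (it has no await points).
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let fut = pin!(fut);
    let poll = fut.poll(&mut cx);
    assert!(matches!(poll, Poll::Ready(())));

    (before, ran.get())
}

fn main() {
    let (before, after) = laziness_demo();
    println!("ran before poll: {before}, ran after poll: {after}");
}
```

In real programs an executor such as Tokio does the polling; the manual poll here only makes the laziness visible.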
Future Trait
```rust
pub trait Future {
    /// The type of value produced on completion.
    type Output;

    /// Try to make progress; returns Ready(value) or Pending.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),
    Pending,
}
```
```mermaid
sequenceDiagram
participant E as Executor
participant F as Future
participant W as Waker
E->>F: poll()
F-->>E: Poll::Pending
Note over F: Registers the Waker, waits for I/O
Note over F: I/O completes
F->>W: wake()
W->>E: Signals that the Future is ready
E->>F: poll()
F-->>E: Poll::Ready(value)
```
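The poll/wake cycle in the diagram can be sketched as a tiny executor built from the standard library only. This is a minimal illustration, not how Tokio is implemented: `block_on`, `ThreadWaker`, and `CountDown` are names made up for this example, and "I/O completion" is simulated by a short-lived thread that calls `wake()`.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread blocked inside `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal single-future executor: poll, then park the thread while pending.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // Sleep until a clone of the waker calls wake().
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical future that returns Pending `remaining` times, then finishes.
struct CountDown {
    remaining: u32,
    polls: u32,
}

impl Future for CountDown {
    /// Total number of times the future was polled.
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        self.polls += 1;
        if self.remaining == 0 {
            return Poll::Ready(self.polls);
        }
        self.remaining -= 1;
        // Simulate an event source on another thread that wakes us later.
        let waker = cx.waker().clone();
        thread::spawn(move || waker.wake());
        Poll::Pending
    }
}

fn main() {
    let polls = block_on(CountDown { remaining: 3, polls: 0 });
    println!("completed after {polls} polls");
}
```

Each poll either decrements the counter or completes, so the poll count stays at `remaining + 1` even if the thread is unparked spuriously.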
Implementing a Future by Hand
```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

/// A future that completes once a deadline has passed.
struct Delay {
    when: Instant,
}

impl Delay {
    fn new(duration: Duration) -> Self {
        Delay {
            when: Instant::now() + duration,
        }
    }
}

impl Future for Delay {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if Instant::now() >= self.when {
            // The deadline has passed: the future is complete.
            Poll::Ready(())
        } else {
            // Clone the waker so a timer thread can notify the executor later.
            let waker = cx.waker().clone();
            let when = self.when;

            // Note: this spawns a new thread on every poll that returns
            // Pending. Fine for a demo, wasteful in real code.
            std::thread::spawn(move || {
                let now = Instant::now();
                if now < when {
                    std::thread::sleep(when - now);
                }
                waker.wake();
            });

            Poll::Pending
        }
    }
}
```
async/await
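An async fn returns an anonymous type that implements Future. `.await` suspends the enclosing async function until the awaited Future completes, without blocking the thread.
```rust
async fn fetch_data(url: &str) -> Result<String, reqwest::Error> {
    // Send the request and wait for the response headers.
    let response = reqwest::get(url).await?;
    // Read the whole response body as text.
    let body = response.text().await?;
    Ok(body)
}
```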
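To make "returns an anonymous type that implements Future" concrete, the sketch below puts an async fn next to a roughly equivalent plain function that returns `impl Future`. The names `double`, `double_desugared`, `quadruple`, `NoopWake`, and `poll_once` are hypothetical, and the single manual poll suffices only because none of these futures ever waits.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// An async fn ...
async fn double(x: u32) -> u32 {
    x * 2
}

// ... is roughly sugar for a plain fn that returns `impl Future`.
fn double_desugared(x: u32) -> impl Future<Output = u32> {
    async move { x * 2 }
}

// `.await` unwraps the output of either form.
async fn quadruple(x: u32) -> u32 {
    let twice = double(x).await;
    double_desugared(twice).await
}

/// A waker that does nothing; enough for futures that never wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future exactly once.
fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let fut = pin!(fut);
    fut.poll(&mut cx)
}

fn main() {
    println!("{:?}", poll_once(quadruple(3)));
}
```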
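async Blocks
```rust
// An async block is an expression that evaluates to a Future.
let future = async {
    let data = fetch_data("https://example.com").await?;
    process(data).await?;
    // Annotate the error type so `?` knows what to convert into.
    Ok::<(), anyhow::Error>(())
};
```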
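The async State Machine
```mermaid
stateDiagram-v2
[*] --> State0: initial state
State0 --> State1: await fetch_data()
State1 --> State2: await process()
State2 --> [*]: done
note right of State0: Local variables are stored in the state
note right of State1: Variables that live across an await point are saved in the state machine
```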
```rust
// Conceptual sketch of the state machine the compiler generates.
// It is simplified and does not compile as written: FetchFuture and
// ProcessFuture stand for anonymous future types, `self.state` stands for
// the current variant, and pinning and error handling are glossed over.
enum FetchAndProcess {
    State0 { url: String },
    State1 { fetch_future: FetchFuture, url: String },
    State2 { process_future: ProcessFuture },
    Complete,
}

impl Future for FetchAndProcess {
    type Output = Result<(), Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            match self.state {
                State0 { ref url } => {
                    // Start the first await: create the future and advance.
                    let fetch_future = fetch_data(url);
                    self.state = State1 { fetch_future, url: url.clone() };
                }
                State1 { ref mut fetch_future, .. } => {
                    match Pin::new(fetch_future).poll(cx) {
                        Poll::Ready(data) => {
                            // First await finished: start the second one.
                            let process_future = process(data);
                            self.state = State2 { process_future };
                        }
                        Poll::Pending => return Poll::Pending,
                    }
                }
                State2 { ref mut process_future } => {
                    match Pin::new(process_future).poll(cx) {
                        Poll::Ready(result) => {
                            self.state = Complete;
                            return Poll::Ready(result);
                        }
                        Poll::Pending => return Poll::Pending,
                    }
                }
                Complete => panic!("polled after completion"),
            }
        }
    }
}
```
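A much smaller state machine can be written so that it really compiles. The sketch below hand-writes the equivalent of an async fn with two await points, using `std::future::Ready` as a stand-in for real sub-futures. `Sum`, `sum_async`, `NoopWake`, and `poll_once` are hypothetical names. Because every field is `Unpin`, the code can take a plain `&mut` to the state, which the compiler's generated machines cannot assume in general.

```rust
use std::future::{ready, Future, Ready};
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// The async version that the enum below mirrors by hand.
async fn sum_async(a: i32, b: i32) -> i32 {
    let x = ready(a).await;
    let y = ready(b).await;
    x + y
}

/// One variant per suspension point, holding the values that must survive it.
enum Sum {
    Start { a: i32, b: i32 },
    WaitingFirst { first: Ready<i32>, b: i32 },
    WaitingSecond { x: i32, second: Ready<i32> },
    Done,
}

impl Future for Sum {
    type Output = i32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<i32> {
        loop {
            let next = match &mut *self {
                Sum::Start { a, b } => Sum::WaitingFirst { first: ready(*a), b: *b },
                Sum::WaitingFirst { first, b } => match Pin::new(first).poll(cx) {
                    Poll::Ready(x) => Sum::WaitingSecond { x, second: ready(*b) },
                    Poll::Pending => return Poll::Pending,
                },
                Sum::WaitingSecond { x, second } => match Pin::new(second).poll(cx) {
                    Poll::Ready(y) => {
                        let total = *x + y;
                        *self = Sum::Done;
                        return Poll::Ready(total);
                    }
                    Poll::Pending => return Poll::Pending,
                },
                Sum::Done => panic!("polled after completion"),
            };
            // Advance to the next state and keep going without returning.
            *self = next;
        }
    }
}

/// A waker that does nothing; enough for futures that never wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future exactly once.
fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let fut = pin!(fut);
    fut.poll(&mut cx)
}

fn main() {
    println!("{:?}", poll_once(Sum::Start { a: 2, b: 3 }));
    println!("{:?}", poll_once(sum_async(2, 3)));
}
```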
Pin
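Pin guarantees that the value it points to will not be moved in memory (unless the type is Unpin). This is essential for self-referential futures, that is, futures that hold a reference to one of their own locals across an await point.
```mermaid
graph TB
subgraph "The self-reference problem"
S1[Future State] --> |field: data| D1[String data]
S1 --> |field: ref_to_data| REF1[&data]
REF1 -.->|points to| D1
NOTE1[If the Future is moved<br>ref_to_data becomes a dangling reference]
end
subgraph "What Pin guarantees"
PIN[Pin<&mut Future>] --> S2[Future State]
S2 --> D2[String data]
S2 --> REF2[&data]
REF2 -.->|points to, safely| D2
NOTE2[Pin guarantees the Future is not moved<br>the self-reference stays valid]
end
```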
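```rust
use std::pin::Pin;

// Box::pin allocates the future on the heap and pins it there.
let mut future = Box::pin(async {
    let data = String::from("hello");
    // `&data` is held across the await point, so the future is self-referential.
    some_async_fn(&data).await;
    println!("{}", data);
});
```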
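One way to see what `Box::pin` buys you is to check that the pinned value keeps its heap address when the handle is moved. This sketch uses a hypothetical `Pinned` struct marked with `PhantomPinned` as a stand-in for a self-referential future; `address_of` and `moved_handle_keeps_address` are likewise names invented for the example.

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

/// Hypothetical type that opts out of `Unpin`, like a self-referential future.
struct Pinned {
    value: u32,
    _pin: PhantomPinned,
}

/// Address of the heap value behind a pinned box.
fn address_of(p: &Pin<Box<Pinned>>) -> usize {
    &**p as *const Pinned as usize
}

fn moved_handle_keeps_address() -> bool {
    let boxed: Pin<Box<Pinned>> = Box::pin(Pinned {
        value: 7,
        _pin: PhantomPinned,
    });
    let before = address_of(&boxed);

    // Moving the Pin<Box<_>> handle moves only the pointer, not the heap value.
    let moved = boxed;
    let after = address_of(&moved);

    // Shared access still works through Deref.
    assert_eq!(moved.value, 7);

    // `Pin::into_inner(moved)` would not compile here, because `Pinned`
    // is not `Unpin`; that is how Pin prevents moving the value out.
    before == after
}

fn main() {
    println!("address stable: {}", moved_handle_keeps_address());
}
```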
The Tokio Runtime
```rust
// Each `main` below is an alternative; a program uses only one of them.

// Option 1: the #[tokio::main] macro (multi-threaded runtime by default)
#[tokio::main]
async fn main() {
    println!("Hello from Tokio!");
}

// Option 2: roughly what the macro expands to
fn main() {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            println!("Hello from Tokio!");
        });
}

// Option 3: single-threaded runtime
#[tokio::main(flavor = "current_thread")]
async fn main() {
    println!("Single-threaded Tokio!");
}

// Option 4: custom runtime configuration
fn main() {
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(4)          // number of async worker threads
        .max_blocking_threads(128)  // upper bound for the blocking pool
        .enable_io()
        .enable_time()
        .thread_name("my-worker")
        .build()
        .unwrap();

    runtime.block_on(async {
        // application logic goes here
    });
}
```
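Task Spawning
```rust
use std::time::Duration;
use tokio::task;

// expensive_computation, fetch_url and heavy_cpu_work are placeholders
// defined elsewhere.
#[tokio::main]
async fn main() {
    // Spawn a task that runs concurrently with the current one.
    let handle = tokio::spawn(async {
        expensive_computation().await
    });
    // Awaiting the JoinHandle returns Err if the task panicked or was cancelled.
    let result = handle.await.unwrap();

    // Run several futures concurrently in the current task and wait for all.
    let (r1, r2, r3) = tokio::join!(
        fetch_url("https://api1.example.com"),
        fetch_url("https://api2.example.com"),
        fetch_url("https://api3.example.com"),
    );

    // Move blocking or CPU-heavy work onto the dedicated blocking pool.
    let result = task::spawn_blocking(|| {
        std::thread::sleep(Duration::from_secs(1));
        heavy_cpu_work()
    })
    .await
    .unwrap();
}
```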
```mermaid
graph TB
subgraph "Tokio Multi-Thread Runtime"
SCHEDULER[Scheduler] --> W1[Worker Thread 1]
SCHEDULER --> W2[Worker Thread 2]
SCHEDULER --> W3[Worker Thread 3]
SCHEDULER --> W4[Worker Thread 4]
W1 --> T1[Task A]
W1 --> T2[Task B]
W2 --> T3[Task C]
W3 --> T4[Task D]
W4 --> T5[Task E]
BP[Blocking Pool] --> BT1[Blocking Thread]
BP --> BT2[Blocking Thread]
end
style BP fill:#f57c00,color:#fff
```
Channels
```rust
use tokio::sync::{broadcast, mpsc, oneshot, watch};

#[tokio::main]
async fn main() {
    // mpsc: multi-producer, single-consumer, bounded to 32 queued messages
    let (tx, mut rx) = mpsc::channel::<String>(32);
    let tx2 = tx.clone();

    tokio::spawn(async move {
        tx.send("Hello from task 1".to_string()).await.unwrap();
    });
    tokio::spawn(async move {
        tx2.send("Hello from task 2".to_string()).await.unwrap();
    });

    // recv() returns None once every sender has been dropped.
    while let Some(msg) = rx.recv().await {
        println!("Received: {}", msg);
    }

    // oneshot: a single value, sent exactly once
    let (tx, rx) = oneshot::channel::<i32>();
    tokio::spawn(async move {
        tx.send(42).unwrap();
    });
    let value = rx.await.unwrap();

    // broadcast: every subscribed receiver sees every message
    let (tx, _) = broadcast::channel::<String>(16);
    let mut rx1 = tx.subscribe();
    let mut rx2 = tx.subscribe();
    tx.send("broadcast message".to_string()).unwrap();

    // watch: receivers only ever see the latest value
    let (tx, mut rx) = watch::channel("initial".to_string());
    tx.send("updated".to_string()).unwrap();
    println!("Current: {}", *rx.borrow());
}
```
The select! Macro
```rust
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<String>(32);

    // Wait for whichever finishes first: a message or a 5-second timeout.
    tokio::select! {
        msg = rx.recv() => {
            match msg {
                Some(m) => println!("Received: {}", m),
                None => println!("Channel closed"),
            }
        }
        _ = sleep(Duration::from_secs(5)) => {
            println!("Timeout!");
        }
    }

    // select! inside a loop. The sleep is recreated on every iteration,
    // so "Tick" is printed only after a full second without messages.
    loop {
        tokio::select! {
            // If recv() yields None the pattern fails and this branch is disabled.
            Some(msg) = rx.recv() => {
                println!("Got: {}", msg);
            }
            _ = sleep(Duration::from_secs(1)) => {
                println!("Tick");
            }
        }
    }
}
```
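The core idea behind `select!`, polling several futures and returning as soon as one is ready, can be sketched for two futures with `std::future::poll_fn`. This is a hand-rolled illustration and not how `tokio::select!` is implemented: `race`, `Either`, `NoopWake`, and `poll_once` are hypothetical names, and this version always polls the first future first, whereas `tokio::select!` picks the starting branch at random unless `biased;` is given.

```rust
use std::future::{pending, poll_fn, ready, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Which of the two futures finished first, and with what value.
#[derive(Debug, PartialEq)]
enum Either<A, B> {
    Left(A),
    Right(B),
}

/// Resolves with the output of whichever future becomes ready first.
/// The losing future is dropped when `race` returns, i.e. it is cancelled.
async fn race<A, B>(a: A, b: B) -> Either<A::Output, B::Output>
where
    A: Future,
    B: Future,
{
    let mut a = pin!(a);
    let mut b = pin!(b);
    poll_fn(|cx| {
        if let Poll::Ready(v) = a.as_mut().poll(cx) {
            return Poll::Ready(Either::Left(v));
        }
        if let Poll::Ready(v) = b.as_mut().poll(cx) {
            return Poll::Ready(Either::Right(v));
        }
        // Both sub-futures have registered the waker; wait for either one.
        Poll::Pending
    })
    .await
}

/// A waker that does nothing; enough for this demo.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future exactly once.
fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let fut = pin!(fut);
    fut.poll(&mut cx)
}

fn main() {
    // The first future never completes, so the second one wins.
    println!("{:?}", poll_once(race(pending::<()>(), ready(42))));
}
```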
Async Error Handling
```rust
use anyhow::{Context, Result};
use thiserror::Error;

// `User` and `save_to_db` are assumed to be defined elsewhere.
#[derive(Error, Debug)]
enum AppError {
    #[error("HTTP request failed: {0}")]
    HttpError(#[from] reqwest::Error),

    #[error("JSON parse failed: {0}")]
    ParseError(#[from] serde_json::Error),

    #[error("Database error: {0}")]
    DbError(#[from] sqlx::Error),

    #[error("Not found: {0}")]
    NotFound(String),
}

async fn fetch_user(id: u64) -> Result<User, AppError> {
    let url = format!("https://api.example.com/users/{}", id);
    // `?` converts reqwest::Error into AppError::HttpError via #[from].
    let response = reqwest::get(&url).await?;

    if response.status() == reqwest::StatusCode::NOT_FOUND {
        return Err(AppError::NotFound(format!("User {}", id)));
    }

    let user: User = response.json().await?;
    Ok(user)
}

async fn process() -> anyhow::Result<()> {
    // context() attaches a human-readable message to the underlying error.
    let user = fetch_user(1)
        .await
        .context("Failed to fetch user")?;

    save_to_db(&user)
        .await
        .context("Failed to save user to database")?;

    Ok(())
}
```
Worked Example: A Concurrent HTTP Crawler
```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let urls = vec![
        "https://example.com/page1",
        "https://example.com/page2",
        "https://example.com/page3",
    ];

    // Allow at most 10 requests in flight at any time.
    let semaphore = Arc::new(Semaphore::new(10));
    let client = reqwest::Client::new();

    let mut handles = Vec::new();

    for url in urls {
        let sem = semaphore.clone();
        let client = client.clone();
        let url = url.to_string();

        let handle = tokio::spawn(async move {
            // The permit is released when `_permit` is dropped at the end of the task.
            let _permit = sem.acquire().await.unwrap();

            match client.get(&url).send().await {
                Ok(resp) => {
                    let status = resp.status();
                    let body = resp.text().await.unwrap_or_default();
                    println!("{}: {} ({} bytes)", url, status, body.len());
                    Ok((url, body))
                }
                Err(e) => {
                    eprintln!("Failed {}: {}", url, e);
                    Err(e)
                }
            }
        });

        handles.push(handle);
    }

    // Collect results, skipping tasks that panicked and requests that failed.
    let mut results = Vec::new();
    for handle in handles {
        if let Ok(Ok(result)) = handle.await {
            results.push(result);
        }
    }

    println!("Successfully fetched {} pages", results.len());
    Ok(())
}
```
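Common Pitfalls
```rust
use std::time::Duration;

// Pitfall 1: blocking calls in async code stall the whole worker thread.
async fn bad() {
    std::thread::sleep(Duration::from_secs(1)); // blocks the thread
    let _ = std::fs::read_to_string("file.txt"); // blocking I/O
}

// Better: use the async equivalents (or spawn_blocking).
async fn good() {
    tokio::time::sleep(Duration::from_secs(1)).await;
    let _ = tokio::fs::read_to_string("file.txt").await;
}

// Pitfall 2: holding a std::sync::Mutex guard across an .await.
async fn bad_mutex() {
    let data = std::sync::Mutex::new(vec![]);
    let mut guard = data.lock().unwrap();
    // The guard is still held here: the future is not Send, and other
    // tasks that need the lock can block the thread or deadlock.
    some_async_fn().await;
    guard.push(1);
}

// Better: use tokio::sync::Mutex, or release the lock before awaiting.
async fn good_mutex() {
    let data = tokio::sync::Mutex::new(vec![]);
    {
        let mut guard = data.lock().await;
        guard.push(1);
    } // guard dropped here, before the next await
    some_async_fn().await;
}

// Pitfall 3: forgetting .await.
async fn forgotten_await() {
    // Nothing runs: the future is created but never polled.
    let future = fetch_data("https://example.com");
}
```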
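Summary
Key points of asynchronous programming in Rust:
- Futures are lazy: they run only when polled, via .await or by a runtime
- async/await is syntactic sugar: the compiler turns each async function into a state machine
- Pin provides memory safety: it prevents self-referential futures from being moved
- Tokio is the most popular runtime: it provides a multi-threaded scheduler, I/O, and timers
- select! multiplexes: it waits for the first of several futures to complete
- Channels carry messages between tasks: mpsc, oneshot, broadcast, and watch each serve a different purpose
- Avoid blocking operations: in async contexts, use Tokio's async APIs or spawn_blocking
Rust's async model has a steep learning curve, but it offers excellent performance together with strong safety guarantees, which makes it well suited to building high-performance network services.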