迭代器中的异步操作
如果一个迭代器产生了 future,有两种写法:
handle
.wait()
.await(1)
.map(|item| async move {
debug!("proc exited: {}", proc.cmd);
item
})(2)
产生一个 Result。
产生一个
Result<impl Future<Output = ExitStatus>, Error
handle
.wait()
.map(|item| async move {
debug!("proc exited: {}", proc.cmd);
item
})
.await(1)
.await(2)
产生一个
Map<impl Future<Output = Result<ExitStatus, Error>>, impl Fn(Result<ExitStatus, Error>) → impl Future<Output = Result<ExitStatus, Error>>>
产生一个
impl Future<Output = Result<ExitStatus, Error>>
产生一个
Result<ExitStatus, Error>>
两者对比,显然第二种写法更符合预期。之所以出现这种问题,是因为第一种写法对 Result 中的值进行了 Map,得到了 Result<Future> 。而第二种写法则是先对 Map await,得到了 Future<Result>
,之后再进行 await 得到了 Result。
futures
futures 库提供了多种工具用于异步编程。
stream
stream 在外观上等价于 异步迭代器 。https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html[StreamExt] trait 提供了一组方法用于为 stream 提供便携操作。
join_all
join_all 提供了等待一组协程退出的方法。
anyof
FuturesUnordered + StreamExt trait 提供了实现 anyof 的语义。对于一组协程而言,允许在有协程完成的时候退出。 footnote::[tokio::select! but for a Vec of futures]
use futures::{stream::FuturesUnordered, StreamExt};
use std::time::Duration;
use tokio::time::{sleep, Instant};
async fn wait(millis: u64) -> u64 {
sleep(Duration::from_millis(millis)).await;
millis
}
#[tokio::main]
async fn main() {
let mut futures = FuturesUnordered::new();
futures.push(wait(500));
futures.push(wait(300));
futures.push(wait(100));
futures.push(wait(200));
let start_time = Instant::now();
let mut num_added = 0;
while let Some(wait_time) = futures.next().await {
println!("Waited {}ms", wait_time);
if num_added < 3 {
num_added += 1;
futures.push(wait(200));
}
}
println!("Completed all work in {}ms", start_time.elapsed().as_millis());
}
Pin
在 [./所有权] 中我们提到 Rust 是围绕移动而不是拷贝工作的,这意味着 Rust 可以自由地移动某些值,并将移动后的值和移动前的值视为相同。但是部分情况下结构体本身是地址敏感的,例如自引用类型。
自引用类型在访问自己的数据之前必须保证指针是有效的,然而如果自己发生了移动,就导致移动后指针依然指向旧的数据,从而引发错误。 |
当一个值满足以下条件时,一般是一个地址敏感值:
值在创建后可以被自由地移动。
例如 async 函数会返回一个由编译器实现的状态机。
某些操作依赖于自己的地址不发生变化。
例如在生成的 Fature 上第一次调用 poll 方法。
安全接口中存在 unsafe 操作,而这些操作依赖于值的地址是固定的。
例如之后调用的 poll 方法。
值在失效之前会被 drop。从而有机会去通知其它成员。
例如 drop Fature。
要解决这个问题,有两种办法:
当值被移动后,通知值。
确保值的地址不发生变化。
由于 Rust 是围绕移动语义构建的,Rust 在移动值的时候并不会通知值,因此第一种办法并不可行(C++ 是可行的)。
因此 Rust 使用了第二种方法。
Rust 通过 Pin<Ptr> 实现这种语义。正如类型名字所示,Pin 也是指针的一种,和 Box 类似。但是目的不同。