迭代器中的异步操作

如果一个迭代器产生了 future,有两种写法:

handle
    .wait()
    .await(1)
    .map(|item| async move {
        debug!("proc exited: {}", proc.cmd);
        item
    })(2)
  1. 产生一个 Result。

  2. 产生一个 Result<impl Future<Output = ExitStatus>, Error

handle
    .wait()
    .map(|item| async move {
        debug!("proc exited: {}", proc.cmd);
        item
    })
    .await(1)
    .await(2)
  1. 产生一个 Map<impl Future<Output = Result<ExitStatus, Error>>, impl Fn(Result<ExitStatus, Error>) → impl Future<Output = Result<ExitStatus, Error>>>

  2. 产生一个 impl Future<Output = Result<ExitStatus, Error>>

  3. 产生一个 Result<ExitStatus, Error>>

两者对比,显然第二种写法更符合预期。之所以出现这种问题,是因为第一种写法对 Result 中的值进行了 Map,得到了 Result<Future> 。而第二种写法则是先对 Map await,得到了 Future<Result>,之后再进行 await 得到了 Result。

futures

futures 库提供了多种工具用于异步编程。

stream

stream 在外观上等价于 异步迭代器 。https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html[StreamExt] trait 提供了一组方法用于为 stream 提供便携操作。

join_all

join_all 提供了等待一组协程退出的方法。

anyof

FuturesUnordered + StreamExt trait 提供了实现 anyof 的语义。对于一组协程而言,允许在有协程完成的时候退出。 footnote::[tokio::select! but for a Vec of futures]

use futures::{stream::FuturesUnordered, StreamExt};
use std::time::Duration;
use tokio::time::{sleep, Instant};

async fn wait(millis: u64) -> u64 {
    sleep(Duration::from_millis(millis)).await;
    millis
}

#[tokio::main]
async fn main() {
    let mut futures = FuturesUnordered::new();
    futures.push(wait(500));
    futures.push(wait(300));
    futures.push(wait(100));
    futures.push(wait(200));

    let start_time = Instant::now();

    let mut num_added = 0;
    while let Some(wait_time) = futures.next().await {
        println!("Waited {}ms", wait_time);
        if num_added < 3 {
            num_added += 1;
            futures.push(wait(200));
        }
    }

    println!("Completed all work in {}ms", start_time.elapsed().as_millis());
}

Pin

[./所有权] 中我们提到 Rust 是围绕移动而不是拷贝工作的,这意味着 Rust 可以自由地移动某些值,并将移动后的值和移动前的值视为相同。但是部分情况下结构体本身是地址敏感的,例如自引用类型。

自引用类型在访问自己的数据之前必须保证指针是有效的,然而如果自己发生了移动,就导致移动后指针依然指向旧的数据,从而引发错误。

当一个值满足以下条件时,一般是一个地址敏感值:

  1. 值在创建后可以被自由地移动。

    例如 async 函数会返回一个由编译器实现的状态机。

  2. 某些操作依赖于自己的地址不发生变化。

    例如在生成的 Fature 上第一次调用 poll 方法。

  3. 安全接口中存在 unsafe 操作,而这些操作依赖于值的地址是固定的。

    例如之后调用的 poll 方法。

  4. 值在失效之前会被 drop。从而有机会去通知其它成员。

    例如 drop Fature。

要解决这个问题,有两种办法:

  • 当值被移动后,通知值。

  • 确保值的地址不发生变化。

由于 Rust 是围绕移动语义构建的,Rust 在移动值的时候并不会通知值,因此第一种办法并不可行(C++ 是可行的)。

因此 Rust 使用了第二种方法。

Rust 通过 Pin<Ptr> 实现这种语义。正如类型名字所示,Pin 也是指针的一种,和 Box 类似。但是目的不同。

Last moify: 2024-11-25 06:23:30
Build time:2025-07-18 09:41:42
Powered By asphinx