Follow

Keep Up to Date with the Most Important News

By pressing the Subscribe button, you confirm that you have read and are agreeing to our Privacy Policy and Terms of Use
Contact

Rust Streams: How to transform a TryStream<Ok = Vec<Item>, Error = MyErrorType>> into TryStream<Ok = Item, Error = MyErrorType>

I’m playing around with streams in Rust using the futures-util crate.

I’m creating a stream that requests a new page from a paginated API on each step.
The item of each page is basically a Vector. And as it might be obvious, each call to the server to request a new page might fail.

Instead of returning a Stream of vectors I would like to return a stream of items that yields an error (and therefore closes the stream) when trying to fetch an item from a page that failed.

MEDevel.com: Open-source for Healthcare and Education

Collecting and validating open-source software for healthcare, education, enterprise, development, medical imaging, medical records, and digital pathology.

Visit Medevel

Here is my current function:

    pub fn search(
        &self,
        query: Query,
    ) -> impl TryStream<Ok = Vec<Item>, Error = MyError> {
        let initial_state = (query, None);
        stream::try_unfold(initial_state, |(query, page_token)| async move {
            let (items, next_page_token) = fetch_page(&query, page_token).await?;
            if items.len() <= 0 {
                Ok(None)
            } else {
                Ok(Some((items, (query, next_page_token))))
            }
        })
    }

I would like to modify it so the return type is impl TryStream<Ok = Item, Error = MyError>

Is it even possible?

>Solution :

You can use try_flatten(), however it requires every the inner stream to be a TryStream too so you need to map every item:

use futures_util::stream::TryStreamExt;

pub fn search(query: Query) -> impl TryStream<Ok = Item, Error = MyError> {
    let initial_state = (query, None);
    stream::try_unfold(initial_state, |(query, page_token)| async move {
        let (items, next_page_token) = fetch_page(&query, page_token).await?;
        if items.len() <= 0 {
            Ok(None)
        } else {
            Ok(Some((items, (query, next_page_token))))
        }
    })
    .map_ok(|items| stream::iter(items.into_iter().map(|item| Ok(item))))
    .try_flatten()
}

Note however that it is better to return impl Stream<Item = Result<Item, MyError>> than impl TryStream, because impl Stream will automatically implement TryStream too but the opposite is not true.

Add a comment

Leave a Reply

Keep Up to Date with the Most Important News

By pressing the Subscribe button, you confirm that you have read and are agreeing to our Privacy Policy and Terms of Use

Discover more from Dev solutions

Subscribe now to keep reading and get access to the full archive.

Continue reading