I’m playing around with streams in Rust using the futures-util crate.
I’m creating a stream that requests a new page from a paginated API on each step.
The item of each page is basically a Vector. And as it might be obvious, each call to the server to request a new page might fail.
Instead of returning a Stream of vectors I would like to return a stream of items that yields an error (and therefore closes the stream) when trying to fetch an item from a page that failed.
Here is my current function:
pub fn search(
&self,
query: Query,
) -> impl TryStream<Ok = Vec<Item>, Error = MyError> {
let initial_state = (query, None);
stream::try_unfold(initial_state, |(query, page_token)| async move {
let (items, next_page_token) = fetch_page(&query, page_token).await?;
if items.len() <= 0 {
Ok(None)
} else {
Ok(Some((items, (query, next_page_token))))
}
})
}
I would like to modify it so the return type is impl TryStream<Ok = Item, Error = MyError>
Is it even possible?
>Solution :
You can use try_flatten(), however it requires every the inner stream to be a TryStream too so you need to map every item:
use futures_util::stream::TryStreamExt;
pub fn search(query: Query) -> impl TryStream<Ok = Item, Error = MyError> {
let initial_state = (query, None);
stream::try_unfold(initial_state, |(query, page_token)| async move {
let (items, next_page_token) = fetch_page(&query, page_token).await?;
if items.len() <= 0 {
Ok(None)
} else {
Ok(Some((items, (query, next_page_token))))
}
})
.map_ok(|items| stream::iter(items.into_iter().map(|item| Ok(item))))
.try_flatten()
}
Note however that it is better to return impl Stream<Item = Result<Item, MyError>> than impl TryStream, because impl Stream will automatically implement TryStream too but the opposite is not true.