Crates.io | async-fn-stream |
lib.rs | async-fn-stream |
version | 0.2.2 |
source | src |
created_at | 2022-07-07 22:40:31.844933 |
updated_at | 2024-04-23 08:13:09.607627 |
description | Lightweight implementation of `async-stream` without macros |
homepage | https://github.com/dmitryvk/async-fn-stream |
repository | https://github.com/dmitryvk/async-fn-stream |
max_upload_size | |
id | 621520 |
size | 35,155 |
A version of `async-stream` without macros.
This crate provides generic implementations of the `Stream` trait.
`Stream` is an asynchronous version of `std::iter::Iterator`.

Two functions are provided: `fn_stream` and `try_fn_stream`.

If you need to create a stream that may result in an error, use `try_fn_stream`; otherwise use `fn_stream`.

To create a stream:

1. Call `fn_stream` or `try_fn_stream`, passing a closure (anonymous function).
2. The closure receives an `emitter`. To return a value from the stream, call `.emit(value)` on `emitter` and `.await` its result. Once the stream consumer has processed the value and called `.next()` on the stream, the `.await` returns.

`try_fn_stream` provides some conveniences for returning errors:

1. Return an error from the closure with `return Err(...)` or the question mark (`?`) operator. This ends the stream.
2. `emitter` also has an `emit_err()` method to return errors without ending the stream.

Finite stream of numbers

    use async_fn_stream::fn_stream;
    use futures_util::Stream;

    fn build_stream() -> impl Stream<Item = i32> {
        fn_stream(|emitter| async move {
            for i in 0..3 {
                // yield elements from stream via `emitter`
                emitter.emit(i).await;
            }
        })
    }

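The handshake between `emit(value).await` and the consumer's `.next()` can be illustrated with the standard library alone. The sketch below is a toy model and is not this crate's implementation: `ToyEmitter`, `Emit`, `ToyStream`, `toy_fn_stream` and `next_blocking` are hypothetical names introduced only for illustration. It also assumes the producer awaits nothing except `emit`. Each call to `next_blocking` resumes the producer once; the producer stores one value in a shared slot and suspends, and it is resumed again only when the consumer asks for the next item.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Slot shared between the producer (emitter) and the consumer (stream).
type Slot<T> = Rc<RefCell<Option<T>>>;

/// Hypothetical stand-in for the emitter passed to the closure.
struct ToyEmitter<T> {
    slot: Slot<T>,
}

/// Future returned by `ToyEmitter::emit`.
struct Emit<T> {
    slot: Slot<T>,
    value: Option<T>,
}

impl<T: Unpin> Future for Emit<T> {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        match self.value.take() {
            // First poll: hand the value over and suspend the producer.
            Some(value) => {
                *self.slot.borrow_mut() = Some(value);
                Poll::Pending
            }
            // Second poll: the consumer asked for more, so `.await` returns.
            None => Poll::Ready(()),
        }
    }
}

impl<T> ToyEmitter<T> {
    fn emit(&self, value: T) -> Emit<T> {
        Emit { slot: Rc::clone(&self.slot), value: Some(value) }
    }
}

/// Waker that does nothing; the toy consumer polls the producer by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy stream: owns the producer future and the shared slot.
struct ToyStream<T> {
    slot: Slot<T>,
    producer: Pin<Box<dyn Future<Output = ()>>>,
    finished: bool,
}

fn toy_fn_stream<T, Fut>(func: impl FnOnce(ToyEmitter<T>) -> Fut) -> ToyStream<T>
where
    Fut: Future<Output = ()> + 'static,
{
    let slot: Slot<T> = Rc::new(RefCell::new(None));
    let producer: Pin<Box<dyn Future<Output = ()>>> =
        Box::pin(func(ToyEmitter { slot: Rc::clone(&slot) }));
    ToyStream { slot, producer, finished: false }
}

impl<T> ToyStream<T> {
    /// Synchronous stand-in for `.next().await`: resume the producer once,
    /// then take whatever it left in the slot.
    fn next_blocking(&mut self) -> Option<T> {
        if self.finished {
            return None;
        }
        let waker = Waker::from(Arc::new(NoopWake));
        let mut cx = Context::from_waker(&waker);
        if self.producer.as_mut().poll(&mut cx).is_ready() {
            self.finished = true;
        }
        let item = self.slot.borrow_mut().take();
        item
    }
}

fn main() {
    let mut stream = toy_fn_stream(|emitter| async move {
        for i in 0..3 {
            emitter.emit(i).await;
        }
    });
    while let Some(value) = stream.next_blocking() {
        println!("got {value}");
    }
}
```

The point of the sketch is that no macro is needed: an ordinary closure returning an `async` block, plus a small future that suspends once per emitted value, is enough to express "yield".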
Read numbers from text file, with error handling

    use anyhow::Context;
    use async_fn_stream::try_fn_stream;
    use futures_util::{pin_mut, Stream, StreamExt};
    use tokio::{
        fs::File,
        io::{AsyncBufReadExt, BufReader},
    };

    fn read_numbers(file_name: String) -> impl Stream<Item = Result<i32, anyhow::Error>> {
        try_fn_stream(|emitter| async move {
            // Return errors via `?` operator.
            let file = BufReader::new(File::open(file_name).await.context("Failed to open file")?);
            pin_mut!(file);
            let mut line = String::new();
            loop {
                line.clear();
                let byte_count = file
                    .read_line(&mut line)
                    .await
                    .context("Failed to read line")?;
                if byte_count == 0 {
                    break;
                }

                for token in line.split_ascii_whitespace() {
                    let Ok(number) = token.parse::<i32>() else {
                        // Return errors via the `emit_err` method.
                        emitter.emit_err(
                            anyhow::anyhow!("Failed to convert string \"{token}\" to number")
                        ).await;
                        continue;
                    };
                    emitter.emit(number).await;
                }
            }

            Ok(())
        })
    }

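To make the per-token behavior concrete, here is a synchronous sketch of the inner loop that collects into a `Vec` instead of emitting into a stream. `parse_line` is a hypothetical helper, not part of this crate, and it uses `String` errors in place of `anyhow::Error` so that it needs no dependencies. It mirrors the `emit_err` path: a bad token produces an `Err` item and processing continues with the next token. By contrast, an error returned with `?` in the example above would be the last item the consumer sees.

```rust
/// Hypothetical helper: parse whitespace-separated tokens, recording an
/// error for each bad token and then continuing with the next one.
fn parse_line(line: &str) -> Vec<Result<i32, String>> {
    let mut items = Vec::new();
    for token in line.split_ascii_whitespace() {
        match token.parse::<i32>() {
            // Corresponds to `emitter.emit(number).await`.
            Ok(number) => items.push(Ok(number)),
            // Corresponds to `emitter.emit_err(...).await` followed by `continue`.
            Err(_) => items.push(Err(format!(
                "Failed to convert string \"{token}\" to number"
            ))),
        }
    }
    items
}

fn main() {
    // The middle token is not a number, yet the token after it is still parsed.
    for item in parse_line("1 two 3") {
        println!("{item:?}");
    }
}
```

For the input `"1 two 3"` the items are `Ok(1)`, an `Err` for `two`, and `Ok(3)`.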
Why not `async-stream`?

`async-stream` is great! It has a nice syntax, but it is based on macros, which brings some flaws: