Crates.io | and-then-concurrent |
lib.rs | and-then-concurrent |
version | 0.1.0 |
source | src |
created_at | 2020-12-10 23:31:35.123395 |
updated_at | 2020-12-10 23:31:35.123395 |
description | Concurrent and-then combinator for Rust streams. |
homepage | |
repository | https://github.com/akshayknarayan/and-then-concurrent |
max_upload_size | |
id | 321670 |
size | 10,789 |
Use on impl Stream
s via the [TryStreamAndThenExt
] trait.
Why is this necessary? Consider the example below. We have a Stream
from try_unfold
, but
this stream is splitting some larger stream into sub-streams, with each sub-stream
represented by a channel. If we simply called and_then
, that function's implementation,
as an optimization, only keeps one "pending" future in its state. This means that it
cannot poll the backing stream, because that might produce another future which it has no
space for. So, it must run the pending future to completion before polling the stream
again.
Unfortunately, in this case, the backing stream has to be polled for our future to resolve!
So using and_then
will deadlock. Instead, this crate makes a tradeoff: it will hold a
list of pending futures in a FuturesUnordered
, so it is safe to
poll the backing stream. This means that if the resulting futures don't resolve, we could
have a large list of futures.
let c = futures_util::stream::try_unfold(
(
0,
HashMap::<usize, mpsc::UnboundedSender<(usize, usize)>>::default(),
),
move |(mut i, mut map)| async move {
loop {
sleep(Duration::from_millis(10)).await;
let (substream, message) = (i % 3, i);
i += 1;
if i > 25 {
return Ok(None);
}
let mut new = None;
if map
.entry(substream)
.or_insert_with(|| {
let (sub_s, sub_r) = mpsc::unbounded_channel();
new = Some(sub_r);
sub_s
})
.send((substream, message))
.is_err()
{
map.remove(&substream);
}
if let Some(new_sub_r) = new {
return Ok::<_, String>(Some((new_sub_r, (i, map))));
}
}
},
)
// .and_then(...) would deadlock!
.and_then_concurrent(|mut res| async move {
loop {
let (stream, val): (usize, usize) = match res.recv().await {
None => return Ok(()),
Some(s) => s,
};
println!("got {:?} on stream {:?}", val, stream);
}
})
.try_collect::<Vec<_>>();
c.await.unwrap();