Crates.io | sync_stream |
lib.rs | sync_stream |
version | 0.1.0 |
source | src |
created_at | 2023-07-16 14:00:13.445577 |
updated_at | 2023-07-16 14:00:13.445577 |
description | Aggregate multiple streams by polling stream items in order, keeping the consumption of multiple streams in sync. |
homepage | |
repository | https://github.com/andrewlowndes/sync_stream |
max_upload_size | |
id | 917771 |
size | 35,545 |
Aggregate multiple streams by polling stream items in order, keeping the consumption of multiple streams in sync. Uses the PartialOrd impl of the stream items, allowing ordering of mixed stream item types.
This works by emitting a stream item only once every stream being collected has either produced a newer item or ended. As a consequence, a delay in one stream can delay the delivery of items from the other streams. To mitigate this, no-op events could be sent (carrying just an id, to keep processing consistent).
[dependencies]
sync_stream = "0.1.0"
use async_stream::stream;
use futures::StreamExt;
use rand::random;
use std::{
cmp::Ordering,
time::Duration,
};
use tokio::time::sleep;
use sync_stream::sync_stream;
/// A stream element: an `id` used for ordering plus an arbitrary payload.
#[derive(Clone, Debug)]
struct Item<T> {
    /// Ordering key; the only field the comparison impls of this type look at.
    id: u32,
    /// Payload carried alongside the key; ignored by every comparison.
    value: T,
}

// Equality and ordering are defined on `id` alone and are generic over the
// payload type of the right-hand side, so items with different payload types
// (e.g. `Item<i32>` and `Item<&str>`) can be compared with each other.
impl<T, B> PartialEq<Item<B>> for Item<T> {
    fn eq(&self, rhs: &Item<B>) -> bool {
        self.id == rhs.id
    }
}

impl<T> Eq for Item<T> {}

impl<T, B> PartialOrd<Item<B>> for Item<T> {
    fn partial_cmp(&self, rhs: &Item<B>) -> Option<Ordering> {
        // `u32` is totally ordered, so the comparison always yields `Some`.
        Some(self.id.cmp(&rhs.id))
    }
}

impl<T> Ord for Item<T> {
    fn cmp(&self, rhs: &Self) -> Ordering {
        Ord::cmp(&self.id, &rhs.id)
    }
}
/// Asynchronously sleeps for a random number of milliseconds.
///
/// The duration is drawn as a random `u8`, so it lies between 0 and 255 ms.
async fn delay() {
    let millis = u64::from(random::<u8>());
    sleep(Duration::from_millis(millis)).await;
}
/// Builds three randomly delayed streams whose item ids interleave, then
/// prints every tuple that `sync_stream!` emits for them.
#[tokio::main]
async fn main() {
    // Stream with integer payloads.
    let a = stream! {
        for (id, value) in [(1, 100), (6, 200), (8, 100), (9, 300), (10, 100), (18, 900)] {
            delay().await;
            yield Item { id, value };
        }
        // One more pause before the stream ends.
        delay().await;
    };

    // Stream with string payloads.
    let b = stream! {
        for (id, value) in [(2, "a"), (4, "z"), (14, "r"), (23, "c")] {
            delay().await;
            yield Item { id, value };
        }
        delay().await;
    };

    // Stream with character payloads.
    let c = stream! {
        for (id, value) in [(3, 'p'), (5, 'c'), (17, 'd'), (19, 'w')] {
            delay().await;
            yield Item { id, value };
        }
        delay().await;
    };

    // Items from the three streams are emitted ordered by their `id`; each
    // emission is a three-element tuple, one slot per input stream.
    sync_stream!(a, b, c)
        .for_each(|(a, b, c)| async move { println!("{a:?},{b:?},{c:?}") })
        .await;
}