Crates.io | relabuf |
lib.rs | relabuf |
version | 0.16.0 |
source | src |
created_at | 2021-06-10 20:57:47.889142 |
updated_at | 2021-10-23 10:13:43.195069 |
description | Release valve buffer release items either after time or number thresholds are reached |
homepage | |
repository | https://github.com/let4be/relabuf |
max_upload_size | |
id | 408739 |
size | 14,479 |
future
hard_cap
hard_cap
is reached no longer consumes causing producer to backoff and slowdownrelease_after
has passed since the latest successful content release(or since start) and buffer is not emptysoft_cap
of items were addedconfirmed
or returned
to the bufferfuture
user can await
on[dependencies]
relabuf = "~0.16.0"
example:
use anyhow::Context;
use flume::{bounded, Sender};
use relabuf::{ExponentialBackoff, RelaBuf, RelaBufConfig};
use std::time::{Duration, Instant};
use async_io::Timer;
async fn producer(tx: Sender<u32>) {
for i in 0..16 {
let dur = Duration::from_millis(150_u64 * (i as u64));
println!("waiting {:?} before emitting {}", &dur, i);
Timer::interval(dur).await;
let t = Instant::now();
let r = tx.send_async(i).await;
println!("emit for {} took {:?}: {:?}", i, t.elapsed(), r);
}
println!("producer is finished!")
}
#[tokio::main]
async fn main() {
let (tx, rx) = bounded(5);
tokio::spawn(producer(tx));
let opts = RelaBufConfig {
soft_cap: 3,
hard_cap: 5,
release_after: Duration::from_secs(5),
backoff: Some(ExponentialBackoff {
max_elapsed_time: None,
..ExponentialBackoff::default()
}),
};
let (buf, proxy) = RelaBuf::new(opts, move || {
let rx = rx.clone();
Box::pin(async move { rx.recv_async().await.context("cannot read") })
});
tokio::spawn(proxy.go());
let mut i = 0;
while let Ok(consumed) = buf.next().await {
i += 1;
if i <= 7 {
println!(
"consumed {:?} because {:?}, since last consumption {:?} - returning due to err",
consumed.items, consumed.reason, consumed.elapsed
);
consumed.return_on_err();
} else {
println!(
"consumed {:?} because {:?}, since last consumption {:?}",
consumed.items, consumed.reason, consumed.elapsed
);
consumed.confirm();
}
}
println!("done ;)");
}