| Crates.io | roughage |
| lib.rs | roughage |
| version | 0.1.1 |
| created_at | 2025-12-14 22:32:02.618866+00 |
| updated_at | 2025-12-15 06:38:25.88742+00 |
| description | provides `AsyncPipeline`, a deadlock-free replacement for buffered streams |
| homepage | |
| repository | https://github.com/oconnor663/roughage |
| max_upload_size | |
| id | 1985201 |
| size | 48,018 |
roughage

This crate provides a single type, AsyncPipeline, which is an alternative to buffered
streams, FuturesOrdered, and FuturesUnordered.
All of those are prone to deadlocks if any of their buffered/concurrent futures touches an
async lock of any kind, even indirectly. (For example, note that tokio::sync::mpsc
channels use a Semaphore internally.) The problem is that they don't
consistently poll their in-flight futures, so a future holding a lock could stop making forward
progress through no fault of its own. AsyncPipeline fixes this whole class of deadlocks by
consistently polling all its in-flight futures until they complete. In other words,
AsyncPipeline will never "snooze" a future.
Here's how easy it is to provoke a deadlock with buffered streams:
use futures::StreamExt;
use tokio::sync::Mutex;
use tokio::time::{Duration, sleep};
static LOCK: Mutex<()> = Mutex::const_new(());
// An innocent example function that touches an async lock. Note
// that the deadlocks below can happen even if this function is
// buried three crates deep in some dependency you never see.
async fn foo() {
    let _guard = LOCK.lock().await;
    sleep(Duration::from_millis(1)).await;
}

futures::stream::iter([foo(), foo()])
    .buffered(2)
    .for_each(|_| async {
        foo().await; // Deadlock!
    })
    .await;
(In the example above, tokio's Mutex hands LOCK to the next waiter in line, the second buffered
foo(), as soon as the first one releases it. But .for_each stops polling the buffered futures
while its body runs, so the body's foo() waits on a lock owned by a future that won't be polled
again until the body finishes, which it never does.)
Here's the same deadlock with FuturesUnordered:
let mut unordered = futures::stream::FuturesUnordered::new();
unordered.push(foo());
unordered.push(foo());
while let Some(_) = unordered.next().await {
    foo().await; // Deadlock!
}
An AsyncPipeline does not have this problem, because once it's started a future internally,
it never stops polling it:
use roughage::AsyncPipeline;
AsyncPipeline::from_iter(0..100)
    .map_concurrent(|_| foo(), 10)
    .map_unordered(|_| foo(), 10)
    .for_each_concurrent(|_| foo(), 10)
    .await;
// Deadlock free!
See AsyncPipeline for more examples.
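To give a sense of how data might flow between stages, here's a minimal sketch. It assumes that
each map_* stage feeds its closure's output to the next stage, as the method names suggest, and
the fetch_page and count_bytes helpers are hypothetical stand-ins, not part of this crate:

use roughage::AsyncPipeline;

// Hypothetical helpers, for illustration only.
async fn fetch_page(id: i32) -> String {
    format!("contents of page {id}")
}

async fn count_bytes(page: String) -> usize {
    page.len()
}

#[tokio::main]
async fn main() {
    AsyncPipeline::from_iter(0..100)
        // Up to 10 fetches in flight at a time.
        .map_concurrent(|id| fetch_page(id), 10)
        // Up to 10 counts in flight, presumably yielding results as they finish.
        .map_unordered(|page| count_bytes(page), 10)
        .for_each_concurrent(|n| async move { println!("{n} bytes") }, 10)
        .await;
}

The concurrency limits here are arbitrary. The point is that every stage keeps polling its
in-flight futures, so an async lock held inside fetch_page or count_bytes wouldn't trigger the
deadlocks shown above.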
"Roughage" (ruff-edge) is an older term for dietary fiber. It keeps our pipes running smoothly.