| Crates.io | enough-tokio |
| lib.rs | enough-tokio |
| version | 0.4.0 |
| created_at | 2026-01-11 08:49:38.715755+00 |
| updated_at | 2026-01-18 07:51:12.930743+00 |
| description | Tokio integration for the enough cooperative cancellation trait |
| homepage | |
| repository | https://github.com/imazen/enough |
| max_upload_size | |
| id | 2035433 |
| size | 26,508 |
Tokio integration for the enough cooperative cancellation trait.
This crate bridges tokio's CancellationToken with the Stop trait, allowing you to use tokio's cancellation system with any library that accepts impl Stop.
- Stop trait across async and sync code
- impl Stop libraries

use enough_tokio::TokioStop;
use enough::Stop;
use tokio_util::sync::CancellationToken;
/// Example entry point: runs a checkpointed loop inside `spawn_blocking`
/// and cancels it from the async side through the shared token.
#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let stop = TokioStop::new(token.clone());

    // Use in spawn_blocking for CPU-intensive work.
    // The closure owns its own clone of `stop`.
    let worker_stop = stop.clone();
    let worker = tokio::task::spawn_blocking(move || {
        for step in 0..1_000_000 {
            // Only poll the stop on every 1000th step.
            let at_checkpoint = step % 1000 == 0;
            if at_checkpoint && worker_stop.should_stop() {
                return Err("cancelled");
            }
            // do work...
        }
        Ok("completed")
    });

    // Cancel from async context
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    token.cancel();

    let outcome = worker.await.unwrap();
    println!("Result: {:?}", outcome);
}
use enough_tokio::TokioStop;
use tokio_util::sync::CancellationToken;
// Build a stop from a clone of the token; the original `token` is kept
// so it can be cancelled below.
// NOTE(review): should_stop() is listed as coming from the Stop trait;
// confirm whether this snippet also needs `use enough::Stop;`.
let token = CancellationToken::new();
let stop = TokioStop::new(token.clone());
// Check cancellation: nothing has been cancelled yet
assert!(!stop.should_stop());
// Cancel through the original token
token.cancel();
// The stop built from the token's clone now reports cancellation
assert!(stop.should_stop());
use enough_tokio::CancellationTokenStopExt;
use tokio_util::sync::CancellationToken;
let token = CancellationToken::new();
// as_stop() is provided by the CancellationTokenStopExt trait imported above
let stop = token.as_stop(); // Creates TokioStop
use enough_tokio::TokioStop;
use tokio_util::sync::CancellationToken;
/// Suspends until `stop` is cancelled, then prints a notice.
async fn wait_for_cancellation(stop: TokioStop) {
    // Wait until cancelled
    let cancellation = stop.cancelled();
    cancellation.await;

    println!("Cancelled!");
}
use enough_tokio::TokioStop;
use tokio_util::sync::CancellationToken;
// Build a parent stop and derive a child from it.
// NOTE(review): should_stop() is listed as coming from the Stop trait;
// confirm whether this snippet also needs `use enough::Stop;`.
let parent = TokioStop::new(CancellationToken::new());
let child = parent.child();
// Child is cancelled when parent is cancelled
parent.cancel();
assert!(child.should_stop());
tokio::select!

For one-shot select (runs once):
use enough_tokio::TokioStop;
use tokio_util::sync::CancellationToken;
/// Runs `async_work()` once, racing it against cancellation of `stop`.
///
/// Returns `Err("cancelled")` when the cancellation branch is selected,
/// otherwise `Ok(())` once the work finishes.
async fn do_work_with_cancellation(stop: TokioStop) -> Result<(), &'static str> {
    tokio::select! {
        _ = stop.cancelled() => {
            Err("cancelled")
        }
        finished = async_work() => {
            Ok(finished)
        }
    }
}
/// Stand-in for real asynchronous work: sleeps for one second.
async fn async_work() {
    let one_second = std::time::Duration::from_secs(1);
    tokio::time::sleep(one_second).await;
}
Important: For select in a loop, pin the future to avoid recreating it each iteration:
use enough_tokio::TokioStop;
use tokio::sync::mpsc;
/// Prints every message received on `rx` until either `stop` is cancelled
/// or the channel yields `None`.
async fn process_messages(stop: TokioStop, mut rx: mpsc::Receiver<String>) {
    // Pin the future OUTSIDE the loop so the same future is polled on
    // every iteration instead of being recreated.
    let cancelled = stop.cancelled();
    tokio::pin!(cancelled);

    loop {
        tokio::select! {
            // Use &mut to poll the pinned future without consuming it
            _ = &mut cancelled => {
                println!("Cancelled!");
                break;
            }
            incoming = rx.recv() => {
                if let Some(text) = incoming {
                    println!("Got: {}", text);
                } else {
                    // recv() returned None: stop looping
                    break;
                }
            }
        }
    }
}
Wrong (creates new future each iteration, inefficient):
// DON'T do this in a loop!
// Every pass through the loop calls stop.cancelled() again, so the
// cancellation future is rebuilt on each iteration.
loop {
tokio::select! {
_ = stop.cancelled() => break, // New future each time!
msg = rx.recv() => { /* ... */ }
}
}
Any library that accepts impl Stop works seamlessly:
use enough_tokio::TokioStop;
use enough::Stop;
use tokio_util::sync::CancellationToken;
// Example library function
/// Example library function: copies `data` into a new `Vec` in 1024-byte
/// chunks, polling `stop` before every 16th chunk (starting with the first).
///
/// Returns `Err("cancelled")` as soon as a poll reports that the stop was
/// requested; otherwise returns the copied bytes.
fn process_data(data: &[u8], stop: impl Stop) -> Result<Vec<u8>, &'static str> {
    data.chunks(1024)
        .enumerate()
        .try_fold(Vec::new(), |mut collected: Vec<u8>, (index, block)| {
            // `&&` short-circuits, so the stop is only polled at checkpoints.
            if index % 16 == 0 && stop.should_stop() {
                return Err("cancelled");
            }
            collected.extend_from_slice(block);
            Ok(collected)
        })
}
/// Example entry point: runs `process_data` inside `spawn_blocking` while a
/// separately spawned task cancels the token after a short delay.
#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let stop = TokioStop::new(token.clone());
    let payload = vec![0u8; 100_000];

    // The closure takes ownership of both the payload and the stop.
    let worker = tokio::task::spawn_blocking(move || process_data(&payload, stop));

    // Cancel after a short delay
    let canceller = async move {
        tokio::time::sleep(std::time::Duration::from_micros(100)).await;
        token.cancel();
    };
    tokio::spawn(canceller);

    let outcome = worker.await.unwrap();
    println!("Result: {:?}", outcome);
}
TokioStop

| Method | Description |
|---|---|
| new(token) | Create from CancellationToken |
| token() | Get reference to underlying token |
| into_token() | Consume and return underlying token |
| cancelled() | Async wait for cancellation |
| child() | Create a child TokioStop |
| cancel() | Trigger cancellation |
| should_stop() | Check if cancelled (from Stop trait) |
| check() | Check with Result return (from Stop trait) |
CancellationTokenStopExt

Extension trait for CancellationToken (named to avoid conflicts with potential future tokio_util traits):

| Method | Description |
|---|---|
| as_stop() | Convert to TokioStop |
use enough_tokio::TokioStop;
use tokio_util::sync::CancellationToken;
let token = CancellationToken::new();
// From CancellationToken (converting a clone, so `token` itself stays usable)
let stop: TokioStop = token.clone().into();
// Back to CancellationToken; `.into()` consumes `stop`
let token2: CancellationToken = stop.into();
TokioStop is Send + Sync and can be safely shared across threads and tasks.
Licensed under either of Apache License, Version 2.0 or MIT license at your option.