use std::{ future::pending, sync::{Arc, Mutex}, thread::{scope, sleep}, time::Duration, }; use cancellation_token::{CancelCallback, Canceled, CancellationToken, CancellationTokenSource}; use rt_local::{ runtime::core::{run, test}, wait_for_idle, }; #[test] fn source_new() { let cts = CancellationTokenSource::new(); let ct = cts.token(); assert!(!cts.is_canceled()); assert!(!ct.is_canceled()); assert!(ct.can_be_canceled()); } #[test] fn cancel_and_is_canceled() { let cts = CancellationTokenSource::new(); let ct = cts.token(); cts.cancel(); assert!(ct.can_be_canceled()); assert!(cts.is_canceled()); assert!(ct.is_canceled()); } #[test] fn cancel_defer() { let cts = CancellationTokenSource::new(); let d = cts.cancel_defer(); assert!(!cts.is_canceled()); drop(d); assert!(cts.is_canceled()); } #[test] fn cancel_defer_detach() { let cts = CancellationTokenSource::new(); let d = cts.cancel_defer(); assert!(!cts.is_canceled()); d.detach(); assert!(!cts.is_canceled()); } #[test] fn token_new() { let ct = CancellationToken::new(false); assert!(!ct.can_be_canceled()); assert!(!ct.is_canceled()); let ct = CancellationToken::new(true); assert!(ct.can_be_canceled()); assert!(ct.is_canceled()); } #[test] fn canceled() { let cts = CancellationTokenSource::new(); let ct = cts.token(); assert!(ct.canceled().is_ok()); cts.cancel(); assert!(ct.canceled().is_err()); } #[test] fn wait() { let logs = Logs::new(); let cts = CancellationTokenSource::new(); scope(|s| { run(async { let ct = cts.token(); s.spawn(|| { sleep(Duration::from_millis(500)); logs.push("cancel"); cts.cancel(); }); logs.push("wait"); ct.wait().await; logs.push("wake"); }) }); logs.verify(&["wait", "cancel", "wake"]); } #[test] async fn wait_for_canceled_already_canceled() { let cts = CancellationTokenSource::new(); let ct = cts.token(); cts.cancel(); ct.wait().await; } #[test] fn with() { let logs = Logs::new(); let cts = CancellationTokenSource::new(); scope(|s| { run(async { let ct = cts.token(); s.spawn(|| { sleep(Duration::from_millis(500)); logs.push("cancel"); cts.cancel(); }); let r = ct .run(async { logs.push("1"); wait_for_idle().await; logs.push("2"); sleep(Duration::from_millis(1000)); wait_for_idle().await; logs.push("3"); }) .await; assert_eq!(r, Err(Canceled)); logs.push("finish"); }) }); logs.verify(&["1", "2", "cancel", "finish"]); } #[test] async fn with_already_canceled() { let cts = CancellationTokenSource::new(); let ct = cts.token(); cts.cancel(); let r = ct.run(pending::<()>()).await; assert_eq!(r, Err(Canceled)); } #[test] fn with_parent() { let parent = CancellationTokenSource::new(); let child = CancellationTokenSource::with_parent(&parent.token()); assert!(!parent.is_canceled()); assert!(!child.is_canceled()); parent.cancel(); assert!(parent.is_canceled()); assert!(child.is_canceled()); } #[test] fn with_parent_2() { let cts0 = CancellationTokenSource::new(); let cts1 = CancellationTokenSource::with_parent(&cts0.token()); let cts2 = CancellationTokenSource::with_parent(&cts1.token()); cts0.cancel(); assert!(cts0.is_canceled()); assert!(cts1.is_canceled()); assert!(cts2.is_canceled()); } #[test] fn with_parent_drop_middle() { let cts0 = CancellationTokenSource::new(); let cts1 = CancellationTokenSource::with_parent(&cts0.token()); let cts2 = CancellationTokenSource::with_parent(&cts1.token()); drop(cts1); cts0.cancel(); assert!(cts0.is_canceled()); assert!(cts2.is_canceled()); } #[test] fn with_parent_many_child() { let parent = CancellationTokenSource::new(); let child0 = CancellationTokenSource::with_parent(&parent.token()); let child1 = CancellationTokenSource::with_parent(&parent.token()); parent.cancel(); assert!(parent.is_canceled()); assert!(child0.is_canceled()); assert!(child1.is_canceled()); } #[test] fn with_praent_already_canceled() { let parent = CancellationTokenSource::new(); parent.cancel(); let child = CancellationTokenSource::with_parent(&parent.token()); assert!(parent.is_canceled()); assert!(child.is_canceled()); } #[test] fn with_praent_is_canceled_true() { let cts = CancellationTokenSource::with_parent(&CancellationToken::new(true)); assert!(cts.is_canceled()); } #[test] fn with_praent_is_canceled_false() { let cts = CancellationTokenSource::with_parent(&CancellationToken::new(false)); assert!(!cts.is_canceled()); } #[test] fn register() { let logs = Logs::new(); let cts = CancellationTokenSource::new(); let ct = cts.token(); let _r = ct.register({ let logs = logs.clone(); CancelCallback::FnOnce(Box::new(move || { logs.push("cancel"); })) }); cts.cancel(); logs.verify(&["cancel"]); } #[test] fn register_unregister() { let logs = Logs::new(); let cts = CancellationTokenSource::new(); let ct = cts.token(); let _ = ct.register({ let logs = logs.clone(); CancelCallback::FnOnce(Box::new(move || { logs.push("cancel"); })) }); cts.cancel(); logs.verify(&[]); } #[test] fn register_detach() { let logs = Logs::new(); let cts = CancellationTokenSource::new(); let ct = cts.token(); ct.register({ let logs = logs.clone(); CancelCallback::FnOnce(Box::new(move || { logs.push("cancel"); })) }) .detach(); cts.cancel(); logs.verify(&["cancel"]); } #[test] fn anyhow_compatibility() { fn _f(e: Canceled) -> anyhow::Error { e.into() } } #[derive(Clone)] struct Logs(Arc>>); impl Logs { fn new() -> Self { Self(Arc::new(Mutex::new(Vec::new()))) } fn push(&self, s: &'static str) { self.0.lock().unwrap().push(s); } fn verify(&self, expected: &[&'static str]) { assert_eq!(self.0.lock().unwrap().as_slice(), expected); } }