use std::marker::PhantomPinned; use std::mem; use std::panic; use std::pin::Pin; use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::sync::Arc; use std::sync::Barrier; mod fixture; use fixture::TrackNotify; use fixture::Unit; #[test] fn new_and_delete() { let (tx, rx) = splitrc::new(Unit); drop(tx); drop(rx); } #[test] fn drop_rx_notifies() { let (tx, rx) = splitrc::new(TrackNotify::default()); let rx2 = rx.clone(); drop(rx); drop(rx2); assert!(!tx.tx_did_drop.load(Ordering::Acquire)); assert!(tx.rx_did_drop.load(Ordering::Acquire)); } #[test] fn drop_tx_notifies() { let (tx, rx) = splitrc::new(TrackNotify::default()); let tx2 = tx.clone(); drop(tx); drop(tx2); assert!(rx.tx_did_drop.load(Ordering::Acquire)); assert!(!rx.rx_did_drop.load(Ordering::Acquire)); } #[test] fn debug_formatting() { assert_eq!("Unit", format!("{:?}", Unit)); assert_eq!("Unit", format!("{:?}", Arc::new(Unit))); let (tx, rx) = splitrc::new(Unit); assert_eq!("Unit", format!("{:?}", tx)); assert_eq!("Unit", format!("{:?}", rx)); } #[test] fn display_formatting() { assert_eq!("Unit", format!("{}", Unit)); assert_eq!("Unit", format!("{}", Arc::new(Unit))); let (tx, rx) = splitrc::new(Unit); assert_eq!("Unit", format!("{}", tx)); assert_eq!("Unit", format!("{}", rx)); } #[test] #[ignore] fn tx_panic_on_overflow() { let (tx, rx) = splitrc::new(Unit); drop(rx); let result = panic::catch_unwind(|| loop { mem::forget(tx.clone()) }); assert!(result.is_err()); } #[test] #[ignore] fn rx_panic_on_overflow() { let (tx, rx) = splitrc::new(Unit); drop(tx); let result = panic::catch_unwind(|| loop { mem::forget(rx.clone()) }); assert!(result.is_err()); } #[test] fn pointers_are_unpinned() { let (tx, rx) = splitrc::new(Unit); let _: &dyn Unpin = &tx; let _: &dyn Unpin = ℞ } #[derive(Default)] struct MustPin { v: u32, tx_did_drop: AtomicBool, rx_did_drop: AtomicBool, _pinned: PhantomPinned, } impl MustPin { fn drop_tx(self: Pin<&Self>) { self.tx_did_drop.store(true, Ordering::Release); } fn drop_rx(self: Pin<&Self>) { self.rx_did_drop.store(true, Ordering::Release); } } impl splitrc::Notify for MustPin { fn last_tx_did_drop_pinned(self: Pin<&Self>) { self.drop_tx() } fn last_rx_did_drop_pinned(self: Pin<&Self>) { self.drop_rx() } } #[test] fn alloc_pinned() { let (tx, rx): (Pin>, Pin>) = splitrc::pin(Default::default()); assert_eq!(0, tx.v); assert_eq!(0, rx.v); } #[test] fn drop_tx_pinned() { let (tx, rx): (Pin>, Pin>) = splitrc::pin(Default::default()); assert_eq!(false, tx.tx_did_drop.load(Ordering::Acquire)); drop(tx); assert_eq!(true, rx.tx_did_drop.load(Ordering::Acquire)); } #[test] fn drop_rx_pinned() { let (tx, rx): (Pin>, Pin>) = splitrc::pin(Default::default()); assert_eq!(false, rx.rx_did_drop.load(Ordering::Acquire)); drop(rx); assert_eq!(true, tx.rx_did_drop.load(Ordering::Acquire)); } struct Count<'a> { count: &'a AtomicU64, } impl splitrc::Notify for Count<'_> { fn last_tx_did_drop(&self) { self.count.fetch_add(1, Ordering::Relaxed); } fn last_rx_did_drop(&self) { self.count.fetch_add(1, Ordering::Relaxed); } } #[test] fn stress_threads() { const T: u64 = 16; let count = AtomicU64::new(0); let barrier = Arc::new(Barrier::new((T * 2) as usize)); let barrier = &barrier; let (tx, rx) = splitrc::new(Count { count: &count }); std::thread::scope(move |s| { for _ in 0..T { s.spawn({ let tx = tx.clone(); move || { barrier.wait(); tx.count.fetch_add(1, Ordering::Relaxed); } }); s.spawn({ let rx = rx.clone(); move || { barrier.wait(); rx.count.fetch_add(1, Ordering::Relaxed); } }); } drop(tx); drop(rx); }); let final_count = count.load(Ordering::Relaxed); assert_eq!(1 + T + T, final_count); } #[test] fn stress_dealloc() { const T: u64 = 64; let count = AtomicU64::new(0); std::thread::scope(|s| { for _ in 0..T { let (tx, rx) = splitrc::new(Count { count: &count }); s.spawn(move || { tx.count.fetch_add(1, Ordering::Relaxed); }); s.spawn(move || { rx.count.fetch_add(1, Ordering::Relaxed); }); } for _ in 0..T { let (tx, rx) = splitrc::new(Count { count: &count }); // Spawn in opposite order. s.spawn(move || { rx.count.fetch_add(1, Ordering::Relaxed); }); s.spawn(move || { tx.count.fetch_add(1, Ordering::Relaxed); }); } }); let final_count = count.load(Ordering::Acquire); assert_eq!(6 * T, final_count); }