use async_backplane::prelude::*; use futures_lite::future::{pending, ready, block_on}; // use futures_lite::stream::StreamExt; use std::thread::{spawn, JoinHandle}; fn assert_disconnect(d: Device, fault: Option) { assert_eq!((), spawn(move || d.disconnect(fault)).join().unwrap()); } fn fail(d: Device) -> JoinHandle>> { spawn(move || block_on(d.part_manage(pending()))) } fn succeed(d: Device) -> JoinHandle>> { spawn(move || block_on(d.part_manage(ready(Ok(()))))) } fn watch(mut d: Device) -> JoinHandle, Crash<()>>> { spawn(move || block_on(d.watch(ready(())))) } #[test] fn solo_succeeds() { let d1 = Device::new(); let i1 = d1.device_id(); let (d2, result) = succeed(d1).join().unwrap().unwrap(); assert_eq!(i1, d2.device_id()); assert_eq!(result, ()); } // monitored #[test] fn monitored_device_succeeds() { let d1 = Device::new(); let d2 = Device::new(); let d3 = Device::new(); let i2 = d2.device_id(); d2.link(&d1, LinkMode::Monitor); d3.link(&d2, LinkMode::Monitor); assert_disconnect(d1, None); let (d4, result) = succeed(d2).join().unwrap().unwrap(); assert_eq!(d4.device_id(), i2); assert_eq!(result, ()); assert_eq!(Completed(()), watch(d3).join().unwrap().unwrap()); } #[test] fn monitored_device_errors() { let d1 = Device::new(); let d2 = Device::new(); let d3 = Device::new(); let i1 = d1.device_id(); let i2 = d2.device_id(); d2.link(&d1, LinkMode::Monitor); d3.link(&d2, LinkMode::Monitor); assert_disconnect(d1, Some(Fault::Error)); if let Crash::Cascade(did, result) = fail(d2).join().unwrap().unwrap_err() { assert_eq!(i1, did); assert_eq!(Fault::Error, result); } else { panic!() } let r3 = watch(d3).join().unwrap().unwrap(); assert_eq!(Messaged(Disconnected(i2, Some(Fault::Cascade(i1)))), r3); } #[test] fn monitored_device_drops() { let d2 = Device::new(); let d3 = Device::new(); let i2 = d2.device_id(); d3.link(&d2, LinkMode::Monitor); let i1 = { let d1 = Device::new(); d2.link(&d1, LinkMode::Monitor); d1.device_id() }; if let Crash::Cascade(did, result) = fail(d2).join().unwrap().unwrap_err() { assert_eq!(did, i1); assert_eq!(result, Fault::Drop); } else { panic!() } let r3 = watch(d3).join().unwrap().unwrap(); assert_eq!(Messaged(Disconnected(i2, Some(Fault::Cascade(i1)))), r3); } #[test] fn peer_device_succeeds() { let d1 = Device::new(); let d2 = Device::new(); let d3 = Device::new(); let i2 = d2.device_id(); d2.link(&d1, LinkMode::Peer); d3.link(&d2, LinkMode::Monitor); assert_disconnect(d1, None); let (d4, result) = succeed(d2).join().unwrap().unwrap(); assert_eq!(d4.device_id(), i2); assert_eq!(result, ()); assert_eq!(Completed(()), watch(d3).join().unwrap().unwrap()); } #[test] fn peer_device_crashes() { let d1 = Device::new(); let d2 = Device::new(); let d3 = Device::new(); let i1 = d1.device_id(); let i2 = d2.device_id(); d2.link(&d1, LinkMode::Peer); d3.link(&d2, LinkMode::Monitor); assert_disconnect(d1, Some(Fault::Error)); if let Crash::Cascade(did, result) = fail(d2).join().unwrap().unwrap_err() { assert_eq!(did, i1); assert!(result.is_error()); } else { panic!() } let r3 = watch(d3).join().unwrap().unwrap(); assert_eq!(Messaged(Disconnected(i2, Some(Fault::Cascade(i1)))), r3); } #[test] fn peer_device_drops() { let d2 = Device::new(); let d3 = Device::new(); let i2 = d2.device_id(); d3.link(&d2, LinkMode::Peer); let i1 = { // d1 won't survive this block let d1 = Device::new(); d2.link(&d1, LinkMode::Peer); d1.device_id() }; if let Crash::Cascade(did, result) = fail(d2).join().unwrap().unwrap_err() { assert_eq!(did, i1); assert_eq!(result, Fault::Drop); } else { panic!() } let r3 = watch(d3).join().unwrap().unwrap(); assert_eq!(Messaged(Disconnected(i2, Some(Fault::Cascade(i1)))), r3); } #[test] fn monitored_line_succeeds() { let d1 = Device::new(); let d2 = Device::new(); let d3 = Device::new(); let i2 = d2.device_id(); let line = d1.line(); d2.link_line(line, LinkMode::Monitor).unwrap(); d3.link(&d2, LinkMode::Monitor); assert_disconnect(d1, None); let (d4, result) = succeed(d2).join().unwrap().unwrap(); assert_eq!(d4.device_id(), i2); assert_eq!(result, ()); assert_eq!(Completed(()), watch(d3).join().unwrap().unwrap()); } #[test] fn monitored_line_errors() { let d1 = Device::new(); let d2 = Device::new(); let d3 = Device::new(); let i1 = d1.device_id(); let i2 = d2.device_id(); let line = d1.line(); d2.link_line(line, LinkMode::Monitor).unwrap(); d3.link(&d2, LinkMode::Monitor); assert_disconnect(d1, Some(Fault::Error)); if let Crash::Cascade(did, result) = fail(d2).join().unwrap().unwrap_err() { assert_eq!(did, i1); assert!(result.is_error()); } else { panic!() } let r3 = watch(d3).join().unwrap().unwrap(); assert_eq!(Messaged(Disconnected(i2, Some(Fault::Cascade(i1)))), r3); } #[test] fn monitored_line_drops() { let d2 = Device::new(); let d3 = Device::new(); let i2 = d2.device_id(); d3.link(&d2, LinkMode::Peer); let i1 = { let d1 = Device::new(); d2.link_line(d1.line(), LinkMode::Monitor).unwrap(); d1.device_id() }; if let Crash::Cascade(did, result) = fail(d2).join().unwrap().unwrap_err() { assert_eq!(did, i1); assert_eq!(result, Fault::Drop); } else { panic!() } let r3 = watch(d3).join().unwrap().unwrap(); assert_eq!(Messaged(Disconnected(i2, Some(Fault::Cascade(i1)))), r3); } #[test] fn peer_line_succeeds() { let d1 = Device::new(); let d2 = Device::new(); let d3 = Device::new(); let i2 = d2.device_id(); let line = d1.line(); d2.link_line(line, LinkMode::Peer).unwrap(); d3.link(&d2, LinkMode::Peer); assert_disconnect(d1, None); let (d4, result) = succeed(d2).join().unwrap().unwrap(); assert_eq!(d4.device_id(), i2); assert_eq!(result, ()); assert_eq!(Completed(()), watch(d3).join().unwrap().unwrap()); } #[test] fn peer_line_crashes() { let d1 = Device::new(); let d2 = Device::new(); let d3 = Device::new(); let i1 = d1.device_id(); let i2 = d2.device_id(); let line = d1.line(); d2.link_line(line, LinkMode::Peer).unwrap(); d3.link(&d2, LinkMode::Monitor); assert_disconnect(d1, Some(Fault::Error)); if let Crash::Cascade(did, result) = fail(d2).join().unwrap().unwrap_err() { assert_eq!(did, i1); assert!(result.is_error()); } else { panic!() } let r3 = watch(d3).join().unwrap().unwrap(); assert_eq!(Messaged(Disconnected(i2, Some(Fault::Cascade(i1)))), r3); } #[test] fn peer_line_drops() { let d2 = Device::new(); let d3 = Device::new(); let i2 = d2.device_id(); d3.link(&d2, LinkMode::Peer); let i1 = { // d1 won't survive this block let d1 = Device::new(); d2.link_line(d1.line(), LinkMode::Peer).unwrap(); d1.device_id() }; if let Crash::Cascade(did, result) = fail(d2).join().unwrap().unwrap_err() { assert_eq!(did, i1); assert_eq!(result, Fault::Drop); } else { panic!() } let r3 = watch(d3).join().unwrap().unwrap(); assert_eq!(Messaged(Disconnected(i2, Some(Fault::Cascade(i1)))), r3); }