use std::error::Error; use std::time::Duration; use hydroflow::scheduled::ticks::{TickDuration, TickInstant}; use hydroflow::{assert_graphvis_snapshots, hydroflow_syntax, rassert_eq}; use multiplatform_test::multiplatform_test; use tokio::time::timeout; #[multiplatform_test(test, wasm, env_tracing)] pub fn test_stratum_loop() { let (out_send, mut out_recv) = hydroflow::util::unbounded_channel::(); let mut df = hydroflow_syntax! { source_iter([TickInstant::new(0)]) -> union_tee; union_tee = union() -> tee(); union_tee -> map(|n| n + TickDuration::SINGLE_TICK) -> filter(|&n| n < TickInstant::new(10)) -> next_stratum() -> union_tee; union_tee -> for_each(|v| out_send.send(v).unwrap()); }; assert_graphvis_snapshots!(df); df.run_available(); assert_eq!( &[ TickInstant::new(0), TickInstant::new(1), TickInstant::new(2), TickInstant::new(3), TickInstant::new(4), TickInstant::new(5), TickInstant::new(6), TickInstant::new(7), TickInstant::new(8), TickInstant::new(9) ], &*hydroflow::util::collect_ready::, _>(&mut out_recv) ); assert_eq!( (TickInstant::new(11), 0), (df.current_tick(), df.current_stratum()) ); } #[multiplatform_test(test, wasm, env_tracing)] pub fn test_tick_loop() { let (out_send, mut out_recv) = hydroflow::util::unbounded_channel::(); let mut df = hydroflow_syntax! { source_iter([TickInstant::new(0)]) -> union_tee; union_tee = union() -> tee(); union_tee -> map(|n| n + TickDuration::SINGLE_TICK) -> filter(|&n| n < TickInstant::new(10)) -> defer_tick() -> union_tee; union_tee -> for_each(|v| out_send.send(v).unwrap()); }; assert_graphvis_snapshots!(df); df.run_available(); assert_eq!( &[ TickInstant::new(0), TickInstant::new(1), TickInstant::new(2), TickInstant::new(3), TickInstant::new(4), TickInstant::new(5), TickInstant::new(6), TickInstant::new(7), TickInstant::new(8), TickInstant::new(9) ], &*hydroflow::util::collect_ready::, _>(&mut out_recv) ); assert_eq!( (TickInstant::new(10), 0), (df.current_tick(), df.current_stratum()) ); } #[multiplatform_test(hydroflow, env_tracing)] async fn test_persist_stratum_run_available() -> Result<(), Box> { let (out_send, out_recv) = hydroflow::util::unbounded_channel(); let mut df = hydroflow_syntax! { a = source_iter([0]) -> persist::<'static>() -> next_stratum() -> for_each(|x| out_send.send(x).unwrap()); }; assert_graphvis_snapshots!(df); df.run_available(); let seen: Vec<_> = hydroflow::util::collect_ready_async(out_recv).await; rassert_eq!( &[0], &*seen, "Only one tick should have run, actually ran {}", seen.len() )?; Ok(()) } #[multiplatform_test(hydroflow, env_tracing)] async fn test_persist_stratum_run_async() -> Result<(), Box> { let (out_send, out_recv) = hydroflow::util::unbounded_channel(); let mut df = hydroflow_syntax! { source_iter([0]) -> persist::<'static>() -> next_stratum() -> for_each(|x| out_send.send(x).unwrap()); }; assert_graphvis_snapshots!(df); timeout(Duration::from_millis(200), df.run_async()) .await .expect_err("Expected time out"); let seen: Vec<_> = hydroflow::util::collect_ready_async(out_recv).await; rassert_eq!( &[0], &*seen, "Only one tick should have run, actually ran {}", seen.len() )?; Ok(()) } #[multiplatform_test(test, wasm, env_tracing)] pub fn test_issue_800_1050_persist() { let mut df = hydroflow_syntax! { in1 = source_iter(0..10) -> map(|i| (i, i)); in1 -> persist::<'static>() -> my_union_tee; my_union_tee = union() -> tee(); my_union_tee -> filter(|_| false) -> my_union_tee; my_union_tee -> for_each(|x| println!("A {} {} {:?}", context.current_tick(), context.current_stratum(), x)); }; assert_graphvis_snapshots!(df); df.run_available(); } #[multiplatform_test(test, wasm, env_tracing)] pub fn test_issue_800_1050_fold_keyed() { let mut df = hydroflow_syntax! { in1 = source_iter(0..10) -> map(|i| (i, i)); in1 -> fold_keyed::<'static>(Vec::new, Vec::push) -> my_union_tee; my_union_tee = union() -> tee(); my_union_tee -> filter(|_| false) -> my_union_tee; my_union_tee -> for_each(|x| println!("A {} {} {:?}", context.current_tick(), context.current_stratum(), x)); }; assert_graphvis_snapshots!(df); df.run_available(); } #[multiplatform_test(test, wasm, env_tracing)] pub fn test_issue_800_1050_reduce_keyed() { let mut df = hydroflow_syntax! { in1 = source_iter(0..10) -> map(|i| (i, i)); in1 -> reduce_keyed::<'static>(std::ops::AddAssign::add_assign) -> my_union_tee; my_union_tee = union() -> tee(); my_union_tee -> filter(|_| false) -> my_union_tee; my_union_tee -> for_each(|x| println!("A {} {} {:?}", context.current_tick(), context.current_stratum(), x)); }; assert_graphvis_snapshots!(df); df.run_available(); } #[multiplatform_test(hydroflow, env_tracing)] async fn test_nospin_issue_961() { let mut df = hydroflow_syntax! { source_iter([1]) -> next_stratum() -> persist::<'static>() -> defer_tick_lazy() -> null(); }; assert_graphvis_snapshots!(df); timeout(Duration::from_millis(100), df.run_available_async()) .await .expect("Should not spin."); } #[multiplatform_test(hydroflow, env_tracing)] async fn test_nospin_issue_961_complicated() { let mut df = hydroflow_syntax! { source_iter([1]) -> items; items = union(); double = items -> persist::<'static>() -> fold(|| 0, |accum, x| *accum += x) -> defer_tick_lazy() -> filter(|_| false) -> tee(); double -> null(); double -> items; }; assert_graphvis_snapshots!(df); timeout(Duration::from_millis(100), df.run_available_async()) .await .expect("Should not spin."); }