use hydroflow::util::collect_ready;
use hydroflow::{assert_graphvis_snapshots, hydroflow_syntax};
use multiplatform_test::multiplatform_test;

/// Checks that `cross_singleton()` emits nothing until its `single` input has
/// a value, and afterwards pairs every `input` item with that value.
#[multiplatform_test(test, wasm, env_tracing)]
pub fn test_basic() {
    let (single_tx, single_rx) = hydroflow::util::unbounded_channel::<()>();
    let (egress_tx, mut egress_rx) = hydroflow::util::unbounded_channel();

    let mut df = hydroflow_syntax! {
        join = cross_singleton();
        // `input` side: three items, persisted with a `'static` lifetime —
        // presumably so they are still available when the single value
        // arrives in a later run (confirm against the operator docs).
        source_iter([1, 2, 3]) -> persist::<'static>() -> [input]join;
        // `single` side: fed from the channel the test writes to below.
        source_stream(single_rx) -> [single]join;

        join -> for_each(|x| egress_tx.send(x).unwrap());
    };
    assert_graphvis_snapshots!(df);

    // Nothing has been sent on the `single` side yet, so no output is expected.
    df.run_available();
    let out: Vec<_> = collect_ready(&mut egress_rx);
    assert_eq!(out, []);

    // Once the single value is supplied, each input item is paired with it.
    single_tx.send(()).unwrap();
    df.run_available();

    let out: Vec<_> = collect_ready(&mut egress_rx);
    assert_eq!(out, vec![(1, ()), (2, ()), (3, ())]);
}

/// Exercises `cross_singleton()` when its `single` input is a `union()` of a
/// persisted constant and a stream that loops back through
/// `defer_tick_lazy()`.
///
/// The loop-back stream ends in `flat_map(|_| [])`, so it never yields an
/// item; the expected output therefore pairs each sorted input with the
/// persisted `0` only.
#[multiplatform_test(test, wasm, env_tracing)]
pub fn test_union_defer_tick() {
    // The element type is spelled out explicitly; the values sent below are
    // plain integers and the expected output is `(i32, i32)` tuples.
    let (cross_tx, cross_rx) = hydroflow::util::unbounded_channel::<i32>();
    let (egress_tx, mut egress_rx) = hydroflow::util::unbounded_channel();

    let mut df = hydroflow_syntax! {
        // Sorted channel input, shared by both cross_singleton operators.
        teed_in = source_stream(cross_rx) -> sort() -> tee();
        teed_in -> [input]join;

        // The `single` side of `join` is the union of the loop-back stream
        // (port 0) and a persisted constant `0` (port 1).
        deferred_stream -> defer_tick_lazy() -> [0]unioned_stream;

        persisted_stream = source_iter([0]) -> persist::<'static>();
        persisted_stream -> [1]unioned_stream;

        unioned_stream = union();
        unioned_stream -> [single]join;

        join = cross_singleton() -> tee();

        // One copy of the join output is what the test observes...
        join -> for_each(|x| egress_tx.send(x).unwrap());

        // ...the other is folded and used as the `single` side of a second
        // cross_singleton over the same input.
        folded_thing = join -> fold(|| 0, |_, _| {});

        teed_in -> [input]joined_folded;
        folded_thing -> [single]joined_folded;
        joined_folded = cross_singleton();

        // Closes the cycle; `flat_map(|_| [])` discards the folded value so
        // no item is ever fed back into the union.
        deferred_stream = joined_folded -> fold(|| 0, |_, _| {}) -> flat_map(|_| []);
    };

    assert_graphvis_snapshots!(df);

    // No input has been sent yet, so nothing should reach the egress channel.
    df.run_available();
    let out: Vec<_> = collect_ready(&mut egress_rx);
    assert_eq!(out, vec![]);

    cross_tx.send(1).unwrap();
    cross_tx.send(2).unwrap();
    cross_tx.send(3).unwrap();
    df.run_available();

    // Each input is paired with the persisted `0`.
    let out: Vec<_> = collect_ready(&mut egress_rx);
    assert_eq!(out, vec![(1, 0), (2, 0), (3, 0)]);
}