use std::collections::HashSet; use hydroflow::compiled::pull::HalfMultisetJoinState; use hydroflow::scheduled::ticks::TickInstant; use hydroflow::util::collect_ready; use hydroflow::{assert_graphvis_snapshots, hydroflow_syntax}; use multiplatform_test::multiplatform_test; #[multiplatform_test] pub fn test_persist_basic() { let (result_send, mut result_recv) = hydroflow::util::unbounded_channel::(); let mut hf = hydroflow_syntax! { source_iter([1]) -> persist::<'static>() -> persist::<'static>() -> fold(|| 0, |a: &mut _, b| *a += b) -> for_each(|x| result_send.send(x).unwrap()); }; assert_graphvis_snapshots!(hf); for tick in 0..10 { assert_eq!(TickInstant::new(tick), hf.current_tick()); hf.run_tick(); } assert_eq!( &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10], &*collect_ready::, _>(&mut result_recv) ); } #[multiplatform_test] pub fn test_persist_pull() { let (result_send, mut result_recv) = hydroflow::util::unbounded_channel::(); let mut hf = hydroflow_syntax! { // Structured to ensure `persist::<'static>()` is pull-based. source_iter([1]) -> persist::<'static>() -> m0; null() -> m0; m0 = union() -> persist::<'static>() -> m1; null() -> m1; m1 = union() -> fold(|| 0, |a: &mut _, b| *a += b) -> for_each(|x| result_send.send(x).unwrap()); }; assert_graphvis_snapshots!(hf); for tick in 0..10 { assert_eq!(TickInstant::new(tick), hf.current_tick()); hf.run_tick(); } assert_eq!( &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10], &*collect_ready::, _>(&mut result_recv) ); } #[multiplatform_test] pub fn test_persist_push() { let (result_send, mut result_recv) = hydroflow::util::unbounded_channel::(); let mut hf = hydroflow_syntax! { t0 = source_iter([1]) -> persist::<'static>() -> tee(); t0 -> null(); t1 = t0 -> persist::<'static>() -> tee(); t1 -> null(); t1 -> fold(|| 0, |a: &mut _, b| *a += b) -> for_each(|x| result_send.send(x).unwrap()); }; assert_graphvis_snapshots!(hf); for tick in 0..10 { assert_eq!(TickInstant::new(tick), hf.current_tick()); hf.run_tick(); } assert_eq!( &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10], &*collect_ready::, _>(&mut result_recv) ); } #[multiplatform_test] pub fn test_persist_join() { let (input_send, input_recv) = hydroflow::util::unbounded_channel::<(&str, &str)>(); let mut flow = hydroflow::hydroflow_syntax! { source_iter([("hello", "world")]) -> persist::<'static>() -> [0]my_join; source_stream(input_recv) -> persist::<'static>() -> [1]my_join; my_join = join::<'tick>() -> for_each(|(k, (v1, v2))| println!("({}, ({}, {}))", k, v1, v2)); }; input_send.send(("hello", "oakland")).unwrap(); flow.run_tick(); input_send.send(("hello", "san francisco")).unwrap(); flow.run_tick(); } #[multiplatform_test] pub fn test_persist_replay_join() { let (persist_input_send, persist_input) = hydroflow::util::unbounded_channel::(); let (other_input_send, other_input) = hydroflow::util::unbounded_channel::(); let (result_send, mut result_recv) = hydroflow::util::unbounded_channel::<(u32, u32)>(); let mut hf = hydroflow_syntax! { source_stream(persist_input) -> persist::<'static>() -> fold::<'tick>(|| 0, |a: &mut _, b| *a += b) -> next_stratum() -> [0]product_node; source_stream(other_input) -> [1] product_node; product_node = cross_join::<'tick, 'tick>() -> for_each(|x| result_send.send(x).unwrap()); }; assert_graphvis_snapshots!(hf); persist_input_send.send(1).unwrap(); other_input_send.send(2).unwrap(); hf.run_tick(); assert_eq!(&[(1, 2)], &*collect_ready::, _>(&mut result_recv)); persist_input_send.send(2).unwrap(); other_input_send.send(2).unwrap(); hf.run_tick(); assert_eq!(&[(3, 2)], &*collect_ready::, _>(&mut result_recv)); other_input_send.send(3).unwrap(); hf.run_tick(); assert_eq!(&[(3, 3)], &*collect_ready::, _>(&mut result_recv)); } #[multiplatform_test] pub fn test_persist_double_handoff() { let (input_send, input_recv) = hydroflow::util::unbounded_channel::(); let (input_2_send, input_2_recv) = hydroflow::util::unbounded_channel::(); let (output_send, mut output_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>(); let mut flow = hydroflow::hydroflow_syntax! { teed_first_sg = source_stream(input_2_recv) -> tee(); teed_first_sg -> [0] joined_second_sg; teed_first_sg -> [1] joined_second_sg; source_stream(input_recv) -> persist::<'static>() -> inspect(|x| println!("LHS {} {}:{}", x, context.current_tick(), context.current_stratum())) -> [0] cross; joined_second_sg = cross_join::<'tick, 'tick>() -> map(|t| t.0) -> inspect(|x| println!("RHS {} {}:{}", x, context.current_tick(), context.current_stratum())) -> [1] cross; cross = cross_join::<'tick, 'tick, HalfMultisetJoinState>() -> for_each(|x| output_send.send(x).unwrap()); }; println!("A {}:{}", flow.current_tick(), flow.current_stratum()); input_send.send(0).unwrap(); flow.run_tick(); println!("B {}:{}", flow.current_tick(), flow.current_stratum()); assert!(collect_ready::, _>(&mut output_recv).is_empty()); input_2_send.send(1).unwrap(); flow.run_tick(); println!("C {}:{}", flow.current_tick(), flow.current_stratum()); assert_eq!(&[(0, 1)], &*collect_ready::, _>(&mut output_recv)); } #[multiplatform_test] pub fn test_persist_single_handoff() { let (input_send, input_recv) = hydroflow::util::unbounded_channel::(); let (input_2_send, input_2_recv) = hydroflow::util::unbounded_channel::(); let (output_send, mut output_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>(); let mut flow = hydroflow::hydroflow_syntax! { teed_first_sg = source_stream(input_2_recv) -> tee(); teed_first_sg [0] -> null(); teed_first_sg [1] -> joined_second_sg; null() -> joined_second_sg; source_stream(input_recv) -> persist::<'static>() -> inspect(|x| println!("LHS {} {}:{}", x, context.current_tick(), context.current_stratum())) -> [0] cross; joined_second_sg = union() -> inspect(|x| println!("RHS {} {}:{}", x, context.current_tick(), context.current_stratum())) -> [1] cross; cross = cross_join::<'tick, 'tick, HalfMultisetJoinState>() -> for_each(|x| output_send.send(x).unwrap()); }; println!("A {}:{}", flow.current_tick(), flow.current_stratum()); input_send.send(0).unwrap(); flow.run_tick(); println!("B {}:{}", flow.current_tick(), flow.current_stratum()); assert!(collect_ready::, _>(&mut output_recv).is_empty()); input_2_send.send(1).unwrap(); flow.run_tick(); println!("C {}:{}", flow.current_tick(), flow.current_stratum()); assert_eq!(&[(0, 1)], &*collect_ready::, _>(&mut output_recv)); } #[multiplatform_test] pub fn test_persist_single_subgraph() { let (input_send, input_recv) = hydroflow::util::unbounded_channel::(); let (input_2_send, input_2_recv) = hydroflow::util::unbounded_channel::(); let (output_send, mut output_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>(); let mut flow = hydroflow::hydroflow_syntax! { source_stream(input_2_recv) -> joined_second_sg; source_stream(input_recv) -> persist::<'static>() -> inspect(|x| println!("LHS {} {}:{}", x, context.current_tick(), context.current_stratum())) -> [0] cross; joined_second_sg = inspect(|x| println!("RHS {} {}:{}", x, context.current_tick(), context.current_stratum())) -> [1] cross; cross = cross_join::<'tick, 'tick, HalfMultisetJoinState>() -> for_each(|x| output_send.send(x).unwrap()); }; println!("A {}:{}", flow.current_tick(), flow.current_stratum()); input_send.send(0).unwrap(); flow.run_tick(); println!("B {}:{}", flow.current_tick(), flow.current_stratum()); assert!(collect_ready::, _>(&mut output_recv).is_empty()); input_2_send.send(1).unwrap(); flow.run_tick(); println!("C {}:{}", flow.current_tick(), flow.current_stratum()); assert_eq!(&[(0, 1)], &*collect_ready::, _>(&mut output_recv)); } #[multiplatform_test] pub fn test_persist() { let (pull_tx, mut pull_rx) = hydroflow::util::unbounded_channel::(); let (push_tx, mut push_rx) = hydroflow::util::unbounded_channel::(); let mut df = hydroflow_syntax! { my_tee = source_iter([1, 2, 3]) -> persist::<'static>() // pull -> tee(); my_tee -> for_each(|v| pull_tx.send(v).unwrap()); my_tee -> persist::<'static>() // push -> for_each(|v| push_tx.send(v).unwrap()); }; assert_graphvis_snapshots!(df); df.run_available(); assert_eq!(&[1, 2, 3], &*collect_ready::, _>(&mut pull_rx)); assert_eq!(&[1, 2, 3], &*collect_ready::, _>(&mut push_rx)); } #[multiplatform_test] pub fn test_persist_mut() { use hydroflow::util::Persistence::*; let (pull_tx, mut pull_rx) = hydroflow::util::unbounded_channel::(); let (push_tx, mut push_rx) = hydroflow::util::unbounded_channel::(); let mut df = hydroflow_syntax! { my_tee = source_iter([Persist(1), Persist(2), Persist(3), Persist(4), Delete(2)]) -> persist_mut::<'static>() // pull -> tee(); my_tee -> for_each(|v| pull_tx.send(v).unwrap()); my_tee -> flat_map(|x| if x == 3 {vec![Persist(x), Delete(x)]} else {vec![Persist(x)]}) -> persist_mut::<'static>() // push -> for_each(|v| push_tx.send(v).unwrap()); }; assert_graphvis_snapshots!(df); df.run_available(); assert_eq!(&[1, 3, 4], &*collect_ready::, _>(&mut pull_rx)); assert_eq!(&[1, 4], &*collect_ready::, _>(&mut push_rx)); } #[multiplatform_test] pub fn test_persist_mut_keyed() { use hydroflow::util::PersistenceKeyed::*; let (pull_tx, mut pull_rx) = hydroflow::util::unbounded_channel::(); let (push_tx, mut push_rx) = hydroflow::util::unbounded_channel::(); let mut df = hydroflow_syntax! { my_tee = source_iter([Persist(1, 1), Persist(2, 2), Persist(3, 3), Persist(4, 4), Delete(2)]) -> persist_mut_keyed::<'static>() // pull -> tee(); my_tee -> for_each(|(_k, v)| pull_tx.send(v).unwrap()); my_tee -> flat_map(|(k, v)| if v == 3 {vec![Persist(k, v), Delete(k)]} else {vec![Persist(k, v)]}) -> persist_mut_keyed::<'static>() // push -> for_each(|(_k, v)| push_tx.send(v).unwrap()); }; assert_graphvis_snapshots!(df); df.run_available(); assert_eq!( HashSet::from_iter([1, 3, 4]), collect_ready::, _>(&mut pull_rx) ); assert_eq!( HashSet::from_iter([1, 4]), collect_ready::, _>(&mut push_rx) ); }