use std::collections::HashSet; use hydroflow::scheduled::graph::Hydroflow; use hydroflow::scheduled::ticks::TickInstant; use hydroflow::util::collect_ready; use hydroflow::util::multiset::HashMultiSet; use hydroflow::{assert_graphvis_snapshots, hydroflow_syntax}; use multiplatform_test::multiplatform_test; // TODO(mingwei): custom operators? How to handle in syntax? How to handle state? // TODO(mingwei): Still need to handle crossing stratum boundaries // TODO(mingwei): Implement non-monotonicity handling. // TODO(mingwei): Tiemo user test after Tuesday. // TODO(mingwei): Try to get more bad error messages to appear. // TODO(joe): QOL: make a way to generate/print the mermaid graph. // TODO(mingwei): Prevent unused variable warnings when hydroflow code is not generated. // Joe: // TODO(mingwei): Documentation articles. // TODO(mingwei): Find a way to display join keys #[multiplatform_test] pub fn test_basic_2() { let (out_send, mut out_recv) = hydroflow::util::unbounded_channel::(); let mut df = hydroflow_syntax! { source_iter([1]) -> for_each(|v| out_send.send(v).unwrap()); }; assert_graphvis_snapshots!(df); df.run_available(); assert_eq!(&[1], &*collect_ready::, _>(&mut out_recv)); } #[multiplatform_test] pub fn test_basic_3() { let (out_send, mut out_recv) = hydroflow::util::unbounded_channel::(); let mut df = hydroflow_syntax! { source_iter([1]) -> map(|v| v + 1) -> for_each(|v| out_send.send(v).unwrap()); }; assert_graphvis_snapshots!(df); df.run_available(); assert_eq!(&[2], &*collect_ready::, _>(&mut out_recv)); } #[multiplatform_test] pub fn test_basic_union() { let (out_send, mut out_recv) = hydroflow::util::unbounded_channel::(); let mut df = hydroflow_syntax! { m = union() -> for_each(|v| out_send.send(v).unwrap()); source_iter([1]) -> [0]m; source_iter([2]) -> [1]m; }; assert_graphvis_snapshots!(df); df.run_available(); assert_eq!(&[1, 2], &*collect_ready::, _>(&mut out_recv)); } #[multiplatform_test] pub fn test_basic_tee() { let (out_send_a, mut out_recv) = hydroflow::util::unbounded_channel::(); let out_send_b = out_send_a.clone(); let mut df = hydroflow_syntax! { t = source_iter([1]) -> tee(); t[0] -> for_each(|v| out_send_a.send(format!("A {}", v)).unwrap()); t[1] -> for_each(|v| out_send_b.send(format!("B {}", v)).unwrap()); }; df.run_available(); let out: HashSet<_> = collect_ready(&mut out_recv); assert_eq!(2, out.len()); assert!(out.contains("A 1")); assert!(out.contains("B 1")); } #[multiplatform_test] pub fn test_basic_inspect_null() { use std::cell::RefCell; use std::rc::Rc; let seen = Rc::new(RefCell::new(Vec::new())); let seen_inner = Rc::clone(&seen); let mut df = hydroflow_syntax! { source_iter([1, 2, 3, 4]) -> inspect(|&x| seen_inner.borrow_mut().push(x)) -> null(); }; df.run_available(); assert_eq!(&[1, 2, 3, 4], &**seen.borrow()); } #[multiplatform_test] pub fn test_basic_inspect_no_null() { use std::cell::RefCell; use std::rc::Rc; let seen = Rc::new(RefCell::new(Vec::new())); let seen_inner = Rc::clone(&seen); let mut df = hydroflow_syntax! { source_iter([1, 2, 3, 4]) -> inspect(|&x| seen_inner.borrow_mut().push(x)); }; df.run_available(); assert_eq!(&[1, 2, 3, 4], &**seen.borrow()); } // Mainly checking subgraph partitioning pull-push handling. #[multiplatform_test] pub fn test_large_diamond() { #[expect(clippy::map_identity, reason = "testing topology")] let mut df: Hydroflow = hydroflow_syntax! { t = source_iter([1]) -> tee(); j = union() -> for_each(|x| println!("{}", x)); t[0] -> map(std::convert::identity) -> map(std::convert::identity) -> [0]j; t[1] -> map(std::convert::identity) -> map(std::convert::identity) -> [1]j; }; df.run_available(); } /// Test that source_stream can handle "complex" expressions. #[multiplatform_test] pub fn test_recv_expr() { let send_recv = hydroflow::util::unbounded_channel::(); let mut df = hydroflow_syntax! { source_stream(send_recv.1) -> for_each(|v| print!("{:?}", v)); }; assert_graphvis_snapshots!(df); df.run_available(); let items_send = send_recv.0; items_send.send(9).unwrap(); items_send.send(2).unwrap(); items_send.send(5).unwrap(); df.run_available(); } #[multiplatform_test] pub fn test_join_order() { let _df_good = hydroflow_syntax! { yikes = join() -> for_each(|m: ((), (u32, String))| println!("{:?}", m)); source_iter([0,1,2]) -> map(|i| ((), i)) -> [0]yikes; source_iter(["a".to_string(),"b".to_string(),"c".to_string()]) -> map(|s| ((), s)) -> [1]yikes; }; let _df_bad = hydroflow_syntax! { yikes = join() -> for_each(|m: ((), (u32, String))| println!("{:?}", m)); source_iter(["a".to_string(),"b".to_string(),"c".to_string()]) -> map(|s| ((), s)) -> [1]yikes; source_iter([0,1,2]) -> map(|i| ((), i)) -> [0]yikes; }; } #[multiplatform_test] pub fn test_multiset_join() { // HalfJoinStateSetUnion { use hydroflow::compiled::pull::HalfSetJoinState; let (out_tx, mut out_rx) = hydroflow::util::unbounded_channel::<(usize, (usize, usize))>(); let mut df = hydroflow_syntax! { my_join = join::() -> for_each(|m| out_tx.send(m).unwrap()); source_iter([(0, 1), (0, 1)]) -> [0]my_join; source_iter([(0, 2)]) -> [1]my_join; }; df.run_available(); let out: Vec<_> = collect_ready(&mut out_rx); assert_eq!(out, vec![(0, (1, 2))]); } // HalfMultisetJoinState lhs biased { use hydroflow::compiled::pull::HalfMultisetJoinState; let (out_tx, mut out_rx) = hydroflow::util::unbounded_channel::<(usize, (usize, usize))>(); let mut df = hydroflow_syntax! { my_join = join::() -> for_each(|m| out_tx.send(m).unwrap()); source_iter([(1, 1), (1, 1), (1, 1)]) -> [0]my_join; source_iter([(1, 2), (1, 2), (1, 2), (1, 2)]) -> [1]my_join; }; df.run_available(); let out: Vec<_> = collect_ready(&mut out_rx); assert_eq!(out, [(1, (1, 2)); 12].to_vec()); } // HalfMultisetJoinState rhs biased { use hydroflow::compiled::pull::HalfMultisetJoinState; let (out_tx, mut out_rx) = hydroflow::util::unbounded_channel::<(usize, (usize, usize))>(); let mut df = hydroflow_syntax! { my_join = join::() -> for_each(|m| out_tx.send(m).unwrap()); source_iter([(1, 1), (1, 1), (1, 1), (1, 1)]) -> [0]my_join; source_iter([(1, 2), (1, 2), (1, 2)]) -> [1]my_join; }; df.run_available(); let out: Vec<_> = collect_ready(&mut out_rx); assert_eq!(out, [(1, (1, 2)); 12].to_vec()); } } #[multiplatform_test] pub fn test_cross_join() { let (out_send, mut out_recv) = hydroflow::util::unbounded_channel::<(usize, &str)>(); let mut df = hydroflow_syntax! { cj = cross_join() -> for_each(|v| out_send.send(v).unwrap()); source_iter([1, 2, 2, 3]) -> [0]cj; source_iter(["a", "b", "c", "c"]) -> [1]cj; }; df.run_available(); let mut out: Vec<_> = collect_ready(&mut out_recv); out.sort(); assert_eq!( out, [ (1, "a"), (1, "b"), (1, "c"), (2, "a"), (2, "b"), (2, "c"), (3, "a"), (3, "b"), (3, "c") ] ); } #[multiplatform_test] pub fn test_cross_join_multiset() { let (out_send, mut out_recv) = hydroflow::util::unbounded_channel::<(usize, &str)>(); let mut df = hydroflow_syntax! { cj = cross_join_multiset() -> for_each(|v| out_send.send(v).unwrap()); source_iter([1, 2, 2, 3]) -> [0]cj; source_iter(["a", "b", "c", "c"]) -> [1]cj; }; df.run_available(); let mut out: Vec<_> = collect_ready(&mut out_recv); out.sort(); assert_eq!( out, [ (1, "a"), (1, "b"), (1, "c"), (1, "c"), (2, "a"), (2, "a"), (2, "b"), (2, "b"), (2, "c"), (2, "c"), (2, "c"), (2, "c"), (3, "a"), (3, "b"), (3, "c"), (3, "c"), ] ); } #[multiplatform_test] pub fn test_defer_tick() { let (inp_send, inp_recv) = hydroflow::util::unbounded_channel::(); let (out_send, mut out_recv) = hydroflow::util::unbounded_channel::(); let mut flow = hydroflow::hydroflow_syntax! { inp = source_stream(inp_recv) -> tee(); diff = difference() -> for_each(|x| out_send.send(x).unwrap()); inp -> [pos]diff; inp -> defer_tick() -> [neg]diff; }; for x in [1, 2, 3, 4] { inp_send.send(x).unwrap(); } flow.run_tick(); for x in [3, 4, 5, 6] { inp_send.send(x).unwrap(); } flow.run_tick(); flow.run_available(); assert_eq!( HashMultiSet::from_iter([1, 2, 3, 4, 5, 6]), collect_ready(&mut out_recv) ); } #[multiplatform_test] pub fn test_anti_join() { let (inp_send, inp_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>(); let (out_send, mut out_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>(); let mut flow = hydroflow::hydroflow_syntax! { inp = source_stream(inp_recv) -> tee(); diff = anti_join() -> sort() -> for_each(|x| out_send.send(x).unwrap()); inp -> [pos]diff; inp -> defer_tick() -> map(|x: (usize, usize)| x.0) -> [neg]diff; }; for x in [(1, 2), (1, 2), (2, 3), (3, 4), (4, 5)] { inp_send.send(x).unwrap(); } flow.run_tick(); for x in [(3, 2), (4, 3), (5, 4), (6, 5)] { inp_send.send(x).unwrap(); } flow.run_tick(); flow.run_available(); let out: Vec<_> = collect_ready(&mut out_recv); assert_eq!( &[(1, 2), (1, 2), (2, 3), (3, 4), (4, 5), (5, 4), (6, 5)], &*out ); } #[multiplatform_test] pub fn test_anti_join_static() { let (pos_send, pos_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>(); let (neg_send, neg_recv) = hydroflow::util::unbounded_channel::(); let (out_send, mut out_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>(); let mut flow = hydroflow::hydroflow_syntax! { pos = source_stream(pos_recv); neg = source_stream(neg_recv); pos -> [pos]diff_static; neg -> [neg]diff_static; diff_static = anti_join::<'static>() -> sort() -> for_each(|x| out_send.send(x).unwrap()); }; for x in [(1, 2), (1, 2), (200, 3), (300, 4), (400, 5), (5, 6)] { pos_send.send(x).unwrap(); } for x in [200, 300] { neg_send.send(x).unwrap(); } flow.run_tick(); let out: Vec<_> = collect_ready(&mut out_recv); assert_eq!(&[(1, 2), (1, 2), (5, 6), (400, 5)], &*out); neg_send.send(400).unwrap(); flow.run_available(); let out: Vec<_> = collect_ready(&mut out_recv); assert_eq!(&[(1, 2), (5, 6)], &*out); } #[multiplatform_test] pub fn test_anti_join_tick_static() { let (pos_send, pos_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>(); let (neg_send, neg_recv) = hydroflow::util::unbounded_channel::(); let (out_send, mut out_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>(); let mut flow = hydroflow::hydroflow_syntax! { pos = source_stream(pos_recv); neg = source_stream(neg_recv); pos -> [pos]diff_static; neg -> [neg]diff_static; diff_static = anti_join::<'tick, 'static>() -> sort() -> for_each(|x| out_send.send(x).unwrap()); }; for x in [(1, 2), (1, 2), (200, 3), (300, 4), (400, 5), (5, 6)] { pos_send.send(x).unwrap(); } for x in [200, 300] { neg_send.send(x).unwrap(); } flow.run_tick(); let out: Vec<_> = collect_ready(&mut out_recv); assert_eq!(&[(1, 2), (1, 2), (5, 6), (400, 5)], &*out); for x in [(10, 10), (10, 10), (200, 5)] { pos_send.send(x).unwrap(); } flow.run_available(); let out: Vec<_> = collect_ready(&mut out_recv); assert_eq!(&[(10, 10), (10, 10)], &*out); } #[multiplatform_test] pub fn test_anti_join_multiset_tick_static() { let (pos_send, pos_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>(); let (neg_send, neg_recv) = hydroflow::util::unbounded_channel::(); let (out_send, mut out_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>(); let mut flow = hydroflow::hydroflow_syntax! { pos = source_stream(pos_recv); neg = source_stream(neg_recv); pos -> [pos]diff_static; neg -> [neg]diff_static; diff_static = anti_join_multiset::<'tick, 'static>() -> sort() -> for_each(|x| out_send.send(x).unwrap()); }; for x in [(1, 2), (1, 2), (200, 3), (300, 4), (400, 5), (5, 6)] { pos_send.send(x).unwrap(); } for x in [200, 300] { neg_send.send(x).unwrap(); } flow.run_tick(); let out: Vec<_> = collect_ready(&mut out_recv); assert_eq!(&[(1, 2), (1, 2), (5, 6), (400, 5),], &*out); for x in [(10, 10), (10, 10), (200, 5)] { pos_send.send(x).unwrap(); } flow.run_available(); let out: Vec<_> = collect_ready(&mut out_recv); assert_eq!(&[(10, 10), (10, 10)], &*out); } #[multiplatform_test] pub fn test_anti_join_multiset_static() { let (pos_send, pos_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>(); let (neg_send, neg_recv) = hydroflow::util::unbounded_channel::(); let (out_send, mut out_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>(); let mut flow = hydroflow::hydroflow_syntax! { pos = source_stream(pos_recv); neg = source_stream(neg_recv); pos -> [pos]diff_static; neg -> [neg]diff_static; diff_static = anti_join_multiset::<'static>() -> sort() -> for_each(|x| out_send.send(x).unwrap()); }; for x in [(1, 2), (1, 2), (200, 3), (300, 4), (400, 5), (5, 6)] { pos_send.send(x).unwrap(); } for x in [200, 300] { neg_send.send(x).unwrap(); } flow.run_tick(); let out: Vec<_> = collect_ready(&mut out_recv); assert_eq!(&[(1, 2), (1, 2), (5, 6), (400, 5)], &*out); neg_send.send(400).unwrap(); flow.run_available(); let out: Vec<_> = collect_ready(&mut out_recv); assert_eq!(&[(1, 2), (1, 2), (5, 6)], &*out); } #[multiplatform_test] pub fn test_anti_join_multiset() { let (inp_send, inp_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>(); let (out_send, mut out_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>(); let mut flow = hydroflow::hydroflow_syntax! { inp = source_stream(inp_recv) -> tee(); diff = anti_join_multiset() -> sort() -> for_each(|x| out_send.send(x).unwrap()); inp -> [pos]diff; inp -> defer_tick() -> map(|x: (usize, usize)| x.0) -> [neg]diff; }; for x in [(1, 2), (1, 2), (2, 3), (3, 4), (4, 5)] { inp_send.send(x).unwrap(); } flow.run_tick(); for x in [(3, 2), (4, 3), (5, 4), (6, 5)] { inp_send.send(x).unwrap(); } flow.run_tick(); flow.run_available(); let out: Vec<_> = collect_ready(&mut out_recv); assert_eq!( &[(1, 2), (1, 2), (2, 3), (3, 4), (4, 5), (5, 4), (6, 5)], &*out ); } #[multiplatform_test] pub fn test_sort() { let (items_send, items_recv) = hydroflow::util::unbounded_channel::(); let mut df = hydroflow_syntax! { source_stream(items_recv) -> sort() -> for_each(|v| print!("{:?}, ", v)); }; assert_graphvis_snapshots!(df); df.run_available(); print!("\nA: "); items_send.send(9).unwrap(); items_send.send(2).unwrap(); items_send.send(5).unwrap(); df.run_available(); print!("\nB: "); items_send.send(9).unwrap(); items_send.send(5).unwrap(); items_send.send(2).unwrap(); items_send.send(0).unwrap(); items_send.send(3).unwrap(); df.run_available(); println!(); } #[multiplatform_test] pub fn test_sort_by_key() { let mut df = hydroflow_syntax! { source_iter(vec!((2, 'y'), (3, 'x'), (1, 'z'))) -> sort_by_key(|(k, _v)| k) -> for_each(|v| println!("{:?}", v)); }; assert_graphvis_snapshots!(df); df.run_available(); println!(); } #[multiplatform_test] fn test_sort_by_owned() { #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] struct Dummy { x: String, y: i8, } let (out_send, mut out_recv) = hydroflow::util::unbounded_channel::(); let dummies: Vec = vec![ Dummy { x: "a".to_string(), y: 2, }, Dummy { x: "b".to_string(), y: 1, }, ]; let mut dummies_saved = dummies.clone(); let mut df = hydroflow_syntax! { source_iter(dummies) -> sort_by_key(|d| &d.x) -> for_each(|d| out_send.send(d).unwrap()); }; df.run_available(); let results = collect_ready::, _>(&mut out_recv); dummies_saved.sort_unstable_by(|d1, d2| d1.y.cmp(&d2.y)); assert_ne!(&dummies_saved, &*results); dummies_saved.sort_unstable_by(|d1, d2| d1.x.cmp(&d2.x)); assert_eq!(&dummies_saved, &*results); } #[multiplatform_test] pub fn test_channel_minimal() { let (send, recv) = hydroflow::util::unbounded_channel::(); let (out_send, mut out_recv) = hydroflow::util::unbounded_channel::(); let mut df1 = hydroflow_syntax! { source_iter([1, 2, 3]) -> for_each(|x| { send.send(x).unwrap(); }); }; let mut df2 = hydroflow_syntax! { source_stream(recv) -> for_each(|x| out_send.send(x).unwrap()); }; df2.run_available(); let results = collect_ready::, _>(&mut out_recv); assert_eq!([] as [usize; 0], *results); df1.run_available(); let results = collect_ready::, _>(&mut out_recv); assert_eq!([] as [usize; 0], *results); df2.run_available(); let results = collect_ready::, _>(&mut out_recv); assert_eq!([1, 2, 3], *results); } #[multiplatform_test] pub fn test_surface_syntax_reachability_generated() { // An edge in the input data = a pair of `usize` vertex IDs. let (pairs_send, pairs_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>(); let (out_send, mut out_recv) = hydroflow::util::unbounded_channel::(); let mut df: Hydroflow = hydroflow_syntax! { reached_vertices = union() -> map(|v| (v, ())); source_iter(vec![0]) -> [0]reached_vertices; my_join_tee = join::<'static>() -> map(|(_src, ((), dst))| dst) -> unique::<'static>() -> tee(); reached_vertices -> [0]my_join_tee; source_stream(pairs_recv) -> [1]my_join_tee; my_join_tee[0] -> [1]reached_vertices; my_join_tee[1] -> for_each(|x| out_send.send(x).unwrap()); }; assert_graphvis_snapshots!(df); df.run_available(); let results = collect_ready::, _>(&mut out_recv); assert_eq!([] as [usize; 0], *results); pairs_send.send((0, 1)).unwrap(); df.run_available(); let results = collect_ready::, _>(&mut out_recv); assert_eq!([1], *results); pairs_send.send((2, 4)).unwrap(); pairs_send.send((3, 4)).unwrap(); df.run_available(); let results = collect_ready::, _>(&mut out_recv); assert_eq!([] as [usize; 0], *results); pairs_send.send((1, 2)).unwrap(); df.run_available(); let results = collect_ready::, _>(&mut out_recv); assert_eq!([2, 4], *results); pairs_send.send((0, 3)).unwrap(); df.run_available(); let results = collect_ready::, _>(&mut out_recv); assert_eq!([3], *results); pairs_send.send((0, 3)).unwrap(); df.run_available(); let results = collect_ready::, _>(&mut out_recv); assert_eq!([] as [usize; 0], *results); } #[multiplatform_test] pub fn test_transitive_closure() { // An edge in the input data = a pair of `usize` vertex IDs. let (pairs_send, pairs_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>(); let (out_send, mut out_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>(); let mut df = hydroflow_syntax! { // edge(x,y) :- link(x,y) edge_union_tee = union() -> tee(); link_tee = tee(); source_stream(pairs_recv) -> link_tee; link_tee[0] -> [0]edge_union_tee; // edge(a,b) :- edge(a,k), link(k,b) the_join = join::<'static>() -> unique::<'static>(); edge_union_tee[0] -> map(|(a, k)| (k, a)) -> [0]the_join; link_tee[1] -> [1]the_join; the_join -> map(|(_k, (a, b))| (a, b)) -> [1]edge_union_tee; edge_union_tee[1] -> for_each(|(a, b)| out_send.send((a, b)).unwrap()); }; assert_graphvis_snapshots!(df); df.run_available(); let results = collect_ready::, _>(&mut out_recv); assert_eq!([] as [(usize, usize); 0], *results); pairs_send.send((0, 1)).unwrap(); df.run_available(); let results = collect_ready::, _>(&mut out_recv); assert_eq!([(0, 1)], *results); pairs_send.send((2, 4)).unwrap(); pairs_send.send((3, 4)).unwrap(); df.run_available(); let results = collect_ready::, _>(&mut out_recv); assert_eq!([(2, 4), (3, 4)], *results); pairs_send.send((1, 2)).unwrap(); df.run_available(); let results = collect_ready::, _>(&mut out_recv); assert_eq!([(1, 2), (0, 2), (1, 4), (0, 4)], *results); pairs_send.send((0, 3)).unwrap(); df.run_available(); let results = collect_ready::, _>(&mut out_recv); assert_eq!([(0, 3), (0, 4)], *results); pairs_send.send((0, 3)).unwrap(); df.run_available(); let results = collect_ready::, _>(&mut out_recv); assert_eq!([(0, 3)], *results); } #[multiplatform_test] pub fn test_covid_tracing() { use hydroflow::util::unbounded_channel; const TRANSMISSIBLE_DURATION: usize = 14; // Days. type Pid = usize; type Name = &'static str; type Phone = &'static str; type DateTime = usize; // Days. let (contacts_send, contacts_recv) = unbounded_channel::<(Pid, Pid, DateTime)>(); let (diagnosed_send, diagnosed_recv) = unbounded_channel::<(Pid, (DateTime, DateTime))>(); let (people_send, people_recv) = unbounded_channel::<(Pid, (Name, Phone))>(); let (out_send, mut out_recv) = unbounded_channel::(); let mut hydroflow = hydroflow_syntax! { contacts = source_stream(contacts_recv) -> flat_map(|(pid_a, pid_b, time)| [(pid_a, (pid_b, time)), (pid_b, (pid_a, time))]); exposed = union(); source_stream(diagnosed_recv) -> [0]exposed; new_exposed = join::<'static>() -> filter(|(_pid_a, ((_pid_b, t_contact), (t_from, t_to)))| { (t_from..=t_to).contains(&t_contact) }) -> map(|(_pid_a, (pid_b_t_contact, _t_from_to))| pid_b_t_contact) -> tee(); contacts -> [0]new_exposed; exposed -> [1]new_exposed; new_exposed[0] -> map(|(pid, t)| (pid, (t, t + TRANSMISSIBLE_DURATION))) -> [1]exposed; notifs = join::<'static>() -> map(|(_pid, ((name, phone), exposure))| { format!( "[{}] To {}: Possible Exposure at t = {}", name, phone, exposure, ) }) -> tee(); notifs -> for_each(|msg| println!("{}", msg)); notifs -> for_each(|msg| out_send.send(msg).unwrap()); source_stream(people_recv) -> [0]notifs; new_exposed[1] -> [1]notifs; }; assert_graphvis_snapshots!(hydroflow); { people_send .send((101, ("Mingwei S", "+1 650 555 7283"))) .unwrap(); people_send .send((102, ("Justin J", "+1 519 555 3458"))) .unwrap(); people_send .send((103, ("Mae M", "+1 912 555 9129"))) .unwrap(); contacts_send.send((101, 102, 1031)).unwrap(); // Mingwei + Justin contacts_send.send((101, 201, 1027)).unwrap(); // Mingwei + Joe let mae_diag_datetime = 1022; diagnosed_send .send(( 103, // Mae ( mae_diag_datetime, mae_diag_datetime + TRANSMISSIBLE_DURATION, ), )) .unwrap(); hydroflow.run_available(); let results = collect_ready::, _>(&mut out_recv); assert_eq!([] as [String; 0], *results); println!("A"); contacts_send .send((101, 103, mae_diag_datetime + 6)) .unwrap(); // Mingwei + Mae hydroflow.run_available(); let results = collect_ready::, _>(&mut out_recv); assert_eq!( [ "[Mingwei S] To +1 650 555 7283: Possible Exposure at t = 1028", "[Justin J] To +1 519 555 3458: Possible Exposure at t = 1031", "[Mae M] To +1 912 555 9129: Possible Exposure at t = 1028", "[Mingwei S] To +1 650 555 7283: Possible Exposure at t = 1031", ], *results ); println!("B"); people_send .send((103, ("Joe H", "+1 510 555 9999"))) .unwrap(); hydroflow.run_available(); let results = collect_ready::, _>(&mut out_recv); assert_eq!( [ "[Mingwei S] To +1 650 555 7283: Possible Exposure at t = 1028", "[Mingwei S] To +1 650 555 7283: Possible Exposure at t = 1031", "[Justin J] To +1 519 555 3458: Possible Exposure at t = 1031", "[Mae M] To +1 912 555 9129: Possible Exposure at t = 1028", "[Joe H] To +1 510 555 9999: Possible Exposure at t = 1028" ], *results ); } } #[multiplatform_test] pub fn test_assert_eq() { let mut df = hydroflow_syntax! { source_iter([1, 2, 3]) -> assert_eq([1, 2, 3]) -> assert_eq([1, 2, 3]); // one in pull, one in push source_iter([1, 2, 3]) -> assert_eq([1, 2, 3]) -> assert_eq(vec![1, 2, 3]); source_iter([1, 2, 3]) -> assert_eq(vec![1, 2, 3]) -> assert_eq([1, 2, 3]); source_iter(vec![1, 2, 3]) -> assert_eq([1, 2, 3]) -> assert_eq([1, 2, 3]); }; df.run_available(); } #[multiplatform_test(test)] pub fn test_assert_failures() { assert!(std::panic::catch_unwind(|| { let mut df = hydroflow_syntax! { source_iter([0]) -> assert_eq([1]); }; df.run_available(); }) .is_err()); assert!(std::panic::catch_unwind(|| { let mut df = hydroflow_syntax! { source_iter([0]) -> assert_eq([1]) -> null(); }; df.run_available(); }) .is_err()); } #[multiplatform_test] pub fn test_iter_stream_batches() { const ITEMS: usize = 100; const BATCH: usize = 5; let stream = hydroflow::util::iter_batches_stream(0..ITEMS, BATCH); // expect 5 items per tick. let expected: Vec<_> = (0..ITEMS) .map(|n| (TickInstant::new((n / BATCH).try_into().unwrap()), n)) .collect(); let mut df = hydroflow_syntax! { source_stream(stream) -> map(|x| (context.current_tick(), x)) -> assert_eq(expected); }; df.run_available(); }