use hydroflow::util::collect_ready; use hydroflow::{assert_graphvis_snapshots, hydroflow_syntax}; use multiplatform_test::multiplatform_test; #[multiplatform_test] pub fn test_multiset_delta() { let (input_send, input_recv) = hydroflow::util::unbounded_channel::(); let (result_send, mut result_recv) = hydroflow::util::unbounded_channel::(); let mut flow = hydroflow_syntax! { source_stream(input_recv) -> multiset_delta() -> for_each(|x| result_send.send(x).unwrap()); }; assert_graphvis_snapshots!(flow); input_send.send(3).unwrap(); input_send.send(4).unwrap(); input_send.send(3).unwrap(); flow.run_tick(); assert_eq!(&[3, 4, 3], &*collect_ready::, _>(&mut result_recv)); input_send.send(3).unwrap(); input_send.send(5).unwrap(); input_send.send(3).unwrap(); input_send.send(3).unwrap(); flow.run_tick(); // First two "3"s are removed due to previous tick. assert_eq!(&[5, 3], &*collect_ready::, _>(&mut result_recv)); } #[multiplatform_test] pub fn test_persist_multiset_delta() { let (input_send, input_recv) = hydroflow::util::unbounded_channel::(); let (output_send, mut output_recv) = hydroflow::util::unbounded_channel::(); let mut flow = hydroflow::hydroflow_syntax! { source_stream(input_recv) -> persist::<'static>() -> multiset_delta() -> for_each(|x| output_send.send(x).unwrap()); }; input_send.send(1).unwrap(); flow.run_tick(); assert_eq!(&[(1)], &*collect_ready::, _>(&mut output_recv)); flow.run_tick(); assert!(collect_ready::, _>(&mut output_recv).is_empty()); flow.run_tick(); assert!(collect_ready::, _>(&mut output_recv).is_empty()); } #[multiplatform_test] pub fn test_multiset_delta_2() { let (input_send, input_recv) = hydroflow::util::unbounded_channel::(); let (result_send, mut result_recv) = hydroflow::util::unbounded_channel::(); let mut flow = hydroflow_syntax! { source_stream(input_recv) -> multiset_delta() -> for_each(|x| result_send.send(x).unwrap()); }; input_send.send(3).unwrap(); input_send.send(4).unwrap(); input_send.send(3).unwrap(); flow.run_tick(); assert_eq!(&[3, 4, 3], &*collect_ready::, _>(&mut result_recv)); input_send.send(3).unwrap(); input_send.send(4).unwrap(); input_send.send(3).unwrap(); input_send.send(3).unwrap(); flow.run_tick(); assert_eq!(&[3], &*collect_ready::, _>(&mut result_recv)); } #[multiplatform_test] fn test_chat_app_replay() { let (users_send, users) = hydroflow::util::unbounded_channel::(); let (messages_send, messages) = hydroflow::util::unbounded_channel::(); let (out, mut out_recv) = hydroflow::util::unbounded_channel::<(u32, String)>(); let mut chat_server = hydroflow_syntax! { users = source_stream(users) -> persist::<'static>(); messages = source_stream(messages) -> persist::<'static>(); users -> [0]crossed; messages -> [1]crossed; crossed = cross_join::<'tick, 'tick>() -> multiset_delta() -> for_each(|t| { out.send(t).unwrap(); }); }; users_send.send(1).unwrap(); users_send.send(2).unwrap(); messages_send.send("hello".to_string()).unwrap(); messages_send.send("world".to_string()).unwrap(); chat_server.run_tick(); assert_eq!( &[ (1, "hello".to_string()), (2, "hello".to_string()), (1, "world".to_string()), (2, "world".to_string()) ], &*collect_ready::, _>(&mut out_recv), ); users_send.send(3).unwrap(); messages_send.send("goodbye".to_string()).unwrap(); chat_server.run_tick(); // fails with: [(1, "hello"), (2, "hello"), (3, "hello"), (1, "world"), (2, "world"), (3, "world"), (1, "goodbye"), (2, "goodbye"), (3, "goodbye")] assert_eq!( &[ (3, "hello".to_string()), (3, "world".to_string()), (1, "goodbye".to_string()), (2, "goodbye".to_string()), (3, "goodbye".to_string()) ], &*collect_ready::, _>(&mut out_recv), ); } #[multiplatform_test] fn test_chat_app_replay_manual() { let (input_send, input_recv) = hydroflow::util::unbounded_channel::<(u32, String)>(); let (result_send, mut result_recv) = hydroflow::util::unbounded_channel::<(u32, String)>(); let mut flow = hydroflow_syntax! { source_stream(input_recv) -> multiset_delta() -> for_each(|x| result_send.send(x).unwrap()); }; input_send.send((1, "hello".to_owned())).unwrap(); input_send.send((2, "hello".to_owned())).unwrap(); input_send.send((1, "world".to_owned())).unwrap(); input_send.send((2, "world".to_owned())).unwrap(); flow.run_tick(); assert_eq!( &[ (1, "hello".to_string()), (2, "hello".to_string()), (1, "world".to_string()), (2, "world".to_string()) ], &*collect_ready::, _>(&mut result_recv), ); input_send.send((1, "hello".to_owned())).unwrap(); input_send.send((2, "hello".to_owned())).unwrap(); input_send.send((3, "hello".to_owned())).unwrap(); input_send.send((1, "world".to_owned())).unwrap(); input_send.send((2, "world".to_owned())).unwrap(); input_send.send((3, "world".to_owned())).unwrap(); input_send.send((1, "goodbye".to_owned())).unwrap(); input_send.send((2, "goodbye".to_owned())).unwrap(); input_send.send((3, "goodbye".to_owned())).unwrap(); flow.run_tick(); assert_eq!( &[ (3, "hello".to_string()), (3, "world".to_string()), (1, "goodbye".to_string()), (2, "goodbye".to_string()), (3, "goodbye".to_string()) ], &*collect_ready::, _>(&mut result_recv), ); }