use hydroflow::hydroflow_syntax; pub fn main() { // An edge in the input data = a pair of `usize` vertex IDs. let (edges_send, edges_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>(); let mut flow = hydroflow_syntax! { // inputs: the origin vertex (vertex 0) and stream of input edges origin = source_iter(vec![0]); stream_of_edges = source_stream(edges_recv); // the join reached_vertices -> map(|v| (v, ())) -> [0]my_join_tee; stream_of_edges -> [1]my_join_tee; my_join_tee = join() -> flat_map(|(src, ((), dst))| [src, dst]) -> tee(); // the cycle: my_join_tee gets data from reached_vertices // and provides data back to reached_vertices! origin -> [base]reached_vertices; my_join_tee -> [cycle]reached_vertices; reached_vertices = union(); // the output my_join_tee[print] -> unique() -> for_each(|x| println!("Reached: {}", x)); }; println!( "{}", flow.meta_graph() .expect("No graph found, maybe failed to parse.") .to_mermaid(&Default::default()) ); edges_send.send((0, 1)).unwrap(); edges_send.send((2, 4)).unwrap(); edges_send.send((3, 4)).unwrap(); edges_send.send((1, 2)).unwrap(); edges_send.send((0, 3)).unwrap(); edges_send.send((0, 3)).unwrap(); edges_send.send((4, 0)).unwrap(); flow.run_available(); }