use std::collections::{HashMap, HashSet}; use adjacency::rga_adjacency; use clap::{Parser, ValueEnum}; use datalog::rga_datalog; use datalog_agg::rga_datalog_agg; use hydroflow::util::collect_ready_async; use hydroflow_lang::graph::{WriteConfig, WriteGraphType}; use minimal::rga_minimal; use protocol::{Timestamp, Token}; use tokio::sync::mpsc::UnboundedSender; use tokio_stream::wrappers::UnboundedReceiverStream; mod adjacency; mod datalog; mod datalog_agg; mod minimal; mod protocol; #[derive(Parser, Debug, Clone, ValueEnum)] enum Implementation { Datalog, Adjacency, DatalogAgg, Minimal, } #[derive(Parser, Debug)] struct Opts { #[clap(value_enum, long = "impl", short)] implementation: Option, #[clap(long)] graph: Option, #[clap(flatten)] write_config: Option, } #[hydroflow::main] pub async fn main() { { // Set up tracing logger. let subscriber = tracing_subscriber::FmtSubscriber::builder() .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) .with_test_writer() .finish(); let _ = tracing::subscriber::set_global_default(subscriber); } // An edge in the input data = a pair of `Token` vertex IDs. let (input_send, input_recv) = hydroflow::util::unbounded_channel::<(Token, Timestamp)>(); let (rga_send, mut rga_recv) = hydroflow::util::unbounded_channel::<(Token, Timestamp)>(); let (list_send, mut list_recv) = hydroflow::util::unbounded_channel::<(Timestamp, Timestamp)>(); let opts = Opts::parse(); let mut hf = match opts.implementation { Some(Implementation::Datalog) => rga_datalog(input_recv, rga_send, list_send), Some(Implementation::Adjacency) => rga_adjacency(input_recv, rga_send, list_send), Some(Implementation::Minimal) => rga_minimal(input_recv, rga_send, list_send), Some(Implementation::DatalogAgg) => rga_datalog_agg(input_recv, rga_send, list_send), None => rga_adjacency(input_recv, rga_send, list_send), }; #[cfg(feature = "debugging")] if let Some(graph) = opts.graph { let serde_graph = hf .meta_graph() .expect("No graph found, maybe failed to parse."); serde_graph.open_graph(graph, opts.write_config).unwrap(); } keystroke((1, 0, 'a'), (0, 0), &input_send).await; keystroke((2, 0, 'b'), (1, 0), &input_send).await; keystroke((3, 0, 'a'), (2, 0), &input_send).await; keystroke((4, 0, 't'), (3, 0), &input_send).await; keystroke((5, 0, 'e'), (4, 0), &input_send).await; keystroke((6, 1, 'o'), (2, 0), &input_send).await; keystroke((7, 1, 'r'), (6, 1), &input_send).await; keystroke((8, 0, 'C'), (0, 0), &input_send).await; keystroke((9, 0, 'o'), (8, 0), &input_send).await; keystroke((10, 0, 'l'), (9, 0), &input_send).await; keystroke((11, 0, 'l'), (10, 0), &input_send).await; hf.run_tick(); let mut output = String::new(); write_to_dot(&mut rga_recv, &mut list_recv, &mut output).await; println!("{}", output); } async fn keystroke( (node_ts, node_id, c): (usize, usize, char), (parent_ts, parent_id): (usize, usize), input_send: &UnboundedSender<(Token, Timestamp)>, ) { input_send .send(( Token { ts: Timestamp { node_ts, node_id }, value: c, }, Timestamp { node_ts: parent_ts, node_id: parent_id, }, )) .unwrap(); } async fn write_to_dot( rga_recv: &mut UnboundedReceiverStream<(Token, Timestamp)>, list_recv: &mut UnboundedReceiverStream<(Timestamp, Timestamp)>, w: &mut impl std::fmt::Write, ) { let tree_edges: HashSet<_> = collect_ready_async(rga_recv).await; let list_edges: HashMap<_, _> = collect_ready_async(list_recv).await; let node_names = tree_edges .iter() .map(|(c, _)| (c.ts, c.value)) .collect::>(); // print RGA tree in dot format writeln!(w, "digraph G {{\nrankdir = TB").unwrap(); writeln!( w, "ts0_0 [label=ts0_0>]" ) .unwrap(); for (c, p) in tree_edges { writeln!(w, "{} -> {} [color=gray, constraint=true]", p, c.ts).unwrap(); writeln!(w, "{}", format_dot_node(c)).unwrap(); } for (first, second) in list_edges.iter() { writeln!( w, "{} -> {} [style=dashed, color=blue, constraint=false]", first, second ) .unwrap(); } let mut x = Timestamp { node_ts: 0, node_id: 0, }; let mut result = String::new(); while let Some(y) = list_edges.get(&x) { result.push(*(node_names.get(y).unwrap())); x = *y; } if result.is_empty() { result = "Unknown".to_string() }; writeln!(w, "label=<{}>", result).unwrap(); writeln!(w, "}}").unwrap(); } fn format_dot_node(n: Token) -> String { format!( "{} [label=<{}{}>]", n.ts, n.value, n.ts ) } #[test] fn test() { use hydroflow::util::{run_cargo_example, wait_for_process_output}; fn escape_regex(input: &str) -> String { input .replace("[", "\\[") .replace("]", "\\]") .replace("{", "\\{") .replace("}", "\\}") } { let (_child, _, mut stdout) = run_cargo_example("rga", "--impl adjacency"); let mut output = String::new(); for line in EXPECTED_OUTPUT.split("\n") { wait_for_process_output(&mut output, &mut stdout, &escape_regex(line)); } } { let (_child, _, mut stdout) = run_cargo_example("rga", "--impl datalog"); let mut output = String::new(); for line in EXPECTED_OUTPUT.split("\n") { wait_for_process_output(&mut output, &mut stdout, &escape_regex(line)); } } { let (_child, _, mut stdout) = run_cargo_example("rga", "--impl minimal"); let mut output = String::new(); for line in EXPECTED_OUTPUT_MINIMAL.split("\n") { wait_for_process_output(&mut output, &mut stdout, &escape_regex(line)); } } // TODO: This implementation appears to be broken. // { // let (_, _, mut stdout) = spawn("rga", "--impl datalog-agg"); // let mut output = String::new(); // for line in EXPECTED_OUTPUT_MINIMAL.split("\n") { // wait_for_output(&mut output, &mut stdout, &escape_regex(line)); // } // } } // Output can be re-ordered, so this will be tested line by line #[cfg(test)] const EXPECTED_OUTPUT: &str = r#"digraph G { rankdir = TB ts0_0 [label=ts0_0>] ts4_0 -> ts5_0 [color=gray, constraint=true] ts5_0 [label=ts5_0>] ts9_0 -> ts10_0 [color=gray, constraint=true] ts10_0 [label=ts10_0>] ts6_1 -> ts7_1 [color=gray, constraint=true] ts7_1 [label=ts7_1>] ts0_0 -> ts1_0 [color=gray, constraint=true] ts1_0 [label=ts1_0>] ts0_0 -> ts8_0 [color=gray, constraint=true] ts8_0 [label=ts8_0>] ts1_0 -> ts2_0 [color=gray, constraint=true] ts2_0 [label=ts2_0>] ts8_0 -> ts9_0 [color=gray, constraint=true] ts9_0 [label=ts9_0>] ts2_0 -> ts3_0 [color=gray, constraint=true] ts3_0 [label=ts3_0>] ts10_0 -> ts11_0 [color=gray, constraint=true] ts11_0 [label=ts11_0>] ts2_0 -> ts6_1 [color=gray, constraint=true] ts6_1 [label=ts6_1>] ts3_0 -> ts4_0 [color=gray, constraint=true] ts4_0 [label=ts4_0>] ts0_0 -> ts8_0 [style=dashed, color=blue, constraint=false] ts9_0 -> ts10_0 [style=dashed, color=blue, constraint=false] ts6_1 -> ts7_1 [style=dashed, color=blue, constraint=false] ts11_0 -> ts1_0 [style=dashed, color=blue, constraint=false] ts7_1 -> ts3_0 [style=dashed, color=blue, constraint=false] ts2_0 -> ts6_1 [style=dashed, color=blue, constraint=false] ts4_0 -> ts5_0 [style=dashed, color=blue, constraint=false] ts10_0 -> ts11_0 [style=dashed, color=blue, constraint=false] ts3_0 -> ts4_0 [style=dashed, color=blue, constraint=false] ts8_0 -> ts9_0 [style=dashed, color=blue, constraint=false] ts1_0 -> ts2_0 [style=dashed, color=blue, constraint=false] label=<Collaborate> }"#; // Output can be re-ordered, so this will be tested line by line #[cfg(test)] const EXPECTED_OUTPUT_MINIMAL: &str = r#"digraph G { rankdir = TB ts0_0 [label=ts0_0>] ts3_0 -> ts4_0 [color=gray, constraint=true] ts4_0 [label=ts4_0>] ts0_0 -> ts8_0 [color=gray, constraint=true] ts8_0 [label=ts8_0>] ts2_0 -> ts3_0 [color=gray, constraint=true] ts3_0 [label=ts3_0>] ts6_1 -> ts7_1 [color=gray, constraint=true] ts7_1 [label=ts7_1>] ts0_0 -> ts1_0 [color=gray, constraint=true] ts1_0 [label=ts1_0>] ts10_0 -> ts11_0 [color=gray, constraint=true] ts11_0 [label=ts11_0>] ts8_0 -> ts9_0 [color=gray, constraint=true] ts9_0 [label=ts9_0>] ts2_0 -> ts6_1 [color=gray, constraint=true] ts6_1 [label=ts6_1>] ts9_0 -> ts10_0 [color=gray, constraint=true] ts10_0 [label=ts10_0>] ts1_0 -> ts2_0 [color=gray, constraint=true] ts2_0 [label=ts2_0>] ts4_0 -> ts5_0 [color=gray, constraint=true] ts5_0 [label=ts5_0>] label=<Unknown> }"#;