use std::net::SocketAddr; use hydroflow::hydroflow_syntax; use hydroflow::scheduled::graph::Hydroflow; use hydroflow::util::{UdpSink, UdpStream}; use crate::helpers::parse_out; use crate::protocol::{CoordMsg, MsgType, SubordResponse}; use crate::{Addresses, Opts}; pub(crate) async fn run_coordinator(outbound: UdpSink, inbound: UdpStream, opts: Opts) { println!("Coordinator live!"); let path = opts.path(); let mut df: Hydroflow = hydroflow_syntax! { // fetch subordinates from file, convert ip:port to a SocketAddr, and tee subords = source_json(path) -> flat_map(|json: Addresses| json.subordinates) -> map(|s| s.parse::().unwrap()) -> tee(); // phase_map tells us what phase each transaction is in // There are only 3 phases per xid: // 1. coordinator sends PREPARE, subordinates vote COMMIT/ABORT // 2. coordinator send final decision, subordinates ACK // 3. coordinate sends END, subordinates respond with ENDED // After phase 3 we delete the xid from the phase_map phase_map = union() -> persist_mut_keyed::<'static>(); // set up channels outbound_chan = tee(); outbound_chan[0] -> dest_sink_serde(outbound); inbound_chan = source_stream_serde(inbound) -> map(Result::unwrap) -> map(|(m, _a)| m) -> tee(); msgs = inbound_chan[0] -> demux(|m:SubordResponse, var_args!(commits, aborts, acks, endeds, errs)| match m.mtype { MsgType::Commit => commits.give(m), MsgType::Abort => aborts.give(m), MsgType::AckP2 {..} => acks.give(m), MsgType::Ended {..} => endeds.give(m), _ => errs.give(m), }); msgs[errs] -> for_each(|m| println!("Received unexpected message type: {:?}", m)); msgs[endeds] -> map(|m: SubordResponse| hydroflow::util::PersistenceKeyed::Delete(m.xid)) -> defer_tick() -> phase_map; // we log all messages (in this prototype we just print) inbound_chan[1] -> for_each(|m| println!("Received {:?}", m)); outbound_chan[1] -> for_each(|(m, a)| println!("Sending {:?} to {:?}", m, a)); // setup broadcast channel to all subords broadcast_join = cross_join::<'tick, 'static>() -> outbound_chan; broadcast = union() -> [0]broadcast_join; subords[1] -> [1]broadcast_join; subords[2] -> for_each(|s| println!("Subordinate: {:?}", s)); // Phase 1 initiate: // Given a transaction commit request from stdio, broadcast a Prepare to subordinates initiate = source_stdin() -> filter_map(|l: Result| parse_out(l.unwrap())) -> tee(); initiate -> flat_map(|xid: u16| [hydroflow::util::PersistenceKeyed::Delete(xid), hydroflow::util::PersistenceKeyed::Persist(xid, 1)]) -> phase_map; initiate -> map(|xid:u16| CoordMsg{xid, mtype: MsgType::Prepare}) -> [0]broadcast; // Phase 1 responses: // as soon as we get an abort message for P1, we start Phase 2 with Abort. // We'll respond to each abort message: this is redundant but correct (and monotone) abort_p1s = msgs[aborts] -> tee(); abort_p1s -> flat_map(|m: SubordResponse| [hydroflow::util::PersistenceKeyed::Delete(m.xid), hydroflow::util::PersistenceKeyed::Persist(m.xid, 2)]) -> defer_tick() -> phase_map; abort_p1s -> map(|m: SubordResponse| CoordMsg{xid: m.xid, mtype: MsgType::Abort}) -> [1]broadcast; // count commit votes // XXX This fold_keyed accumulates xids without bound. // Should be replaced with a persist_mut_keyed and logic to manage it. commit_votes = msgs[commits] -> map(|m: SubordResponse| (m.xid, 1)) -> fold_keyed::<'static, u16, u32>(|| 0, |acc: &mut _, val| *acc += val); // count subordinates subord_total = subords[0] -> fold::<'static>(|| 0, |a: &mut _, _b| *a += 1); // If commit_votes for this xid is the same as all_votes, send a P2 Commit message commit_votes -> map(|(xid, c)| (c, xid)) -> [0]committed; subord_total -> map(|c| (c, ())) -> [1]committed; committed = join::<'tick,'tick>() -> map(|(_c, (xid, ()))| xid); // the committed join would succeed forever once a transaction is chosen for commit // so we filter to only send the P2 commit message and output to screen if this xid is still in Phase 1 // We also transition this xid to Phase 2 to start the next tick committed -> map(|xid| (xid, ())) -> [0]check_committed; phase_map -> [1]check_committed; check_committed = join::<'tick, 'tick>() -> map(|(xid, (_, phase))| (xid, phase)) -> filter(|(_xid, phase)| *phase == 1) -> map(|(xid, _phase)| xid) -> tee(); // update the phase_map check_committed -> flat_map(|xid| [hydroflow::util::PersistenceKeyed::Delete(xid), hydroflow::util::PersistenceKeyed::Persist(xid, 2)]) -> defer_tick() -> phase_map; // broadcast the P2 commit message check_committed -> map(|xid| CoordMsg{xid, mtype: MsgType::Commit}) -> [2]broadcast; // Handle p2 acknowledgments by sending an End message ack_p2s = msgs[acks] -> tee(); ack_p2s -> flat_map(|m: SubordResponse| [hydroflow::util::PersistenceKeyed::Delete(m.xid), hydroflow::util::PersistenceKeyed::Persist(m.xid, 3)]) -> defer_tick() -> phase_map; ack_p2s -> map(|m:SubordResponse| CoordMsg{xid: m.xid, mtype: MsgType::End,}) -> [3]broadcast; // Handler for ended acknowledgments not necessary; we just print them }; #[cfg(feature = "debugging")] if let Some(graph) = opts.graph { let serde_graph = df .meta_graph() .expect("No graph found, maybe failed to parse."); serde_graph.open_graph(graph, opts.write_config).unwrap(); } df.run_async().await.unwrap(); }