use std::net::SocketAddr; use bytes::Bytes; use futures::stream::SplitSink; use hydroflow::hydroflow_syntax; use hydroflow::lattices::Merge; use hydroflow::scheduled::graph::Hydroflow; use tokio_util::codec::LengthDelimitedCodec; use tokio_util::udp::UdpFramed; use crate::lattices::SealedSetOfIndexedValues; use crate::structs::Request; use crate::test_data::client_class_iter; pub(crate) async fn server_state_flow( shopping_ssiv: impl Iterator)> + 'static, out_addr: SocketAddr, out: SplitSink, (Bytes, SocketAddr)>, local_addr: SocketAddr, remote_addr: SocketAddr, ) -> Hydroflow<'static> { let client_class = client_class_iter(); // First define some shorthand for the merge and bot of this lattice let ssiv_merge = as Merge>>::merge; const SSIV_BOT: fn() -> SealedSetOfIndexedValues = Default::default; // Set up the Udp socket for proxy-server communication let (reqs_out, reqs_in, _) = hydroflow::util::bind_udp_bytes(local_addr).await; // This is like push_group_flow, but we split it into two transducers that communicate via reqs_out and reqs_in. // The first transducer takes in shopping_ssiv requests, and forwards them via reqs_out to the second transducer. // The second transducer listens on reqs_in and runs the stateful logic of fold_keyed and join. hydroflow_syntax! { // Networked: Server-Side State source_iter(shopping_ssiv) -> map(|pair| (pair, remote_addr)) -> dest_sink_serde(reqs_out); source_stream_serde(reqs_in) -> map(Result::unwrap) -> map(|((client, req), _a): ((usize, SealedSetOfIndexedValues), _)| (client, req)) -> fold_keyed::<'static>(SSIV_BOT, ssiv_merge) -> [0]lookup_class; source_iter(client_class) -> [1]lookup_class; lookup_class = join::<'static>() -> map(|(client, (li, class))| ((client, class), li)) -> map(|m| (m, out_addr)) -> dest_sink_serde(out); } }