use std::net::SocketAddr; use bytes::Bytes; use futures::stream::SplitSink; use hydroflow::hydroflow_syntax; use hydroflow::lattices::Merge; use hydroflow::scheduled::graph::Hydroflow; use tokio_util::codec::LengthDelimitedCodec; use tokio_util::udp::UdpFramed; use crate::lattices::SealedSetOfIndexedValues; use crate::structs::Request; use crate::test_data::client_class_iter; pub(crate) async fn client_state_flow( shopping_ssiv: impl Iterator)> + 'static, out_addr: SocketAddr, out: SplitSink, (Bytes, SocketAddr)>, local_addr: SocketAddr, remote_addr: SocketAddr, ) -> Hydroflow<'static> { let client_class = client_class_iter(); // First define some shorthand for the merge and bot of this lattice let ssiv_merge = as Merge>>::merge; const SSIV_BOT: fn() -> SealedSetOfIndexedValues = Default::default; // Set up the Udp socket for proxy-server communication let (carts_out, carts_in, _) = hydroflow::util::bind_udp_bytes(local_addr).await; // This is like server_state_flow, but we split it into two transducers at a different spot. // Here, the first transducer takes in shopping_ssiv requests and runs a stateful fold_keyed, persisting all the shopping requests in ssiv's. // The second transducer listens on reqs_in and runs the lookup join. hydroflow_syntax! { source_iter(shopping_ssiv) -> fold_keyed::<'static>(SSIV_BOT, ssiv_merge) -> map(|pair| (pair, remote_addr)) -> dest_sink_serde(carts_out); source_stream_serde(carts_in) -> map(Result::unwrap) -> map(|((client, cart), _a): ((usize, SealedSetOfIndexedValues), _)| (client, cart)) -> [0]lookup_class; source_iter(client_class) -> [1]lookup_class; lookup_class = join::<'static>() -> map(|(client, (li, class))| ((client, class), li)) -> map(|m| (m, out_addr)) -> dest_sink_serde(out); } }