#[path = "../composite_protocol_stacking_common/mod.rs"] mod composite_protocol_stacking_common; pub mod protocol_processor; use composite_protocol_stacking_common::protocol_model::{GameClientMessages, GameServerMessages}; use protocol_processor::ServerProtocolProcessor; use std::{ future, sync::Arc, time::Duration, }; use reactive_messaging::prelude::*; use log::warn; use crate::composite_protocol_stacking_common::protocol_model::{PreGameClientMessages, PreGameServerMessages, ProtocolStates}; const LISTENING_INTERFACE: &str = ""; const LISTENING_PORT: u16 = 1234; const NETWORK_CONFIG: ConstConfig = ConstConfig { ..ConstConfig::default() }; #[tokio::main(flavor = "multi_thread")] async fn main() -> Result<(), Box> { simple_logger::SimpleLogger::new().with_utc_timestamps().init().unwrap_or_else(|_| eprintln!("--> LOGGER WAS ALREADY STARTED")); warn!("Ping-Pong server starting at {LISTENING_INTERFACE}:{LISTENING_PORT}"); let server_processor_ref1 = Arc::new(ServerProtocolProcessor::new()); let server_processor_ref2 = Arc::clone(&server_processor_ref1); let server_processor_ref3 = Arc::clone(&server_processor_ref1); let server_processor_ref4 = Arc::clone(&server_processor_ref1); let mut socket_server = new_composite_socket_server!(NETWORK_CONFIG, LISTENING_INTERFACE, LISTENING_PORT, ProtocolStates); // pre-game protocol processor let pre_game_processor = spawn_server_processor!(NETWORK_CONFIG, Atomic, socket_server, PreGameClientMessages, PreGameServerMessages, move |connection_event| { server_processor_ref1.pre_game_connection_events_handler(connection_event); future::ready(()) }, move |client_addr, port, peer, client_messages_stream| server_processor_ref2.pre_game_dialog_processor(client_addr, port, peer, client_messages_stream) )?; // game protocol processor let game_processor = spawn_server_processor!(NETWORK_CONFIG, Atomic, socket_server, GameClientMessages, GameServerMessages, move |connection_event| { server_processor_ref3.game_connection_events_handler(connection_event); future::ready(()) }, move |client_addr, port, peer, client_messages_stream| server_processor_ref4.game_dialog_processor(client_addr, port, peer, client_messages_stream) )?; socket_server.start_multi_protocol(ProtocolStates::PreGame, move |socket_connection: &SocketConnection, _| match socket_connection.state() { ProtocolStates::PreGame => Some(pre_game_processor.clone_sender()), ProtocolStates::Game => Some(game_processor.clone_sender()), ProtocolStates::Disconnect => None, }, |_| future::ready(()) ).await?; let wait_for_termination = socket_server.termination_waiter(); tokio::spawn( async move { tokio::time::sleep(Duration::from_secs(300)).await; socket_server.terminate().await.expect("Failed to Terminate the server"); }); wait_for_termination().await?; Ok(()) //let (processor_stream, stream_producer, stream_closer) = socket_server.set_processor() // socket server should allow me to inquire some executor/stream metrics (from reactive-mutiny) and should allow for clean shutdowns // conversely, there should be a SocketClient class, also receiving a host, port, workers? a processor and a sender?? Yes, a sending facility, through the `SocketClient` instance // the socket server uses a Uni to create a producer/stream pair -- the socket handling code will read from the socket and "produce" to the Uni; the executor will consume from the Stream // (all connected clients on a single Uni? Yes! Add more listeners for distribution, if that makes sense) // this library might consider: stateless server & stateful server -- on the stateful server processor, one stream processor will be created for each client (and will allow custom "authentication" routines to create a "session") // This "session" object is, then, passed to the processor creating pipeline builder. So, this internal manager is expected to hold the Stream instances in a container that is faster than anything else (is there an atomic Hashmap available?) // -------------------- // today a task is already spawned for each client, at which point the state may be created -- so we don't need that stateful / stateless distinction: we just need a refactoring: // we must pass in a function that will generate a "producer" when requested -- and that producer may hold the state // the producer is any struct implementing a trait (from reactive-mutiny?) // but the state need to be on the stream processing function -- and not on the stream producer... // so we must devise a "builder" function that will return a producer & stream processor fn, which (the fn) holds the state. Can this work? Integration test? Example? // fn pipeline_builder(client_addr: String, connected_port: u16) -> impl Stream // fn pipeline_builder(client_addr: String, connected_port: u16) -> MutinyStream // the `MutinyStream`, itself, will be created from a pipeline builder itself, which receives impl Stream -> impl Stream // therefore: // fn pipeline_builder(client_addr: String, _connected_port: u16, incoming_messages_stream: impl Stream) -> impl Stream // and the SocketServer::new() receives it }