//! Here you'll see a demonstration of how to create an async processor that needs a single thread to perform simple operations, //! and, for this reason, it is way faster than [parallel_processor].\ //! On the example implemented here, it is able to perform: //! - 868k/s input messages speed -- with 180% CPU -- for the following input: //! (I used a variation of the following command, writing the input to a file and then passing the output through an 8k buffer dd writing to /dev/null) //! clear; (for i in {1..5654356}; do for m in "Ping" "Speechless" "Pang" "Help" "Ping" "Speechless" "Pang" "Help"; do echo "$m";done; done) | nc -vvvv localhost 9758 | dd status=progress | wc -l //! - 4M/s was attained (similar CPU usage) with an input file from this command: //! (for i in {1..5654356}; do echo -en "Speechless\nSpeechless\nSpeechless\nSpeechless\nSpeechless\nSpeechless\nSpeechless\nSpeechless\n"; done) >/tmp/kickass.input2 //! - IMPORTANT: set `sync_processors()` to use a waiting producer, like [super::executor::sync_futures_processors()], or else you'll simply get `TooBusy` answers //! //! Analysis: //! - One thread is executing `message-io` and another, this processor //! - The last test don't use allocations and do not send back any messages -- and it was almost 6 times faster //! - In the future, those figures are to be improved when `message-io` is replaced with a Tokio implementation //! (so there is no async/sync overhead) //! //! `message-io`: it was a negative surprise that `message-io` wasn't able to process any other connections when these flood tests were being executed //! //! ================================================================== //! Easy: clear; (for msg in 'ClientIdentification(MarketDataBridge(version: "123", symbol: "PETR3", account_token: "mYtOkEn"))' 'MarketData(SymbolState(symbol: "PETR3", in_auction: false))'; do sleep 1; echo "> $msg" >&2; echo "$msg"; done; sleep 1; echo -en "\n### Now write what you want or hit CTRL-C\n\n" >&2; cat) | nc -vvvv 192.168.1.37 9758 //! Flood: clear; m1='ClientIdentification(MarketDataBridge(version: "123", symbol: "PETR3", account_token: "mYtOkEn"))'; m2='MarketData(SymbolState(symbol:"P3",in_auction:true))'; (sleep 1; echo "> $m1" >&2; echo "$m1"; sleep 1; echo "> $m2 (as much as possible)" >&2; while true; do echo "$m2"; done) | nc -vvvv 192.168.1.37 9758 //! Minimum flood, without message corruption: clear; m1='ClientIdentification(MarketDataBridge(version: "123", symbol: "PETR3", account_token: "mYtOkEn"))'; m2='MarketData(SymbolState(symbol:"P3",in_auction:true))'; (sleep 1; echo "> $m1" >&2; echo "$m1"; sleep 1; echo "> $m2 (as much as possible)" >&2; while true; do echo "$m2"; sleep 0.009; done) | nc -vvvv 192.168.1.37 9758 //! //! Note: the flood test above, due to the use of message-io, makes the server see wrong messages, occasionally -- because a packet split a message in two, the two parts will be treated as two different messages (both invalid). //! This should be solved when we move out of message-io into our own tokio-only solution. //! //! References: //! - https://github.com/sloganking/Rust-and-Tokio-Chat-Server/blob/51d6bad64be0996202ba610d80296a829a0c950d/src/main.rs //! - https://github.com/tokio-rs/tokio/blob/master/examples/chat.rs use super::{ types::*, protocol_model::*, socket_server::{SocketServer}, connection::{SocketEvent, Peer, PeerId}, }; use crate::{ Runtime, logic::ogre_robot::{ types::*, events::dispatcher::Dispatcher, }, }; use std::{ sync::Arc, collections::HashMap, fmt::Debug, ops::Deref, }; use std::time::Duration; use futures::{Stream, stream, StreamExt}; use minstant::Instant; use tokio::sync::{RwLock, RwLockWriteGuard}; use log::{trace, warn}; /// customize this to hold the states you want for each client #[derive(Debug)] struct ClientState { identification: TimeTrackedInfo, /// used to send async messages to the client peer: Arc>, //round_trips: // Option -- count, average, last estimated_clock_skew_nanos: Option, // metrics session: //messages_count: HashMap // move it to the Socket Server -- optionally enabled in generic const } /// Here is where the main "protocol" processor logic sits: returns a Stream pipeline able to /// transform client inputs ([ClientMessages] requests) into server outputs ([ServerMessages] answers) fn processor(dispatcher: Arc, stream: impl Stream>) -> impl Stream>, ServerMessages), (Arc>, Box)>> { let client_states: Arc>> = Arc::new(RwLock::new(HashMap::new())); stream .map(move |socket_event: SocketEvent| { let dispatcher = Arc::clone(&dispatcher); let client_states = Arc::clone(&client_states); async move { match socket_event { SocketEvent::Incoming { peer, message: client_message } => { // standard read-only lock on `state` let lock = client_states.read().await; let state = lock.get(&peer.peer_id) .map_or_else(|| Err( ( Arc::clone(&peer), Box::from(format!("SocketServer.Processor expects incoming messages only from known clients. Peer id {}, who is not in our `client_states` hashmap, popped up from address {:?}", peer.peer_id, peer.peer_address)) ) ), |state| Ok(state))?; let server_message = match client_message { ClientMessages::ClientIdentification(client_identification) => { // upgrade our lock into a writeable `state` drop(lock); let mut state = RwLockWriteGuard::map(client_states.write().await, |states| states.get_mut(&peer.peer_id).unwrap() ); match state.identification.set(client_identification) { Ok(client_identification) => { if dispatcher.register_from_client_identification(client_identification).await { ServerMessages::None } else { // TODO this failure is currently happening for the new connection -- it must be the other way round, as stated on the Reconnected docs ServerMessages::Disconnected(DisconnectionReason::Reconnected { ip: "yours".to_string() }) } }, Err(client_identification) => { let message = format!("Attempt to authenticate twice! Protocolar authentication was {:?}; Out of protocol attempted one is {:?}", state.identification, client_identification); let disconnection_reason = DisconnectionReason::ProtocolOffense { message }; dispatcher.unregister_from_client_identification(&*state.identification, disconnection_reason.clone()).await; drop(state); client_states.write().await .remove(&peer.peer_id); ServerMessages::Disconnected(disconnection_reason) } } }, ClientMessages::UserAuthorization(_) => ServerMessages::None, ClientMessages::KeepAliveRequest(n) => ServerMessages::KeepAliveAnswer(n+1), ClientMessages::KeepAliveAnswer(_) => ServerMessages::None, ClientMessages::MarketData(market_data) => { dispatcher.market_data(&*state.identification, market_data.into()).await; ServerMessages::None }, ClientMessages::ExecutedOrder { .. } => ServerMessages::None, ClientMessages::CancelledOrder { .. } => ServerMessages::None, ClientMessages::PendingOrder { .. } => ServerMessages::None, ClientMessages::ChartPoints { .. } => ServerMessages::None, ClientMessages::GoodBye(_) => ServerMessages::Disconnected(DisconnectionReason::ClientInitiated), ClientMessages::UnknownMessage(txt) => ServerMessages::Disconnected(DisconnectionReason::ProtocolOffense {message: format!("Unknown message received: '{}'. Bailing out...", txt)}), }; Ok(Some((peer, server_message))) }, SocketEvent::Connected { peer } => { client_states.write().await .insert(peer.peer_id, ClientState { identification: TimeTrackedInfo::Unset, peer: Arc::clone(&peer), estimated_clock_skew_nanos: None }); Ok(Some((peer, ServerMessages::Welcome/*(runtime.)*/))) }, SocketEvent::Disconnected { peer } => { let state = client_states.write().await .remove(&peer.peer_id).expect("disconnected one was not present"); dispatcher.unregister_from_client_identification(&*state.identification, DisconnectionReason::ClientInitiated).await; Ok(Some((peer, ServerMessages::None))) }, SocketEvent::Shutdown { timeout_ms } => { warn!("SocketServer processor: Sending goodbye message and disconnecting from {} clients", client_states.read().await.len()); for (_peer_id, client_info) in client_states.read().await.iter() { client_info.peer.sender.send(ServerMessages::ShuttingDown).await; client_info.peer.sender.close(); } Ok(None) }, } } }) // transforms `Result>` into `Option>` // (the former is used in the above map so we may use there the `?` operator without trouble) .filter_map(|fallible_future| async { match fallible_future.await { Ok(optional_msg) => match optional_msg { Some(msg) => Some(Ok(msg)), None => None, }, Err(err) => Some(Err(err)), } }) } /// Returns a tied-together `(stream, producer, closer)` tuple which [socket_server] uses to transform [ClientMessages] into [ServerMessages].\ /// The tuple consists of: /// - The `Stream` of (`Endpoint`, [ServerMessages]) -- [socket_server] will, then, apply operations at the end of it to deliver the messages /// - The producer to send `SocketEvent` to that stream /// - The closer of the stream\ /// Usage example: /// ```no_compile /// futures::executor::block_on(async { /// state.detached_sender.produce((endpoint, ServerMessages::UnknownMessage("Just wanted to tell you to go sleep".to_string()))).await; /// }); pub async fn sync_processors(runtime: &RwLock, socket_server: &SocketServer<'static>) -> (impl Stream>, ServerMessages), (Arc>, Box) > >, impl Fn(SocketEvent) -> bool, impl Fn()) { let tokio_runtime = Arc::clone(runtime.read().await.tokio_runtime.as_ref().unwrap()); let (stream, producer, closer) = super::executor::sync_tokio_stream(tokio_runtime); let dispatcher = Runtime::do_for_ogre_robot(runtime, |ogre_robot| Box::pin(async {Arc::clone(&ogre_robot.dispatcher)})).await; (processor(dispatcher, stream), producer, closer) } /// see [super::executor::spawn_concurrent_stream_executor()] pub async fn spawn_stream_executor(stream: impl Stream>, bool)> + Send + Sync + 'static) -> tokio::task::JoinHandle<()> { super::executor::spawn_stream_executor(stream).await } #[derive(Debug)] enum TimeTrackedInfo { Unset, Set { time: Instant, info: InfoType }, } impl TimeTrackedInfo { pub fn new() -> Self { Self::Unset } /// Allows setting a value once, keeping track of the moment it was set: /// consumes `info` if the previous value was `Unset` -- in which case it will now be `Set` and the function will return `Ok(&info)`;\ /// otherwise, returns `info` back to the caller as `Err(info)`.\ /// See [reset()] if your logic is intended to set the value multiple times pub fn set(&mut self, info: InfoType) -> Result<&InfoType, InfoType> { match self { TimeTrackedInfo::Unset => Ok(self.reset(info)), TimeTrackedInfo::Set { .. } => Err(info) } } /// Allows setting a value multiple times, keeping track of the moment it was set:\ /// Returns a reference to `info`.\ /// See [set()] if your logic is supposed to set the information only once pub fn reset(&mut self, info: InfoType) -> &InfoType { *self = Self::Set { time: Instant::now(), info }; match *self { TimeTrackedInfo::Unset => panic!("BUG! Attempt to Deref a `TimeTrackedInfo` that is still `Unset`. Please, fix your code."), TimeTrackedInfo::Set { time: _time, ref info } => info, } } } impl Deref for TimeTrackedInfo { type Target = InfoType; fn deref(&self) -> &Self::Target { match self { TimeTrackedInfo::Unset => panic!("BUG! Attempt to Deref a `TimeTrackedInfo` that is still `Unset`. Please, fix your code."), TimeTrackedInfo::Set { time: _time, ref info } => info, } } }