# TcpCommunicator This crate provides an implementation of an RPC (Remote Procedure Call) framework, designed for learning purposes. ## TcpCommunicator `TcpCommunicator` implements the `Communicator` trait and is designed to work with TCP streams. It is generic over types `T` and `S`, where `T` must be `Send`, `Serialize` and `DeserializeOwned`, and `S` must be a `Stream` of `TcpMessage` items. The following are the key methods provided by `TcpCommunicator`: ```rust /// Creates a new TcpCommunicator instance with the given TcpStream. /// This can be useful when you already have a TcpStream instance and want to create a TcpCommunicator instance from it. pub fn new_with_stream(stream: TcpStream) -> TcpCommunicator; /// Connects to a TcpCommunicatorHub at the given address, establishing a TCP connection. /// It returns a new instance of TcpCommunicator or a CommunicatorError if the connection fails. pub async fn connect(address: String) -> Result; /// Serializes the given value into a TcpMessage and sends it over the TCP connection. /// Similar to `send_message`, it returns a Uuid that uniquely identifies the sent message. async fn send_data(&mut self, value: T) -> Result; /// Sends the given value as a message and waits for a response message. /// This method is useful for request-reply scenarios where you want to send a message and wait for a response. async fn send_and_wait(&mut self, value: T) -> Result; /// Sends a response message to a previously received message with the given Uuid. /// This method is useful when implementing servers that need to respond to incoming messages. async fn answer_to(&mut self, id: &Uuid, answer: T) -> Result<(), CommunicatorError>; /// Attempts to retrieve an incoming message. /// If there is no message, this method returns `None`. /// It's important to note that this method does not block, so it can return `None` even if there are messages that are about to arrive. async fn try_take(&mut self) -> Option; /// Returns a stream of incoming messages. /// This is a convenient way of continuously receiving new messages. fn incoming(&self) -> TcpCommunicatorStream; ``` ## TcpCommunicatorHub TcpCommunicatorHub is a hub that manages multiple TcpCommunicator client instances. It is generic over T message type, which must be Send, Serialize and DeserializeOwned. The following are the key methods provided by TcpCommunicatorHub: ```rust /// Starts a TcpCommunicatorHub instance which listens for connections on the provided address. /// This is usually the first method to be called when setting up a new TcpCommunicatorHub. pub async fn start(address: String) -> Result; /// Tries to take a message from any of the connected Communicators. /// This function will return a tuple consisting of the CommunicatorId and the TcpMessage if there is a message available. /// If there are no messages available, it will return `None`. /// This method is non-blocking and may return `None` even if there are messages about to arrive. pub async fn try_take(&mut self) -> Option<(CommunicatorId, TcpMessage)>; /// Sends data to a specific Communicator identified by the provided CommunicatorId. /// The data is serialized and sent as a TcpMessage. /// This method returns a Uuid which is the unique identifier for the sent message. pub async fn send_data(&mut self, communicator_id: CommunicatorId, data: T) -> Result; /// Sends data to a specific Communicator and waits for a response message. /// This method is useful for request-reply scenarios where you want to send a message and wait for a response. pub async fn send_and_wait(&mut self, communicator_id: CommunicatorId, data: T) -> Result, CommunicatorError>; /// Returns a stream of incoming messages from all connected Communicators. /// This is a convenient way of continuously receiving new messages. pub fn incoming(&mut self) -> TcpCommunicatorHubStream; ``` # Example ```rust use std::pin::pin; use futures::{pin_mut, StreamExt}; use serde::{Deserialize, Serialize}; use tokio::join; use crate::{Communicator, CommunicatorMessage, TcpCommunicator}; use crate::error::CommunicatorError; use crate::tcp::TcpCommunicatorHub; use crate::tests::tcp_message; #[tokio::main] async fn main() -> Result<(), CommunicatorError> { let url = "127.0.0.1:550066".to_string(); // setup communicator hub. It will listen for incoming connections // and will be able to send messages to all connected clients let hub_url = url.clone(); let hub_task = tokio::spawn(async move { let mut hub = TcpCommunicatorHub::::start(hub_url).await.unwrap(); let mut stream = hub.incoming(); while let Some((communicator_id, message)) = stream.next().await { let value = message.value(); match value { MyCommunicatorMessage::SetUserName(name) => { println!("Set username to: {}", name); }, MyCommunicatorMessage::GetUserAge() => { let result = 42; println!("return user age to client: {}", result); hub.answer_to(communicator_id, message.id(), MyCommunicatorMessage::GetUserAgeResponse(result)).await.unwrap(); }, MyCommunicatorMessage::Exit => { println!("{}: {}", message.id(), "Exit"); for client in hub.get_client_list().await { // inform all clients, that server is going to shutdown hub.answer_to(client.id(), message.id(), MyCommunicatorMessage::GoingToShutdown).await.unwrap(); } return; }, _ => { println!("{}: {}", message.id(), "Unknown message"); } } } }); // setup client. It will connect to server, send messages to it and wait for responses let client_url = url.clone(); let client_task = tokio::spawn(async move { let mut client = TcpCommunicator::::connect(client_url).await.unwrap(); // fire message and forgot example client.send_data(MyCommunicatorMessage::SetUserName("John".to_string())).await.unwrap(); // send message and wait response to this message let response = client.send_and_wait(MyCommunicatorMessage::GetUserAge()).await.unwrap(); let response_value = response.value(); match response_value { MyCommunicatorMessage::GetUserAgeResponse(age) => { println!("User age is: {}", age); }, MyCommunicatorMessage::GoingToShutdown => { println!("{}: {}", response.id(), "Server is going to shutdown"); }, _ => { println!("{}: {}", response.id(), "Unknown message"); } } client.send_data(MyCommunicatorMessage::Exit).await.unwrap(); }); join!(hub_task, client_task); return Ok(()); } #[derive(Serialize, Deserialize)] enum MyCommunicatorMessage { SetUserName(String), GetUserAge(), GetUserAgeResponse(u32), Exit, GoingToShutdown } ```