# TCP message I/O A simple TCP client/server implementation for [`tokio`](https://docs.rs/tokio/). The library contains two abstraction levels: - High-level interface: allows exchanging Rust types using [`serde`](https://serde.rs/). - Low-level interface: allows exchanging `Vec` messages. This page describes the high-level abstraction, for the low-level one check the [`raw`] submodule. ## Goals & non-goals - Hide complexity of listening and accepting TCP connections, and turning a TCP stream into a request/response stream. - Serialize and deserialize messages into Rust types. - Customizable serialization & compression. - Transparent compression using [`zstd`](https://docs.rs/zstd/). - Flexibility: use convenience features or the raw interface. This library intentionally leaves most error handling to the user. ## Cargo `features` By default, no feature is enabled. Available features: - `postcard` - Enable automatic serialization using [`postcard`](https://docs.rs/postcard/) (a fast and efficient serialization format). - `zstd` - Enable transparent compression of messages using [`zstd`](https://docs.rs/zstd/). We recommend enabling both for maximum simplicity. Note that both client and server must use the same features, otherwise they won't be able to understand each other messages. ## Client Client usage is straightforward: ```rust use tcp_message_io::TCPClient; use tokio; use serde::{Deserialize, Serialize}; // This type represents the requests to the server. #[derive(Serialize, Deserialize)] enum Request { Hello, } // This type represents the responses from the server. #[derive(Debug, Serialize, Deserialize, PartialEq)] enum Response { World, } #[tokio::main] async fn main() { // We need to specify the response type so that tcp_message_io // knows what object to use for response deserialization. let client = TCPClient::<_, Response>::connect("127.0.0.1", 12345).await.unwrap(); let response = client.send(Request::Hello).await.unwrap(); assert_eq!(response, Some(Response::World)); } ``` ## Server Creating a server is very straightforward as well: ```rust use anyhow::Result; use tcp_message_io::{TCPServer, TCPResponse}; use tokio; use serde::{Deserialize, Serialize}; // This type represents the requests to the server. #[derive(Serialize, Deserialize)] enum Request { Hello, } // This type represents the responses from the server. #[derive(Serialize, Deserialize)] enum Response { World, } // We need a request handler: in this case we implement a simple // "Hello, World" handler. async fn hello_world_handler(request: Request) -> Result> { match request { Request::Hello => Ok(TCPResponse::Message(Response::World)) // Handle additional request types here! } } #[tokio::main] async fn main() { TCPServer::new("127.0.0.1", 12345, hello_world_handler).listen().await; } ``` [`TCPResponse`] can be one of the following: - `TCPResponse::Message(response)` to send a response message. - `TCPResponse::CloseConnection` to close the connection with the client. This will also send an empty response to the client. - `TCPResponse::StopServer` to shut the server down. This will also send an empty response to the client and close the connection. The library relies on [`anyhow`](https://docs.rs/anyhow/) for error handling, enabling the handler to return errors of any type. If the handler returns an error, the client recives an empty message, a [`tracing`](https://docs.rs/tracing/) error message will be logged and the server keeps listening for new messages from the same client. This mechanism is meant for unhandled errors, and avoids leaving the client hanging for a response. It's left as a responsibility of the user to build an error reporting mechanism on top of the transform if required. For example, this can be achieved by ensuring the handler always returns `Ok(...)`, and errors are send back as an enum variant. ## Stopping the TCP server after a timeout Additionally, this crate supports stopping the server after a certain amount of inactivity (inactivity timeout): ```rust TCPServer::new("127.0.0.1", 12345, echo_handler) .with_inactivity_timeout(60) // Seconds .listen() .await; ``` This feature is useful when building something like a worker node: a node might be orphaned for many reasons (network issues, master crashing, etc). With this feature you can implement a clean-up mechanism causing the worker to shut down automatically. ## Choosing what to do in case of bad requests This crate assumes the client and server share a request / response type and use the same serialization format and compression setting (both enabled or disabled). Versioning of payloads is left as a responsibility of the user. This can cause the situation in which a client uses a different type or compression setting and the server is unable to deserialize the request (bad request). By default, this crate returns an empty message to the client and logs a [`tracing`](https://docs.rs/tracing/) error. You can customize the behavior using the [`TCPServer::with_bad_request_handler`] method to set a handler for that case: ```rust #[derive(Serialize, Deserialize)] enum Response { World, BadRequest, } TCPServer::new("127.0.0.1", 12345, echo_handler) // This will be called when a bad request happens. // In this example we return a BadRequest message to the client. .with_bad_request_handler(|| TCPResponse::Message(Response::BadRequest)) .listen() .await; ``` ## Customizing serialization `tcp_message_io` tries to make it as easy as possible to get started by using sane defaults: enable the `postcard` feature and any `Serialize` and `Deserialize` type will work as request or response, enable `zstd` for transparent compression of the payload. If you want to customize the serialization method, you can disable the `postcard` feature and implement the [`SerializeMessage`] trait for your message types: ```rust #[derive(Serialize, Deserialize)] enum Response { World, } impl SerializeMessage for Response { fn serialize(&self) -> Result> { // Implement serialization to bytes } fn deserialize(message: &[u8]) -> Result { // Implement deserialization from bytes } } ``` If you want to use another compression method, disable `zstd` and implement your compression method in the `serialize` and `deserialize` methods above. ## Wire format The wire format used by the library is the message plus an internal 8-byte header encoding the length of each message.