tcp_message_io

Version: 1.0.4
Created: 2023-11-19 17:09:45
Updated: 2023-12-02 16:25:37
Description: A simple TCP server and client implementation to exchange messages
Francesco Pasa (frapa)


TCP message I/O

A simple TCP client/server implementation for tokio. The library contains two abstraction levels:

  • High-level interface: allows exchanging Rust types using serde.
  • Low-level interface: allows exchanging Vec<u8> messages.

This page describes the high-level abstraction; for the low-level one, see the [raw] submodule.

Goals & non-goals

  • Hide complexity of listening and accepting TCP connections, and turning a TCP stream into a request/response stream.
  • Serialize and deserialize messages into Rust types.
  • Customizable serialization & compression.
  • Transparent compression using zstd.
  • Flexibility: use convenience features or the raw interface.

This library intentionally leaves most error handling to the user.

Cargo features

By default, no feature is enabled. Available features:

  • postcard - Enable automatic serialization using postcard (a fast and efficient serialization format).
  • zstd - Enable transparent compression of messages using zstd.

We recommend enabling both for maximum simplicity.

Note that both client and server must enable the same features; otherwise they won't be able to understand each other's messages.

Client

Client usage is straightforward:

use tcp_message_io::TCPClient;
use tokio;
use serde::{Deserialize, Serialize};

// This type represents the requests to the server.
#[derive(Serialize, Deserialize)]
enum Request {
  Hello,
}

// This type represents the responses from the server.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
enum Response {
  World,
}

#[tokio::main]
async fn main() {
  // We need to specify the response type so that tcp_message_io
  // knows what object to use for response deserialization.
  let client = TCPClient::<_, Response>::connect("127.0.0.1", 12345).await.unwrap();
  let response = client.send(Request::Hello).await.unwrap();
  assert_eq!(response, Some(Response::World));
}

Server

Creating a server is very straightforward as well:

use anyhow::Result;
use tcp_message_io::{TCPServer, TCPResponse};
use tokio;
use serde::{Deserialize, Serialize};

// This type represents the requests to the server.
#[derive(Serialize, Deserialize)]
enum Request {
  Hello,
}

// This type represents the responses from the server.
#[derive(Serialize, Deserialize)]
enum Response {
  World,
}

// We need a request handler: in this case we implement a simple
// "Hello, World" handler.
async fn hello_world_handler(request: Request) -> Result<TCPResponse<Response>> {
  match request {
    Request::Hello => Ok(TCPResponse::Message(Response::World))
    // Handle additional request types here!
  }
}

#[tokio::main]
async fn main() {
    TCPServer::new("127.0.0.1", 12345, hello_world_handler).listen().await;
}

[TCPResponse] can be one of the following:

  • TCPResponse::Message(response) to send a response message.
  • TCPResponse::CloseConnection to close the connection with the client. This will also send an empty response to the client.
  • TCPResponse::StopServer to shut the server down. This will also send an empty response to the client and close the connection.

The library relies on anyhow for error handling, enabling the handler to return errors of any type.

If the handler returns an error, the client receives an empty message, a tracing error message is logged, and the server keeps listening for new messages from the same client. This mechanism is meant for unhandled errors and avoids leaving the client waiting for a response that never arrives.

It is the user's responsibility to build an error reporting mechanism on top of the transport if required. For example, this can be achieved by ensuring the handler always returns Ok(...) and sending errors back as an enum variant.
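The enum-variant approach can be sketched as follows. This is a minimal, library-independent illustration: the Response type, its Error variant, and the parse_and_sum and to_response functions are hypothetical names chosen for this example, not part of tcp_message_io. In a real handler, the resulting value would be wrapped as Ok(TCPResponse::Message(...)), and the enum would also derive Serialize and Deserialize.

```rust
// Hypothetical response type: the `Error` variant carries failures
// back to the client instead of an empty message.
#[derive(Debug, PartialEq)]
enum Response {
    Sum(i64),
    Error(String),
}

// Hypothetical business logic that can fail: sums a comma-separated
// list of integers and stops at the first parse error.
fn parse_and_sum(input: &str) -> Result<i64, std::num::ParseIntError> {
    input
        .split(',')
        .map(|part| part.trim().parse::<i64>())
        .sum()
}

// Fold both outcomes into a `Response`, so the handler itself
// never needs to return `Err`.
fn to_response(input: &str) -> Response {
    match parse_and_sum(input) {
        Ok(total) => Response::Sum(total),
        Err(err) => Response::Error(err.to_string()),
    }
}
```

With this pattern the client can match on the Error variant and report a meaningful message, while a handler returning Err remains reserved for truly unexpected failures.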

Stopping the TCP server after a timeout

Additionally, this crate supports stopping the server after a certain amount of inactivity (inactivity timeout):

TCPServer::new("127.0.0.1", 12345, echo_handler)
    .with_inactivity_timeout(60)  // Seconds 
    .listen()
    .await;

This feature is useful when building something like a worker node: a node might be orphaned for many reasons (network issues, a crashed master, etc.). With the inactivity timeout, an orphaned worker shuts down automatically instead of running forever.

Choosing what to do in case of bad requests

This crate assumes the client and server share a request / response type and use the same serialization format and compression setting (both enabled or disabled). Versioning of payloads is left as a responsibility of the user.

As a result, a client that uses a different type or compression setting may send a request the server cannot deserialize (a bad request). By default, this crate returns an empty message to the client and logs a tracing error.

You can customize the behavior using the [TCPServer::with_bad_request_handler] method to set a handler for that case:

#[derive(Serialize, Deserialize)]
enum Response {
  World,
  BadRequest,
}

TCPServer::new("127.0.0.1", 12345, echo_handler)
    // This will be called when a bad request happens.
    // In this example we return a BadRequest message to the client.
    .with_bad_request_handler(|| TCPResponse::Message(Response::BadRequest)) 
    .listen()
    .await;
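Since payload versioning is left to the user, one possible approach is to prefix every serialized payload with a version byte and reject mismatches before deserializing. The sketch below is only an illustration under that assumption: PROTOCOL_VERSION, tag_with_version, and check_version are hypothetical names, not part of tcp_message_io, and the crate itself does not add such a byte.

```rust
// Hypothetical protocol version shared by client and server.
const PROTOCOL_VERSION: u8 = 1;

// Prepend the version byte to an already serialized payload.
fn tag_with_version(payload: &[u8]) -> Vec<u8> {
    let mut message = Vec::with_capacity(payload.len() + 1);
    message.push(PROTOCOL_VERSION);
    message.extend_from_slice(payload);
    message
}

// Verify the version byte and return the remaining payload,
// or an error describing the mismatch.
fn check_version(message: &[u8]) -> Result<&[u8], String> {
    match message.split_first() {
        Some((&PROTOCOL_VERSION, payload)) => Ok(payload),
        Some((&other, _)) => Err(format!("unsupported protocol version {}", other)),
        None => Err("empty message".to_string()),
    }
}
```

Helpers like these would most naturally live inside a custom serialization implementation, so that a version mismatch surfaces as a deserialization failure and is routed to the bad request handler.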

Customizing serialization

tcp_message_io tries to make it as easy as possible to get started by using sane defaults. Enable the postcard feature and any type implementing Serialize and Deserialize will work as a request or response. Enable zstd for transparent compression of the payload.

If you want to customize the serialization method, you can disable the postcard feature and implement the [SerializeMessage] trait for your message types:

#[derive(Serialize, Deserialize)]
enum Response {
  World,
}

impl SerializeMessage for Response {
  fn serialize(&self) -> Result<Vec<u8>> {
    // Implement serialization to bytes
    todo!()
  }
  fn deserialize(message: &[u8]) -> Result<Self> {
    // Implement deserialization from bytes
    todo!()
  }
}

If you want to use another compression method, disable zstd and implement your compression method in the serialize and deserialize methods above.
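To give an idea of what the bodies of such methods might contain, here is a minimal hand-written byte encoding for a small enum. It is a sketch using only the standard library: the free functions serialize and deserialize, the Count variant, and the tag-byte layout are assumptions made for this example, and the error type is a plain String rather than whatever Result type the trait uses. Any custom compression would be applied to the bytes at the end of serialize and undone at the start of deserialize.

```rust
use std::convert::TryFrom;

// Hypothetical message type with one payload-carrying variant.
#[derive(Debug, PartialEq)]
enum Response {
    World,
    Count(u32),
}

// Encode as a one-byte tag, followed by a little-endian u32 for `Count`.
fn serialize(response: &Response) -> Vec<u8> {
    match response {
        Response::World => vec![0],
        Response::Count(n) => {
            let mut out = vec![1];
            out.extend_from_slice(&n.to_le_bytes());
            out
        }
    }
}

// Decode the tag byte, then validate the length of the remaining bytes.
fn deserialize(message: &[u8]) -> Result<Response, String> {
    match message {
        [0] => Ok(Response::World),
        [1, rest @ ..] => {
            let bytes = <[u8; 4]>::try_from(rest)
                .map_err(|_| "expected 4 bytes for Count".to_string())?;
            Ok(Response::Count(u32::from_le_bytes(bytes)))
        }
        _ => Err("unknown or malformed message".to_string()),
    }
}
```

Returning an error for unknown tags, rather than panicking, lets malformed input be treated as a bad request.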

Wire format

On the wire, each message is sent together with an internal 8-byte header that encodes the length of that message.
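Length-prefixed framing of this kind can be sketched with the standard library alone. The code below is an illustration, not the crate's implementation: it assumes the header comes first and is a big-endian u64, neither of which is stated above, and write_frame, read_frame, and MAX_FRAME_LEN are hypothetical names.

```rust
use std::io::{self, Read, Write};

// Hypothetical safety cap so a corrupt header cannot trigger a huge allocation.
const MAX_FRAME_LEN: u64 = 16 * 1024 * 1024;

// Write an 8-byte length header (big-endian is an assumption), then the payload.
fn write_frame<W: Write>(writer: &mut W, payload: &[u8]) -> io::Result<()> {
    let len = payload.len() as u64;
    writer.write_all(&len.to_be_bytes())?;
    writer.write_all(payload)
}

// Read the 8-byte header, then exactly that many payload bytes.
fn read_frame<R: Read>(reader: &mut R) -> io::Result<Vec<u8>> {
    let mut header = [0u8; 8];
    reader.read_exact(&mut header)?;
    let len = u64::from_be_bytes(header);
    if len > MAX_FRAME_LEN {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "frame too large"));
    }
    let mut payload = vec![0u8; len as usize];
    reader.read_exact(&mut payload)?;
    Ok(payload)
}
```

Because the reader always knows how many bytes to expect, message boundaries survive TCP's stream semantics, and an empty message (length 0) is still a valid frame.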
