Crates.io | tcp_message_io |
lib.rs | tcp_message_io |
version | 1.0.4 |
source | src |
created_at | 2023-11-19 17:09:45.024453 |
updated_at | 2023-12-02 16:25:37.715269 |
description | A simple TCP server and client implementation to exchange messages |
homepage | |
repository | |
max_upload_size | |
id | 1041355 |
size | 41,884 |
A simple TCP client/server implementation for tokio
.
The library contains two abstraction levels:
serde
.Vec<u8>
messages.This page describes the high-level abstraction, for the low-level one
check the [raw
] submodule.
zstd
.This library intentionally leaves most error handling to the user.
features
By default, no feature is enabled. Available features:
postcard
- Enable automatic serialization using postcard
(a fast and efficient serialization format).zstd
- Enable transparent compression of messages using zstd
.We recommend enabling both for maximum simplicity.
Note that both client and server must use the same features, otherwise they won't be able to understand each other messages.
Client usage is straightforward:
use tcp_message_io::TCPClient;
use tokio;
use serde::{Deserialize, Serialize};
// This type represents the requests to the server.
#[derive(Serialize, Deserialize)]
enum Request {
Hello,
}
// This type represents the responses from the server.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
enum Response {
World,
}
#[tokio::main]
async fn main() {
// We need to specify the response type so that tcp_message_io
// knows what object to use for response deserialization.
let client = TCPClient::<_, Response>::connect("127.0.0.1", 12345).await.unwrap();
let response = client.send(Request::Hello).await.unwrap();
assert_eq!(response, Some(Response::World));
}
Creating a server is very straightforward as well:
use anyhow::Result;
use tcp_message_io::{TCPServer, TCPResponse};
use tokio;
use serde::{Deserialize, Serialize};
// This type represents the requests to the server.
#[derive(Serialize, Deserialize)]
enum Request {
Hello,
}
// This type represents the responses from the server.
#[derive(Serialize, Deserialize)]
enum Response {
World,
}
// We need a request handler: in this case we implement a simple
// "Hello, World" handler.
async fn hello_world_handler(request: Request) -> Result<TCPResponse<Response>> {
match request {
Request::Hello => Ok(TCPResponse::Message(Response::World))
// Handle additional request types here!
}
}
#[tokio::main]
async fn main() {
TCPServer::new("127.0.0.1", 12345, hello_world_handler).listen().await;
}
[TCPResponse
] can be one of the following:
TCPResponse::Message(response)
to send a response message.TCPResponse::CloseConnection
to close the connection with the client.
This will also send an empty response to the client.TCPResponse::StopServer
to shut the server down.
This will also send an empty response to the client and close the connection.The library relies on anyhow
for
error handling, enabling the handler to return errors of any type.
If the handler returns an error, the client recives an empty message,
a tracing
error message will be logged
and the server keeps listening for new messages from the same client.
This mechanism is meant for unhandled errors, and avoids leaving
the client hanging for a response.
It's left as a responsibility of the user to build an error reporting
mechanism on top of the transform if required. For example, this can
be achieved by ensuring the handler always returns Ok(...)
, and
errors are send back as an enum variant.
Additionally, this crate supports stopping the server after a certain amount of inactivity (inactivity timeout):
TCPServer::new("127.0.0.1", 12345, echo_handler)
.with_inactivity_timeout(60) // Seconds
.listen()
.await;
This feature is useful when building something like a worker node: a node might be orphaned for many reasons (network issues, master crashing, etc). With this feature you can implement a clean-up mechanism causing the worker to shut down automatically.
This crate assumes the client and server share a request / response type and use the same serialization format and compression setting (both enabled or disabled). Versioning of payloads is left as a responsibility of the user.
This can cause the situation in which a client uses a different
type or compression setting and the server is unable to deserialize
the request (bad request). By default, this crate returns an
empty message to the client and logs a
tracing
error.
You can customize the behavior using the [TCPServer::with_bad_request_handler
]
method to set a handler for that case:
#[derive(Serialize, Deserialize)]
enum Response {
World,
BadRequest,
}
TCPServer::new("127.0.0.1", 12345, echo_handler)
// This will be called when a bad request happens.
// In this example we return a BadRequest message to the client.
.with_bad_request_handler(|| TCPResponse::Message(Response::BadRequest))
.listen()
.await;
tcp_message_io
tries to make it as easy as possible to get started
by using sane defaults: enable the postcard
feature and any Serialize
and Deserialize
type will work as request or response, enable zstd
for transparent compression of the payload.
If you want to customize the serialization method, you can disable
the postcard
feature and implement the [SerializeMessage
] trait
for your message types:
#[derive(Serialize, Deserialize)]
enum Response {
World,
}
impl SerializeMessage for Response {
fn serialize(&self) -> Result<Vec<u8>> {
// Implement serialization to bytes
}
fn deserialize(message: &[u8]) -> Result<Self> {
// Implement deserialization from bytes
}
}
If you want to use another compression method, disable zstd
and
implement your compression method in the serialize
and deserialize
methods above.
The wire format used by the library is the message plus an internal 8-byte header encoding the length of each message.