Crates.io | tsyncp |
lib.rs | tsyncp |
version | 0.4.1 |
source | src |
created_at | 2022-04-15 06:48:30.829762 |
updated_at | 2024-06-22 23:42:43.285737 |
description | Async channel APIs (mpsc, broadcast, barrier, etc) over TCP for message-passing. |
homepage | https://github.com/PoOnesNerfect/tsyncp |
repository | https://github.com/PoOnesNerfect/tsyncp |
id | 568271 |
size | 506,018 |
Major Rust libraries such as std and tokio provide great synchronization primitives for message-passing between threads and tasks. However, there are not many libraries that provide similar APIs that can be used over the network.
Tsyncp tries to fill that gap by providing similar APIs (mpsc, broadcast, barrier, etc.) over TCP. If you have a personal project with only a few services that need to pass data to each other, you can use tsyncp to do so easily instead of setting up a message-broker service.
Tsyncp also lets you customize the serialization/deserialization method used to encode and decode data. The schemes currently supported out of the box are Json, Protobuf, and Bincode, and users can easily implement their own EncodeMethod and DecodeMethod.
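To give a feel for what a pluggable encode/decode scheme looks like, here is a minimal std-only sketch. Everything in it is an assumption made for illustration: the `SketchEncode` and `SketchDecode` traits, the `LittleEndianU64` scheme, and `round_trip` are hypothetical names and are not tsyncp's actual `EncodeMethod`/`DecodeMethod` traits, whose real signatures are in the API documentation.

```rust
/// Hypothetical stand-ins for the idea of pluggable encode/decode methods.
/// These are NOT tsyncp's `EncodeMethod`/`DecodeMethod` traits.
trait SketchEncode<T> {
    fn encode(item: &T) -> Vec<u8>;
}

trait SketchDecode<T> {
    type Error;
    fn decode(bytes: &[u8]) -> Result<T, Self::Error>;
}

/// A toy scheme: a `u64` travels as exactly 8 little-endian bytes.
struct LittleEndianU64;

impl SketchEncode<u64> for LittleEndianU64 {
    fn encode(item: &u64) -> Vec<u8> {
        item.to_le_bytes().to_vec()
    }
}

impl SketchDecode<u64> for LittleEndianU64 {
    type Error = String;

    fn decode(bytes: &[u8]) -> Result<u64, String> {
        // Reject anything that is not exactly 8 bytes long.
        if bytes.len() != 8 {
            return Err(format!("expected 8 bytes, got {}", bytes.len()));
        }
        let mut raw = [0u8; 8];
        raw.copy_from_slice(bytes);
        Ok(u64::from_le_bytes(raw))
    }
}

/// Code written against the traits works with any scheme that implements both.
fn round_trip<T, C>(item: &T) -> Result<T, C::Error>
where
    C: SketchEncode<T> + SketchDecode<T>,
{
    C::decode(&C::encode(item))
}

fn main() {
    let decoded = round_trip::<u64, LittleEndianU64>(&1_234_567);
    println!("{decoded:?}");
}
```

The point of the design is that the channel code only needs to know "some scheme that can encode and decode `T`", so swapping Json for a custom format does not touch the sending or receiving logic.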
Currently, tsyncp provides 5 types of channels:
The goal of the project is to provide simple, intuitive, but extendable primitives for passing data over the network. That's why this library uses Future-chaining extensively.
Getting started is as easy as:
use color_eyre::Result;
use serde::{Serialize, Deserialize};
use tsyncp::mpsc;

#[derive(Debug, Serialize, Deserialize)]
struct Dummy {
    field1: String,
    field2: u64,
    field3: Vec<u8>,
}

#[tokio::main]
async fn main() -> Result<()> {
    let mut rx: mpsc::JsonReceiver<Dummy> = mpsc::receiver_on("localhost:11114").await?;

    // accept a new connection coming from a sender application.
    rx.accept().await?;

    // after accepting connection, you can start receiving data from the receiver.
    if let Some(Ok(item)) = rx.recv().await {
        // below line is to show the type of received item.
        let item: Dummy = item;
        println!("received item: {item:?}");
    }

    Ok(())
}
But you can easily extend it by chaining futures as below:
use color_eyre::Result;
use serde::{Serialize, Deserialize};
use tsyncp::mpsc;

#[derive(Debug, Serialize, Deserialize)]
struct Dummy {
    field1: String,
    field2: u64,
    field3: Vec<u8>,
}

#[tokio::main]
async fn main() -> Result<()> {
    let mut rx: mpsc::JsonReceiver<Dummy> = mpsc::receiver_on("localhost:11114")
        .limit(10)                              // limit allowed connections to 10.
        .set_tcp_reuseaddr(true)                // set tcp config reuseaddr to `true`.
        .accept()                               // accept connection. (default: 1)
        .to_limit()                             // accept until limit is reached. (10)
        .handle(|a| println!("{a} connected!")) // print address when a connection is accepted.
        .await?;

    // At this point, the receiver has 10 connections in the connection pool,
    // which all have `reuseaddr` as `true`.
    while let Some(Ok((item, addr))) = rx.recv().with_addr().await {
        println!("received item: {item:?} from {addr}");
    }

    Ok(())
}
I just threw a whole bunch of chains at you, but you can use only the ones that fit your needs.
On the sending side, getting started is just as simple:
use color_eyre::Result;
use serde::{Serialize, Deserialize};
use tsyncp::mpsc;

#[derive(Debug, Serialize, Deserialize)]
struct Dummy {
    field1: String,
    field2: u64,
    field3: Vec<u8>,
}

#[tokio::main]
async fn main() -> Result<()> {
    let mut tx: mpsc::JsonSender<Dummy> = mpsc::sender_to("localhost:11114").await?;

    let dummy = Dummy {
        field1: String::from("hello world"),
        field2: 1234567,
        field3: vec![1, 2, 3, 4],
    };

    tx.send(dummy).await?;

    Ok(())
}
But you can also extend it by chaining the futures as:
use color_eyre::Result;
use serde::{Serialize, Deserialize};
use std::time::Duration;
use tsyncp::mpsc;

#[derive(Debug, Serialize, Deserialize)]
struct Dummy {
    field1: String,
    field2: u64,
    field3: Vec<u8>,
}

#[tokio::main]
async fn main() -> Result<()> {
    let mut tx: mpsc::JsonSender<Dummy> = mpsc::sender_to("localhost:11114")
        .retry(Duration::from_millis(500), 100) // retry connecting 100 times every 500 ms.
        .set_tcp_reuseaddr(true)                // set tcp config reuseaddr to `true`.
        .await?;

    let dummy = Dummy {
        field1: String::from("hello world"),
        field2: 1234567,
        field3: vec![1, 2, 3, 4],
    };

    // send some item.
    tx.send(dummy).await?;

    Ok(())
}
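If you are wondering what a chain like `.retry(Duration::from_millis(500), 100)` amounts to conceptually, it can be pictured as a plain retry loop. The sketch below is std-only and blocking, and `retry_with_interval` is a hypothetical helper written for this illustration, not a tsyncp function. Whether tsyncp counts the first attempt toward the limit is not stated here, so the sketch simply assumes the count is the total number of attempts.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical helper (not part of tsyncp): call `op` until it succeeds,
/// sleeping `interval` after each failure, for at most `max_attempts` attempts.
/// `max_attempts` is assumed to be at least 1.
fn retry_with_interval<T, E>(
    interval: Duration,
    max_attempts: usize,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 1;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // Out of attempts: hand the last error back to the caller.
            Err(err) if attempt >= max_attempts => return Err(err),
            // Otherwise wait, then try again.
            Err(_) => {
                attempt += 1;
                sleep(interval);
            }
        }
    }
}

fn main() {
    // Simulate a peer that only becomes reachable on the third attempt.
    let mut calls = 0;
    let result: Result<&str, &str> = retry_with_interval(Duration::from_millis(10), 5, || {
        calls += 1;
        if calls < 3 {
            Err("connection refused")
        } else {
            Ok("connected")
        }
    });
    println!("{result:?} after {calls} calls");
}
```

This is handy when the sender and receiver applications start at roughly the same time and the receiver may not be listening yet.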
The API documentation has a very detailed guide on how to use the primitives, so please check it out!
If enough people find this library useful or interesting, I will work primarily on the following:
Note: Tsyncp is built on tokio, and thus may not be compatible with other async runtimes like async-std.
Note: If you're worried about the quality of the implementation and the skills of the author, you shouldn't worry! (too much...) This library is a relatively thin layer on top of tokio::net::TcpStream for TCP, tokio_util::codec::Framed for byte stream framing, and serde, serde_json, Prost, and bincode for serializing data into bytes! You can think of this library as a salad bowl made from the best ingredients in town with a light touch of amateur homemade sauce.
That being said, I did add a couple of my own sauces into it. The two flavours that I added are:
You can take a look at the implementations of these, both of which are pretty simple. I tried my best not to put too much of my personal opinion into them!
Still, since this library is a baby, if you have any concerns about the code, please contact me at jack.y.l.dev@gmail.com!
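For readers unfamiliar with "byte stream framing", mentioned above: TCP delivers a continuous stream of bytes, so something has to mark where one message ends and the next begins. A common approach is to prefix each message with its length. The std-only sketch below shows that idea under the assumption of a 4-byte big-endian length prefix. `encode_frame` and `decode_frame` are hypothetical helpers for illustration, and this is not claimed to be the wire format tsyncp actually uses.

```rust
/// Prefix `payload` with its length as a 4-byte big-endian integer.
fn encode_frame(payload: &[u8]) -> Vec<u8> {
    assert!(payload.len() <= u32::MAX as usize, "payload too large for a u32 length prefix");
    let len = payload.len() as u32;
    let mut frame = Vec::with_capacity(4 + payload.len());
    frame.extend_from_slice(&len.to_be_bytes());
    frame.extend_from_slice(payload);
    frame
}

/// Try to take one complete frame off the front of `buf`.
/// Returns `None` and leaves `buf` untouched if more bytes are still needed.
fn decode_frame(buf: &mut Vec<u8>) -> Option<Vec<u8>> {
    if buf.len() < 4 {
        return None;
    }
    let len = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
    if buf.len() < 4 + len {
        return None;
    }
    let payload = buf[4..4 + len].to_vec();
    // Remove the consumed prefix and payload, keeping any bytes of the next frame.
    buf.drain(..4 + len);
    Some(payload)
}

fn main() {
    // Two messages arrive back to back, with the second one cut short.
    let mut buf = encode_frame(b"hello");
    let second = encode_frame(b"world!");
    buf.extend_from_slice(&second[..5]);

    println!("{:?}", decode_frame(&mut buf)); // the first frame is complete
    println!("{:?}", decode_frame(&mut buf)); // the second frame is still partial
    buf.extend_from_slice(&second[5..]);
    println!("{:?}", decode_frame(&mut buf)); // the rest of the second frame has arrived
}
```

The decoded payload bytes are then what a serializer such as serde_json or bincode would turn back into a typed value.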
Warning: Tsyncp is not a message broker, nor does it try to be; it's just a message-passing library for simple and convenient use cases.
Warning: Tsyncp is still WIP! It's very usable, but it still needs some encode/decode features implemented, as well as extensive testing, documentation, and examples.