| Crates.io | io-tether |
| lib.rs | io-tether |
| version | 0.6.1 |
| created_at | 2024-06-14 08:36:58.539111+00 |
| updated_at | 2025-07-28 21:33:24.675229+00 |
| description | A small library for defining I/O types which reconnect on errors. |
| homepage | |
| repository | https://github.com/cmleinz/io-tether |
| max_upload_size | |
| id | 1271762 |
| size | 73,393 |
A small library for defining I/O types which reconnect on errors.
To get started, add io-tether to your list of dependencies
io-tether = { version = "0.6.1" }
The primary type exposed by this library is the Tether type. This
type is generic over two parameters:
C: The I/O connector. This is the type which produces the
underlying connections. For some io types like QUIC this may
need to be fairly involved, while for io like TCP, it may just
be a wrapper around a socket address
R: The resolver. This type will likely be generated by you in
order to handle the buisness logic required for your application
whenever a disconnect occurs. It drives the reconnect process and
allows developers to inject arbirtary asynchronous code at various
stages of the reconnection process
Below is a simple example of a resolver implmentation that calls back to a channel whenever it detects a disconnect.
use std::{time::Duration, net::{SocketAddrV4, Ipv4Addr}};
use io_tether::{Action, Resolver, Context, Reason, Tether, PinFut, tcp::TcpConnector};
use tokio::{net::TcpStream, io::{AsyncReadExt, AsyncWriteExt}, sync::mpsc};
pub struct ChannelResolver(mpsc::Sender<String>);
type Connector = TcpConnector<SocketAddrV4>;
// NOTE: If you don't need to act on the connector, this can be implemented for generic `C`
impl Resolver<Connector> for ChannelResolver {
fn disconnected(&mut self, context: &Context, conn: &mut Connector) -> PinFut<Action> {
let sender = self.0.clone();
let reason = context.reason().to_string();
// Try port 8081 when retrying
conn.get_addr_mut().set_port(8081);
Box::pin(async move {
// Send the disconnect reason over the channel
sender.send(reason).await.unwrap();
// We can call arbirtary async code here
tokio::time::sleep(Duration::from_millis(500)).await;
Action::AttemptReconnect
})
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (tx, mut rx) = mpsc::channel(1);
let resolver = ChannelResolver(tx);
let listener_1 = tokio::net::TcpListener::bind("0.0.0.0:8080").await?;
let listener_2 = tokio::net::TcpListener::bind("0.0.0.0:8081").await?;
// Each listener, only accepts 1 connection, writing half of "foobar"
tokio::spawn(async move {
let (mut stream, _addr) = listener_1.accept().await.unwrap();
stream.write_all(b"foo").await.unwrap();
});
tokio::spawn(async move {
let (mut stream, _addr) = listener_2.accept().await.unwrap();
stream.write_all(b"bar").await.unwrap();
});
let handle = tokio::spawn(async move {
// Start by connecting to port 8080
let addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 8080);
let mut tether = Tether::connect_tcp(addr, resolver)
.await
.unwrap();
let mut buf = [0; 6];
// A disconnect occurs here after the server writes
// "foo" then drops the client, triggering a disconnect.
// The disconnect is detected and forwarded to the resolver,
// which adjusts the port, sleeps and attempts a reconnect.
//
// The resolver then connects to the new remote socket and we
// pull the next 3 bytes. This all happens under the hood
// without any extra work at each read callsite.
tether.read_exact(&mut buf).await.unwrap();
assert_eq!(&buf, b"foobar");
});
// Since a disconnect occurred during the call to read_exact,
// the channel will contain the disconnect reason
assert!(rx.recv().await.is_some());
handle.await?;
Ok(())
}
stubborn-io similar, but uses synchronous callbacks and a duration iterator for retries
tokio-retry a more general purpose future retry library