Crates.io | stream-reconnect |
lib.rs | stream-reconnect |
version | 0.4.0-beta.4 |
source | src |
created_at | 2021-07-14 13:58:08.39599 |
updated_at | 2022-11-11 01:15:33.546495 |
description | Stream-wrapping traits/structs that automatically recover from potential disconnections/interruptions. |
homepage | |
repository | https://github.com/PhotonQuantum/stream-reconnect |
max_upload_size | |
id | 422748 |
size | 45,450 |
This crate provides a Stream
/Sink
-wrapping struct that automatically recover from potential
disconnections/interruptions.
This is a fork of stubborn-io, which is built for the same purpose but
for AsyncRead
/AsyncWrite
.
To use with your project, add the following to your Cargo.toml:
stream-reconnect = "0.3"
Minimum supported rust version: 1.43.1
This crate supports both tokio
and async-std
runtime.
tokio
support is enabled by default. While used on an async-std
runtime, change the corresponding dependency
in Cargo.toml
to
stream-reconnect = { version = "0.3", default-features = false, features = ["async-std"] }
not-send
- allow the establish function to be non thread-safe.
In this example, we will see a drop in replacement for tungstenite's WebSocketStream, with the distinction that it will automatically attempt to reconnect in the face of connectivity failures.
use stream_reconnect::{UnderlyingStream, ReconnectStream};
use std::future::Future;
use std::io;
use std::pin::Pin;
use tokio::net::TcpStream;
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
use tokio_tungstenite::tungstenite::{Message, error::Error as WsError};
use futures::{SinkExt, Stream, Sink};
struct MyWs;
impl UnderlyingStream<String, Result<Message, WsError>, WsError> for MyWs {
type Stream = WebSocketStream<MaybeTlsStream<TcpStream>>;
// Establishes connection.
// Additionally, this will be used when reconnect tries are attempted.
fn establish(addr: String) -> Pin<Box<dyn Future<Output = Result<Self::Stream, WsError>> + Send>> {
Box::pin(async move {
// In this case, we are trying to connect to the WebSocket endpoint
let ws_connection = connect_async(addr).await.unwrap().0;
Ok(ws_connection)
})
}
// The following errors are considered disconnect errors.
fn is_write_disconnect_error(err: &WsError) -> bool {
matches!(
err,
WsError::ConnectionClosed
| WsError::AlreadyClosed
| WsError::Io(_)
| WsError::Tls(_)
| WsError::Protocol(_)
)
}
// If an `Err` is read, then there might be an disconnection.
fn is_read_disconnect_error(item: &Result<Message, WsError>) -> bool {
if let Err(e) = item {
Self::is_write_disconnect_error(e)
} else {
false
}
}
// Return "Exhausted" if all retry attempts are failed.
fn exhaust_err() -> WsError {
WsError::Io(io::Error::new(io::ErrorKind::Other, "Exhausted"))
}
}
type ReconnectWs = ReconnectStream<MyWs, String, Result<Message, WsError>, WsError>;
let mut ws_stream = ReconnectWs::connect(String::from("ws://localhost:8000")).await.unwrap();
ws_stream.send("hello world!".into()).await.unwrap();
MIT