Crates.io | boomnet |
lib.rs | boomnet |
version | 0.0.25 |
source | src |
created_at | 2024-03-14 16:44:52.213302 |
updated_at | 2024-05-15 17:10:30.783295 |
description | Framework for building low latency websocket client based applications. |
homepage | |
repository | https://github.com/HaveFunTrading/boomnet |
max_upload_size | |
id | 1173539 |
size | 323,121 |
BoomNet is a high-performance framework targeting development of low-latency network applications, particularly focusing on TCP stream-oriented clients that utilise various protocols.
Simply declare dependency on boomnet
in your Cargo.toml
and select desired features.
[dependencies]
boomnet = { version = "0.0.25", features = ["full"]}
The framework is structured into multiple layers, with each subsequent layer building upon its predecessor, enhancing functionality and abstraction.
The first layer defines stream
as abstraction over TCP connection, adhering to the following characteristics.
Read
and Write
traits for I/O operations.rustls
.Streams are designed to be fully generic, avoiding dynamic dispatch, and can be composed in flexible way.
let stream: RecordedStream<TlsStream<TcpStream>> = TcpStream::bind_and_connect(addr, self.net_iface, None)?
.into_tls_stream(self.url)
.into_recorded_stream("plain");
Different protocols can then be applied on top of a stream.
let ws: Websocket<RecordedStream<TlsStream<TcpStream>>> = stream.into_websocket(self.url);
Selector
provides abstraction over OS specific mechanisms (like epoll
) for efficiently monitoring socket readiness events.
Though primarily utilised internally, selectors are crucial for the IOService
functionality, currently offering both
mio
and direct
(no-op) implementations.
let mut io_service = MioSelector::new()?.into_io_service(IdleStrategy::Sleep(Duration::from_millis(1)));
The last layer manages lifecycle of endpoints and provides auxiliary services (such as asynchronous DNS resolution and
auto disconnect) through the IOService
.
Endpoint
serves as low level construct for application logic. IOService
oversees the connection lifecycle within endpoints.
The aim is to support a variety of protocols, including WebSocket, HTTP, and FIX, with WebSocket client functionality currently available.
The websocket client protocol complies with the RFC 6455 specification, offering the following features.
Selector
and IOService
.The repository contains comprehensive list of examples.
The following example illustrates how to use multiple websocket connections with IOService
in order to consume messages from the Binance cryptocurrency
exchange. First, we need to define and implement our Endpoint
. The framework provides TlsWebsocketEndpoint
trait
that we can use.
#[derive(Default)]
struct TradeEndpoint {
id: u32,
url: &'static str,
instrument: &'static str,
}
impl TradeEndpoint {
pub fn new(id: u32, url: &'static str, instrument: &'static str) -> TradeEndpoint {
Self { id, url, instrument, }
}
}
impl TlsWebsocketEndpoint for TradeEndpoint {
type Stream = MioStream;
fn url(&self) -> &str {
self.url
}
// called by the IO service whenever a connection has to be established for this endpoint
fn create_websocket(&mut self, addr: SocketAddr) -> io::Result<TlsWebsocket<Self::Stream>> {
// create secure websocket
let mut ws = TcpStream::bind_and_connect(addr, None, None)?
.into_mio_stream()
.into_tls_websocket(self.url);
// send subscription message
ws.send_text(
true,
Some(format!(r#"{{"method":"SUBSCRIBE","params":["{}@trade"],"id":1}}"#, self.instrument).as_bytes()),
)?;
Ok(ws)
}
#[inline]
fn poll(&mut self, ws: &mut TlsWebsocket<Self::Stream>) -> io::Result<()> {
// keep calling receive_next until no more frames in the current batch
while let Some(WebsocketFrame::Text(ts, fin, data)) = ws.receive_next()? {
// handle the message
println!("[{}] {ts}: ({fin}) {}", self.id, String::from_utf8_lossy(data));
}
Ok(())
}
}
After defining the endpoint, it is registered with the IOService
and polled within an event loop. The service handles
Endpoint
connection management and reconnection in case of disconnection.
fn main() -> anyhow::Result<()> {
let mut io_service = MioSelector::new()?.into_io_service(IdleStrategy::Sleep(Duration::from_millis(1)));
let endpoint_btc = TradeEndpoint::new(0, "wss://stream1.binance.com:443/ws", None, "btcusdt");
let endpoint_eth = TradeEndpoint::new(1, "wss://stream2.binance.com:443/ws", None, "ethusdt");
let endpoint_xrp = TradeEndpoint::new(2, "wss://stream3.binance.com:443/ws", None, "xrpusdt");
io_service.register(endpoint_btc);
io_service.register(endpoint_eth);
io_service.register(endpoint_xrp);
loop {
// will never block
io_service.poll()?;
}
}
It is often required to expose shared state to the Endpoint
. This can be achieved with user defined Context
.
struct FeedContext {
static_data: StaticData,
}
// use the marker trait
impl Context for FeedContext {}
When implementing our TradeEndpoint
we can use TlsWebsocketEndpointWithContext
trait instead.
impl TlsWebsocketEndpointWithContext<FeedContext> for TradeEndpoint {
type Stream = MioStream;
fn url(&self) -> &str {
self.url
}
fn create_websocket(&mut self, addr: SocketAddr, ctx: &mut FeedContext) -> io::Result<TlsWebsocket<Self::Stream>> {
// we now have access to context
}
#[inline]
fn poll(&mut self, ws: &mut TlsWebsocket<Self::Stream>, ctx: &mut FeedContext) -> io::Result<()> {
// we now have access to context
}
}
We will also need to create IOService
that is Context
aware.
let mut context = FeedContext::new(static_data);
let mut io_service = MioSelector::new()?.into_io_service_with_context(IdleStrategy::Sleep(Duration::from_millis(1)), &mut context);
The Context
must now be passed to the service poll
method.
loop {
io_service.poll(&mut context)?;
}
BoomNet feature set is modular, allowing for tailored functionality based on project needs. The full
feature enables
all available features, while individual components can be enabled as needed.
mio
Adds dependency on mio
crate and enables MioSelector
and MioStream
.
tls
Adds dependency on rustls
crate and enables TlsStream
and more flexible TlsReadyStream
.
ws
Adds support for Websocket
protocol.