//!
use std::pin::Pin;
use futures::{stream::SplitSink, StreamExt};
use futures_util::{SinkExt, Stream};
use serde::Serialize;
use tokio::net::TcpStream;
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
use crate::{
api::subscribe::BookEvent,
types::{Request, Result},
};
pub const DEFAULT_WS_BASE_URL: &str = "wss://ws.bitstamp.net";
/// A WebSocket client for Bitstamp.
pub struct Client {
sender: SplitSink>, Message>,
// The thread_handle will be dropped when the Client drops.
#[allow(dead_code)]
thread_handle: tokio::task::JoinHandle<()>,
pub broadcast: Option>,
pub book_events: Option + Send + Sync>>>,
}
impl Client {
pub async fn connect(url: &str) -> Result {
let (stream, _) = connect_async(url).await?;
let (sender, receiver) = stream.split();
let (broadcast_sender, _) = tokio::sync::broadcast::channel::(32);
let broadcast = broadcast_sender.clone();
let thread_handle = tokio::spawn(async move {
let mut receiver = receiver;
while let Some(result) = receiver.next().await {
if let Ok(msg) = result {
if let Message::Text(string) = msg {
tracing::debug!("{string}");
if let Err(err) = broadcast_sender.send(string) {
tracing::trace!("{err:?}");
// Break the while loop so that the receiver handle is dropped
// and the task unsubscribes from the summary stream.
break;
}
}
} else {
tracing::error!("{:?}", result);
}
}
});
Ok(Self {
sender,
thread_handle,
broadcast: Some(broadcast),
book_events: None,
})
}
pub async fn connect_public() -> Result {
let url = format!("{DEFAULT_WS_BASE_URL}/");
Self::connect(&url).await
}
/// Sends a message to the WebSocket.
pub async fn send(&mut self, req: R) -> Result<()>
where
R: Serialize,
{
let msg = serde_json::to_string(&req).unwrap();
tracing::debug!("{msg}");
self.sender.send(Message::Text(msg.to_string())).await?;
Ok(())
}
/// Performs a remote procedure call.
pub async fn call(&mut self, event: impl Into, data: D) -> Result<()>
where
D: Serialize,
{
let req = Request {
event: event.into(),
data,
};
self.send(req).await
}
}