use std::net::ToSocketAddrs; use bytes::{Buf, BytesMut}; use futures::prelude::*; use futures::sink::SinkExt; use tokio::net::TcpStream; use tokio_util::codec::{Decoder, Encoder, Framed}; pub type ClientTransport = Framed; use crate::frame; use crate::{FromServer, Message, Result, ToServer}; use anyhow::{anyhow, bail}; /// Connect to a STOMP server via TCP, including the connection handshake. /// If successful, returns a tuple of a message stream and a sender, /// which may be used to receive and send messages respectively. pub async fn connect( address: impl Into, login: Option, passcode: Option, ) -> Result { let address = address.into(); let addr = address.as_str().to_socket_addrs().unwrap().next().unwrap(); let tcp = TcpStream::connect(&addr).await?; let mut transport = ClientCodec.framed(tcp); client_handshake(&mut transport, address, login, passcode).await?; Ok(transport) } async fn client_handshake( transport: &mut ClientTransport, host: String, login: Option, passcode: Option, ) -> Result<()> { let connect = Message { content: ToServer::Connect { accept_version: "1.2".into(), host, login, passcode, heartbeat: None, }, extra_headers: vec![], }; // Send the message transport.send(connect).await?; // Receive reply let msg = transport.next().await.transpose()?; if let Some(FromServer::Connected { .. }) = msg.as_ref().map(|m| &m.content) { Ok(()) } else { Err(anyhow!("unexpected reply: {:?}", msg)) } } /// Convenience function to build a Subscribe message pub fn subscribe(dest: impl Into, id: impl Into) -> Message { ToServer::Subscribe { destination: dest.into(), id: id.into(), ack: None, } .into() } pub struct ClientCodec; impl Decoder for ClientCodec { type Item = Message; type Error = anyhow::Error; fn decode(&mut self, src: &mut BytesMut) -> Result> { let (item, offset) = match frame::parse_frame(src) { Ok((remain, frame)) => ( Message::::from_frame(frame), remain.as_ptr() as usize - src.as_ptr() as usize, ), Err(nom::Err::Incomplete(_)) => return Ok(None), Err(e) => bail!("Parse failed: {:?}", e), }; src.advance(offset); item.map(Some) } } impl Encoder> for ClientCodec { type Error = anyhow::Error; fn encode( &mut self, item: Message, dst: &mut BytesMut, ) -> std::result::Result<(), Self::Error> { item.to_frame().serialize(dst); Ok(()) } }