use async_dup::{Arc, Mutex}; use futures::Stream; use smol::Async; use std::net::TcpListener; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; use tophat::server::accept; fn main() -> Result<(), Box> { tracing_subscriber::fmt::init(); let ping_machine = Arc::new(Mutex::new(PingMachine { broadcasters: Vec::new(), })); let listener = Async::::bind(([127,0,0,1],9999))?; let ping_task = smol::spawn({ let ping_machine = ping_machine.clone(); async move { loop { ping_machine.lock().ping().await; smol::Timer::after(Duration::from_secs(1)).await; } } }); ping_task.detach(); smol::block_on(async { loop { let (stream, _) = listener.accept().await?; let stream = Arc::new(stream); let ping_machine = ping_machine.clone(); let task = smol::spawn(async move { let serve = accept(stream, |_req, mut resp_wtr| async { let client = ping_machine.lock().add_client(); resp_wtr.set_sse(client); resp_wtr.send().await }) .await; if let Err(err) = serve { eprintln!("Error: {}", err); } }); task.detach(); } }) } struct PingMachine { broadcasters: Vec>, } impl PingMachine { async fn ping(&self) { for tx in &self.broadcasters { let _ = tx.send("data: ping\n\n".to_owned()).await; } } fn add_client(&mut self) -> Client { let (tx, rx) = async_channel::bounded(10); self.broadcasters.push(tx); Client(rx) } } struct Client(async_channel::Receiver); impl Stream for Client { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match Pin::new(&mut self.0).poll_next(cx) { Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))), Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending, } } }