use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; use futures_util::{SinkExt, StreamExt}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::Mutex; use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::{accept_async, connect_async, MaybeTlsStream, WebSocketStream}; use wheel_timer2::{Behave, MultiWheel}; const PRECISION: Duration = Duration::from_millis(100); #[tokio::main(flavor = "current_thread")] async fn main() { let mut wheel = MultiWheel::new(10, 10, PRECISION); let add_handle = wheel.add_handle(); let addr = server().await; for (id, dur) in [ Duration::from_secs(1), Duration::from_secs(3), Duration::from_secs(6), ] .iter() .enumerate() { let ws_stream = Arc::new(Mutex::new(client(addr).await)); add_handle .add_with_ctx( move |ws_stream: &mut Arc>>>| { let ws_stream = ws_stream.clone(); tokio::spawn(async move { let mut ws_stream = ws_stream.lock().await; ws_stream .send(Message::Ping(id.to_be_bytes().to_vec())) .await .unwrap(); assert!(ws_stream.next().await.unwrap().unwrap().is_pong()); println!("pong: {}", id); }); Behave::Repeat }, *dur, ws_stream, ) .unwrap(); } let mut i = tokio::time::interval(PRECISION); loop { i.tick().await; wheel.tick(); } } async fn server() -> SocketAddr { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); tokio::spawn(accept_connection(listener)); addr } async fn accept_connection(listener: TcpListener) { while let Ok((stream, _)) = listener.accept().await { tokio::spawn(handle_connection(stream)); } } async fn handle_connection(stream: TcpStream) { let mut ws_stream = accept_async(stream).await.expect("Failed to accept"); while let Some(msg) = ws_stream.next().await { let msg = msg.unwrap(); if let Message::Ping(b) = msg { let id = usize::from_be_bytes((&*b).try_into().unwrap()); println!("ping: {}", id); ws_stream.send(Message::Pong(b)).await.unwrap(); } } } async fn client(addr: SocketAddr) -> WebSocketStream> { connect_async(format!("ws://{}", addr)).await.unwrap().0 }