#![deny(warnings)] use std::{ collections::HashMap, sync::{ atomic::{AtomicUsize, Ordering}, Arc, Mutex, }, }; use futures::{future, Future, FutureExt, StreamExt}; use rweb::{ ws::{Message, WebSocket}, Filter, }; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; /// Our global unique user id counter. static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1); /// Our state of currently connected users. /// /// - Key is their id /// - Value is a sender of `rweb::ws::Message` type Users = Arc>>>>; #[tokio::main] async fn main() { pretty_env_logger::init(); // Keep track of all connected users, key is usize, value // is a websocket sender. let users = Arc::new(Mutex::new(HashMap::new())); // Turn our "state" into a new Filter... let users = rweb::any().map(move || users.clone()); // GET /chat -> websocket upgrade let chat = rweb::path("chat") // The `ws()` filter will prepare Websocket handshake... .and(rweb::ws()) .and(users) .map(|ws: rweb::ws::Ws, users| { // This will call our function if the handshake succeeds. ws.on_upgrade(move |socket| user_connected(socket, users).map(|result| result.unwrap())) }); // GET / -> index html let index = rweb::path::end().map(|| rweb::reply::html(INDEX_HTML)); let routes = index.or(chat); rweb::serve(routes).run(([127, 0, 0, 1], 3030)).await; } fn user_connected(ws: WebSocket, users: Users) -> impl Future> { // Use a counter to assign a new unique ID for this user. let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed); eprintln!("new chat user: {}", my_id); // Split the socket into a sender and receive of messages. let (user_ws_tx, user_ws_rx) = ws.split(); // Use an unbounded channel to handle buffering and flushing of messages // to the websocket... let (tx, rx) = mpsc::unbounded_channel(); tokio::task::spawn( UnboundedReceiverStream::new(rx) .forward(user_ws_tx) .map(|result| { if let Err(e) = result { eprintln!("websocket send error: {}", e); } }), ); // Save the sender in our list of connected users. users.lock().unwrap().insert(my_id, tx); // Return a `Future` that is basically a state machine managing // this specific user's connection. // Make an extra clone to give to our disconnection handler... let users2 = users.clone(); user_ws_rx // Every time the user sends a message, broadcast it to // all other users... .for_each(move |msg| { user_message(my_id, msg.unwrap(), &users); future::ready(()) }) // for_each will keep processing as long as the user stays // connected. Once they disconnect, then... .then(move |result| { user_disconnected(my_id, &users2); future::ok(result) }) // If at any time, there was a websocket error, log here... // .map_err(move |e| { // eprintln!("websocket error(uid={}): {}", my_id, e); // }) } fn user_message(my_id: usize, msg: Message, users: &Users) { // Skip any non-Text messages... let msg = if let Ok(s) = msg.to_str() { s } else { return; }; let new_msg = format!(": {}", my_id, msg); // New message from this user, send it to everyone else (except same uid)... // // We use `retain` instead of a for loop so that we can reap any user that // appears to have disconnected. for (&uid, tx) in users.lock().unwrap().iter_mut() { if my_id != uid { match tx.send(Ok(Message::text(new_msg.clone()))) { Ok(()) => (), Err(_disconnected) => { // The tx is disconnected, our `user_disconnected` code // should be happening in another task, nothing more to // do here. } } } } } fn user_disconnected(my_id: usize, users: &Users) { eprintln!("good bye user: {}", my_id); // Stream closed up, so remove from the user list users.lock().unwrap().remove(&my_id); } static INDEX_HTML: &str = r#" Warp Chat

warp chat

Connecting...

"#;