use futures_util::StreamExt; use std::sync::Arc; use tokio::sync::{Mutex, RwLock}; use warp::ws::{WebSocket, Ws}; use warp::{Filter, Rejection, Reply}; use yrs::sync::Awareness; use yrs::{Doc, Text, Transact}; use yrs_warp::broadcast::BroadcastGroup; use yrs_warp::ws::{WarpSink, WarpStream}; use yrs_warp::AwarenessRef; const STATIC_FILES_DIR: &str = "examples/code-mirror/frontend/dist"; #[tokio::main] async fn main() { // We're using a single static document shared among all the peers. let awareness: AwarenessRef = { let doc = Doc::new(); { // pre-initialize code mirror document with some text let txt = doc.get_or_insert_text("codemirror"); let mut txn = doc.transact_mut(); txt.push( &mut txn, r#"function hello() { console.log('hello world'); }"#, ); } Arc::new(RwLock::new(Awareness::new(doc))) }; // open a broadcast group that listens to awareness and document updates // and has a pending message buffer of up to 32 updates let bcast = Arc::new(BroadcastGroup::new(awareness.clone(), 32).await); let static_files = warp::get().and(warp::fs::dir(STATIC_FILES_DIR)); let ws = warp::path("my-room") .and(warp::ws()) .and(warp::any().map(move || bcast.clone())) .and_then(ws_handler); let routes = ws.or(static_files); warp::serve(routes).run(([0, 0, 0, 0], 8000)).await; } async fn ws_handler(ws: Ws, bcast: Arc) -> Result { Ok(ws.on_upgrade(move |socket| peer(socket, bcast))) } async fn peer(ws: WebSocket, bcast: Arc) { let (sink, stream) = ws.split(); let sink = Arc::new(Mutex::new(WarpSink::from(sink))); let stream = WarpStream::from(stream); let sub = bcast.subscribe(sink, stream); match sub.completed().await { Ok(_) => println!("broadcasting for channel finished successfully"), Err(e) => eprintln!("broadcasting for channel finished abruptly: {}", e), } }