use std::time::Duration;

use futures::{future, FutureExt, StreamExt};
use mofo::Mofo;

use endr::{Node, ObjectState, Remote, MemoryStorageBackend};

/// Time budget shared by the tests in this file.
///
/// It is the deadline applied by `timeout!` and also the pause the negative
/// test sleeps through before asserting that nothing arrived.
const PROPAGATION_WINDOW: Duration = Duration::from_millis(100);

/// Wraps the future `$f` in `tokio::time::timeout` with `PROPAGATION_WINDOW`.
///
/// Evaluates to a future (it still has to be `.await`ed) that yields the
/// output of `$f`, or panics with "Timed out" if the deadline elapses first.
macro_rules! timeout {
    ($f:expr) => {
        tokio::time::timeout(PROPAGATION_WINDOW, $f).map(|r| r.expect("Timed out"))
    };
}

/// Connects two nodes to each other.
///
/// Builds a pair of remotes with `Remote::new_connected_test_pair` and adds
/// each one to the opposite node: `node1` gets the remote standing for
/// `node2`, and `node2` gets the remote standing for `node1`.
async fn connect_nodes(node1: &Node, node1_name: &str, node2: &Node, node2_name: &str) {
    let (node1_as_remote, node2_as_remote) =
        Remote::new_connected_test_pair(node1_name, node2_name);

    node1.add_remote(node2_as_remote).await;
    node2.add_remote(node1_as_remote).await;
}

/// An append made on `node1` *after* `node2` started listening must show up
/// on `node2` as a diff and in `node2`'s copy of the log.
///
/// NOTE(review): the "sync everything" behaviour named in the test title is
/// presumably a property of the remotes created by `connect_nodes`; it is not
/// configured explicitly here.
#[tokio::test]
async fn if_remote_is_sync_everything_it_will_receive_log_updates_and_get_local_notifications() {
    let background = Mofo::new();
    let node1 = Node::new(background.clone(), MemoryStorageBackend::new());
    let node2 = Node::new(background.clone(), MemoryStorageBackend::new());

    background
        .run_until(async {
            let (id, write_access) = node1.create_log::<()>(None).await;

            connect_nodes(&node1, "node1", &node2, "node2").await;

            // Subscribe before appending, so the append arrives as an update.
            let node2_diffs = node2.diffs(id, "node2_listener".to_owned());

            node1
                .append_to_log(id, &write_access, litl::Val::number(1.0))
                .await;

            // Skip diffs that carry no entries and wait for the first one that does.
            let diff = timeout!(node2_diffs
                .filter(|diff| future::ready(!diff.expect_log().new_entries.is_empty()))
                .next())
            .await
            .expect("diff stream ended before yielding a non-empty diff");

            assert_eq!(diff.expect_log().after, 0);
            assert_eq!(diff.expect_log().new_entries, vec![litl::Val::number(1.0)]);

            match &*node2.load_object(id).await.borrow() {
                ObjectState::Log(log) => {
                    assert_eq!(log.entries, vec![litl::Val::number(1.0)]);
                }
                _ => panic!("Expected a log"),
            }
        })
        .await;
}

/// An entry appended on `node1` *before* the nodes are connected must still
/// reach a listener on `node2` as a diff, and end up in `node2`'s copy of the
/// log.
#[tokio::test]
async fn if_remote_is_sync_everything_it_will_receive_log_initial_states_as_local_notifications() {
    let background = Mofo::new();
    let node1 = Node::new(background.clone(), MemoryStorageBackend::new());
    let node2 = Node::new(background.clone(), MemoryStorageBackend::new());

    background
        .run_until(async {
            let (id, write_access) = node1.create_log::<()>(None).await;

            // The append happens while node1 has no remotes at all.
            node1
                .append_to_log(id, &write_access, litl::Val::number(1.0))
                .await;

            connect_nodes(&node1, "node1", &node2, "node2").await;

            let node2_diffs = node2.diffs(id, "node2_listener".to_owned());

            // Skip diffs that carry no entries and wait for the first one that does.
            let diff = timeout!(node2_diffs
                .filter(|diff| future::ready(!diff.expect_log().new_entries.is_empty()))
                .next())
            .await
            .expect("diff stream ended before yielding a non-empty diff");

            assert_eq!(diff.expect_log().after, 0);
            assert_eq!(diff.expect_log().new_entries, vec![litl::Val::number(1.0)]);

            match &*node2.load_object(id).await.borrow() {
                ObjectState::Log(log) => {
                    assert_eq!(log.entries, vec![litl::Val::number(1.0)]);
                }
                _ => panic!("Expected a log"),
            }
        })
        .await;
}

/// With `client1` and `client2` each connected only to `server`, an append
/// made on `client1` must reach a listener on `client2`.
#[tokio::test]
async fn log_appends_propagate_to_other_clients_with_shared_sync_everything() {
    let background = Mofo::new();
    let client1 = Node::new(background.clone(), MemoryStorageBackend::new());
    let server = Node::new(background.clone(), MemoryStorageBackend::new());
    let client2 = Node::new(background.clone(), MemoryStorageBackend::new());

    background
        .run_until(async {
            let (id, write_access) = client1.create_log::<()>(None).await;

            // The two clients are never connected to each other directly.
            connect_nodes(&client1, "client1", &server, "server").await;
            connect_nodes(&client2, "client2", &server, "server").await;

            let client2_diffs = client2.diffs(id, "client2_listener".to_owned());

            client1
                .append_to_log(id, &write_access, litl::Val::number(1.0))
                .await;

            // Skip diffs that carry no entries and wait for the first one that does.
            let diff = timeout!(client2_diffs
                .filter(|diff| future::ready(!diff.expect_log().new_entries.is_empty()))
                .next())
            .await
            .expect("diff stream ended before yielding a non-empty diff");

            assert_eq!(diff.expect_log().after, 0);
            assert_eq!(diff.expect_log().new_entries, vec![litl::Val::number(1.0)]);

            match &*client2.load_object(id).await.borrow() {
                ObjectState::Log(log) => {
                    assert_eq!(log.entries, vec![litl::Val::number(1.0)]);
                }
                _ => panic!("Expected a log"),
            }
        })
        .await;
}

// TODO: unsure if this is testing correctly - are things really happening in
// the order they appear in the code? The select at the end makes me think
// otherwise.
// NOTE(review): no `select` is visible in this test body; the remark may refer
// to something inside `run_until` or to an earlier revision - confirm.

/// With `client1` and `client2` each connected only to `server`, an append
/// made on `client1` must reach `server` but must not make the log appear on
/// `client2`, which never asks for it (no `diffs` or `load_object` call).
///
/// The negative half is time-based: the test sleeps for `PROPAGATION_WINDOW`
/// and then checks that `client2.get_object` still returns `None`.
#[tokio::test]
async fn log_appends_dont_propagate_to_clients_that_dont_care_about_them() {
    let background = Mofo::new();
    let client1 = Node::new(background.clone(), MemoryStorageBackend::new());
    let server = Node::new(background.clone(), MemoryStorageBackend::new());
    let client2 = Node::new(background.clone(), MemoryStorageBackend::new());

    background
        .run_until(async {
            let (id, write_access) = client1.create_log::<()>(None).await;

            connect_nodes(&client1, "client1", &server, "server").await;
            connect_nodes(&client2, "client2", &server, "server").await;

            let server_diffs = server.diffs(id, "server_listener".to_owned());

            client1
                .append_to_log(id, &write_access, litl::Val::number(1.0))
                .await;

            // Positive control: the server itself does receive the append.
            let diff = timeout!(server_diffs
                .filter(|diff| future::ready(!diff.expect_log().new_entries.is_empty()))
                .next())
            .await
            .expect("diff stream ended before yielding a non-empty diff");

            assert_eq!(diff.expect_log().after, 0);
            assert_eq!(diff.expect_log().new_entries, vec![litl::Val::number(1.0)]);

            match &*server.get_object(&id).unwrap().borrow() {
                ObjectState::Log(log) => {
                    assert_eq!(log.entries, vec![litl::Val::number(1.0)]);
                }
                _ => panic!("Expected a log"),
            }

            // Leave time for any unwanted forwarding to happen before checking.
            tokio::time::sleep(PROPAGATION_WINDOW).await;

            assert!(client2.get_object(&id).is_none());
        })
        .await;
}