use std::{collections::HashSet, time::Duration}; use futures::{Future, FutureExt, StreamExt}; use mofo::Mofo; use serde_json::json; use endr::{Diff, Node, ObjectState, SetItem, MemoryStorageBackend}; fn timeout>(f: F) -> impl Future { tokio::time::timeout(Duration::from_millis(100), f).map(|r| r.expect("Timed out")) } #[tokio::test] async fn can_create_append_and_read_a_log() { let node = Node::new(Mofo::new(), MemoryStorageBackend::new()); let (id, write_access) = node.create_log::<()>(None).await; node.append_to_log(id, &write_access, litl::Val::number(1.0)).await; match &*timeout(node.load_object(id)).await.borrow() { ObjectState::Log(log) => { assert_eq!(log.entries, vec![litl::Val::number(1.0)]); } _ => panic!("Expected a log"), } } #[tokio::test] async fn can_create_insert_into_and_read_a_set() { let node = Node::new(Mofo::new(), MemoryStorageBackend::new()); let (id, write_access) = node.create_set::<()>(None).await; let data1 = json!([1, 2, 3]); let data2 = json!([4, 5, 6]); let first_item_id = node .insert_into_set(id, &write_access, SetItem::new(data1.clone(), None)) .await .unwrap(); let _ = node .insert_into_set( id, &write_access, SetItem::new(data2.clone(), Some(first_item_id)), ) .await .unwrap(); match &*timeout(node.load_object(id)).await.borrow() { ObjectState::Set(set) => { assert!(set.header.is_some()); assert_eq!( set.items .values_ordered().iter() .map(|item| item.data.clone()) .collect::>(), [data1, data2] .iter() .map(|d| litl::to_val(d).unwrap()) .collect() ); } _ => panic!("Expected a set"), } } #[tokio::test] async fn can_create_a_blob() { let node = Node::new(Mofo::new(), MemoryStorageBackend::new()); let data = json!([1, 2, 3]); let id = node.create_blob(data.clone()).await; match &*timeout(node.load_object(id)).await.borrow() { ObjectState::Blob(blob) => { assert_eq!(blob.data.as_ref().unwrap(), &litl::to_val(&data).unwrap()); } _ => panic!("Expected a blob"), } } #[tokio::test] async fn listeners_get_notified_on_local_log_appends() { let node = Node::new(Mofo::new(), MemoryStorageBackend::new()); let (id, write_access) = node.create_log::<()>(None).await; let mut diffs = node.diffs(id, "test".to_owned()); match timeout(diffs.next()).await.unwrap() { Diff::Log(diff) => { assert!(diff.header.is_some()); assert_eq!(diff.new_entries, Vec::::new()); } _ => panic!("Expected a log diff"), } node.append_to_log(id, &write_access, litl::Val::number(1.0)).await; match timeout(diffs.next()).await.unwrap() { Diff::Log(diff) => { assert_eq!(diff.new_entries, vec![litl::Val::number(1.0)]); } _ => panic!("Expected a log diff"), } } #[tokio::test] async fn listeners_get_notified_with_initial_state() { let node = Node::new(Mofo::new(), MemoryStorageBackend::new()); let (id, write_access) = node.create_log::<()>(None).await; node.append_to_log(id, &write_access, litl::Val::number(1.0)).await; node.append_to_log(id, &write_access, litl::Val::number(2.0)).await; let mut diffs = node.diffs(id, "test".to_owned()); match timeout(diffs.next()).await.unwrap() { Diff::Log(diff) => { assert!(diff.header.is_some()); assert_eq!(diff.new_entries, vec![litl::Val::number(1.0), litl::Val::number(2.0)]); } _ => panic!("Expected a log diff"), } node.append_to_log(id, &write_access, litl::Val::number(3.0)).await; match timeout(diffs.next()).await.unwrap() { Diff::Log(diff) => { assert_eq!(diff.new_entries, vec![litl::Val::number(3.0)]); } _ => panic!("Expected a log diff"), } }