mod common; use chrono::Utc; use common::order::OrderCommand; use futures::TryStreamExt; use serde::{Deserialize, Serialize}; use serde_json::Value as Json; use sqlx::FromRow; use test_case::test_case; use timesource::aggregate::UncommittedEvent; use timesource::store::{CommitOrder, EventStore, EventStoreBuilder}; use timesource::{Aggregate, TimesourceEvent}; use uuid::Uuid; use crate::common::data::{ bootstrap_test, sample_commands, sample_events_uncommitted, TestData, DSN, }; use crate::common::order::{OrderAggregate, OrderEvent}; #[allow(dead_code)] #[derive(Debug, FromRow)] struct EventRow { aggregate_type_id: i32, name: String, payload: Option, time: i64, } #[allow(dead_code)] #[derive(Debug, FromRow)] struct EventRowName { aggregate_type_id: i32, id: i32, name: String, } #[tokio::test] async fn different_aggregate_types_can_share_id() { let TestData { aggregate_type_name, .. } = bootstrap_test(false).await; let shared_id = Uuid::new_v4(); #[derive(Debug, Clone, Serialize, Deserialize, TimesourceEvent)] enum Store2Event { Something, } let event_store_builder = EventStoreBuilder::new(DSN); let store = event_store_builder .build::(&aggregate_type_name) .await .expect("store to be created"); let store2 = event_store_builder .build::("orders2") .await .expect("store to be created"); let events1 = sample_events_uncommitted(); let events2 = vec![Store2Event::Something; events1.len()] .into_iter() .map(|data| UncommittedEvent { utc: Utc::now(), data, }) .collect::>(); store .commit(shared_id, CommitOrder::None, &events1) .await .expect("Failed appending events"); store2 .commit(shared_id, CommitOrder::None, &events2) .await .expect("Failed appending events"); let read_events = store .aggregate_stream(shared_id) .await .try_collect::>() .await .expect("failed to collect first stream of events from consumer"); let read_events2 = store2 .aggregate_stream(shared_id) .await .try_collect::>() .await .expect("failed to collect second stream of events from consumer"); for (i, event) in read_events.iter().enumerate() { assert_eq!( event.aggregate_id(), read_events2[i].aggregate_id(), "Expected event source ids to be the same between stores" ); } } #[tokio::test] #[should_panic(expected = "list of events can't be empty")] async fn return_err_given_no_events() { let TestData { aggregate_type_name, .. } = bootstrap_test(false).await; let store = EventStoreBuilder::new(DSN) .build::(&aggregate_type_name) .await .expect("store to be created"); store .commit(Uuid::new_v4(), CommitOrder::None, &[]) .await .unwrap(); } #[tokio::test] #[should_panic(expected = "AlreadyCreated")] async fn should_propagate_aggregate_err_when_handling_command() { let TestData { repository, mut root, .. } = bootstrap_test(false).await; // these are invalid events root.handle(OrderCommand::Create).unwrap(); root.handle(OrderCommand::Create).unwrap(); repository.commit_unorderly(&mut root).await.unwrap(); } // same as before, but when committed orderly, it shouldn't allow it #[tokio::test] #[should_panic(expected = "invalid aggregate time provided")] async fn return_err_for_ordered_events_which_are_not_first() { let TestData { aggregate_id, repository, root, .. } = bootstrap_test(false).await; let mut system_a_root = root; let mut system_b_root = OrderAggregate::root_with_id(aggregate_id); // given system A handles first commands first for command in sample_commands() { system_a_root .handle(command) .expect("Failed handling event for systemA"); } // given system B handles first commands later on for command in sample_commands() { system_b_root .handle(command) .expect("Failed handling event for systemA"); } let system_a_events = system_a_root.uncommitted_events(); let system_b_events = system_b_root.uncommitted_events(); for (index, event) in system_a_events.iter().enumerate() { assert!( event.utc < system_b_events[index].utc, "Something is not right with this test. Events in System A should be created before B" ); } // but then system B commit first repository.commit_orderly(&mut system_b_root).await.unwrap(); // and system A events arrive later so they should not be saved repository.commit_orderly(&mut system_a_root).await.unwrap(); } #[tokio::test] #[should_panic(expected = "invalid aggregate time provided")] async fn return_err_given_unordered_events_and_existing_offset() { let TestData { aggregate_id, repository, root, .. } = bootstrap_test(false).await; let mut system_a_root = root; let mut system_b_root = OrderAggregate::root_with_id(aggregate_id); // given system A handles commands first for command in sample_commands() { system_a_root .handle(command) .expect("Failed handling event for systemA"); } // given system B handles commands later on for command in sample_commands() { system_b_root .handle(command) .expect("Failed handling event for systemA"); } let system_a_events = system_a_root.uncommitted_events(); let system_b_events = system_b_root.uncommitted_events(); for (index, event) in system_a_events.iter().enumerate() { assert!( event.utc < system_b_events[index].utc, "Something is not right with this test. Events in System A should be created before B" ); } // but then system B commit first repository.commit_orderly(&mut system_b_root).await.unwrap(); // and system A events arrive later so they should not be saved repository.commit_orderly(&mut system_a_root).await.unwrap(); } #[test_case(true; "orderly")] #[test_case(false; "unorderly")] #[tokio::test] async fn should_return_committed_ids(orderly: bool) { let TestData { repository, mut root, .. } = bootstrap_test(true).await; let commands = sample_commands(); let expected_total = commands.len(); for command in commands { root.handle(command).unwrap(); } let output = if orderly { repository.commit_orderly(&mut root).await.unwrap() } else { repository.commit_unorderly(&mut root).await.unwrap() }; assert_eq!(output.len(), expected_total); }