mod common; use chrono::Utc; use futures::StreamExt; use timesource::consumer::store::ConsumerStore; use timesource::consumer::{ConsumerBuilder, RootConsumerStore}; use timesource::repository::Repository; use timesource::store::{CommitOrder, EventStore, EventStoreBuilder}; use timesource::Aggregate; use tokio::task::yield_now; use uuid::Uuid; use crate::common::data::{ bootstrap_test, bootstrap_test_cbor, sample_cbor_commands, sample_cbor_events, sample_commands, sample_events, sample_events_uncommitted, TestData, DSN, }; use crate::common::order::{OrderAggregate, OrderCommand, OrderEvent}; use crate::common::user_cbor::{UserCommand, UserEvent}; #[macro_use] extern crate test_case; /// /// AGGREGATE /// #[tokio::test] async fn consumer_should_fetch_prior_json_events_on_resume() { let TestData { aggregate_id, repository, mut root, consumer, .. } = bootstrap_test(false).await; // // Append event before resuming consumer // root.handle(OrderCommand::Create) .expect("Should be able to submit command"); repository .commit_orderly(&mut root) .await .expect("Should be able to commit root"); // // Check it is ready for consumption // let mut stream = consumer.resume().await.expect("should resume consumer"); let message = stream.next().await.unwrap().unwrap(); assert_eq!(message.aggregate_id(), aggregate_id); assert_eq!(message.into_data(), OrderEvent::Created); } #[tokio::test] async fn consumer_should_fetch_prior_cbor_events_on_resume() { let TestData { aggregate_id, repository, mut root, consumer, .. } = bootstrap_test_cbor(false).await; // // Append event before resuming consumer // root.handle(UserCommand::Create) .expect("Should be able to submit command"); repository .commit_orderly(&mut root) .await .expect("Should be able to commit root"); // // Check it is ready for consumption // let mut stream = consumer.resume().await.expect("should resume consumer"); let message = stream.next().await.unwrap().unwrap(); assert_eq!(message.aggregate_id(), aggregate_id); assert_eq!(message.into_data(), UserEvent::Created); } #[tokio::test] async fn consumer_should_stream_json_events_after_notification() { let TestData { aggregate_id, repository, mut root, consumer, .. } = bootstrap_test(false).await; // start stream before messages are published let mut stream = consumer.resume().await.expect("should resume consumer"); // // Append all kinds of events // Here we test JSON representation of different types // for command in sample_commands() { root.handle(command) .expect("Should be able to submit command"); } repository .commit_orderly(&mut root) .await .expect("Should be able to commit root"); // // Check message for first aggregate is ready for consumption // for event in sample_events() { let message = stream.next().await.unwrap().unwrap(); assert_eq!(message.aggregate_id(), aggregate_id); assert_eq!(message.into_data(), event); } } #[tokio::test] async fn consumer_should_stream_events_for_different_roots_after_notification() { let TestData { aggregate_id, repository, mut root, consumer, .. } = bootstrap_test(false).await; // start stream before messages are published let mut stream = consumer.resume().await.expect("should resume consumer"); // // Append event // root.handle(OrderCommand::Create) .expect("Should be able to submit command"); repository .commit_orderly(&mut root) .await .expect("Should be able to commit root"); // // Check message for first aggregate is ready for consumption // let message = stream.next().await.unwrap().unwrap(); assert_eq!(message.aggregate_id(), aggregate_id); assert_eq!(message.into_data(), OrderEvent::Created); // // Now publish events for a new root // let mut root2 = OrderAggregate::root(); root2 .handle(OrderCommand::Create) .expect("Should be able to submit command"); repository .commit_orderly(&mut root2) .await .expect("Should be able to commit root"); let message = stream.next().await.unwrap().unwrap(); assert_eq!(message.aggregate_id(), root2.id()); assert_eq!(message.into_data(), OrderEvent::Created); } #[tokio::test] async fn consumer_should_stream_cbor_events_after_notification() { let TestData { aggregate_id, repository, mut root, consumer, .. } = bootstrap_test_cbor(false).await; // start stream before messages are published let mut stream = consumer.resume().await.expect("should resume consumer"); // // Append all kinds of events // Here we test JSON representation of different types // for command in sample_cbor_commands() { root.handle(command) .expect("Should be able to submit command"); } repository .commit_orderly(&mut root) .await .expect("Should be able to commit root"); // // Check message for first aggregate is ready for consumption // for event in sample_cbor_events() { let message = stream.next().await.unwrap().unwrap(); assert_eq!(message.aggregate_id(), aggregate_id); assert_eq!(message.into_data(), event); } } #[tokio::test] async fn it_should_recover_if_postgres_notifications_buffer_gets_full() { let TestData { aggregate_type_name, consumer_name, .. } = bootstrap_test(false).await; let store = EventStoreBuilder::new(DSN) .build::(&aggregate_type_name) .await .expect("store to be created"); let events1 = sample_events_uncommitted(); let events2 = sample_events_uncommitted(); let events3 = sample_events_uncommitted(); // // Setup consumer with very limited capacity for buffers // let consumer = ConsumerBuilder::new(DSN) .with_notification_buffer_capacity(1) .with_event_buffer_capacity(events1.len() - 1) .aggregate_build::(consumer_name.into(), &aggregate_type_name) .await .unwrap(); // start stream before messages are published let stream = consumer.resume().await.expect("should resume consumer"); // // Append events and fill buffers // let aggregate_id = Uuid::new_v4(); // this notification should be consumed as the first one ok store .commit(aggregate_id, CommitOrder::None, &events1) .await .expect("Failed appending events"); // this notification should make notification buffer congested store .commit(aggregate_id, CommitOrder::None, &events2) .await .expect("Failed appending events"); // this notification should await until buffers' congestion eases store .commit(aggregate_id, CommitOrder::None, &events3) .await .expect("Failed appending events"); // buffer backpressure should be applied yield_now().await; let received = stream.take(9).map(|x| x.unwrap()).collect::>().await; assert_eq!(received.len(), 9, "It should receive all events"); } /// /// AGGREGATE ROOT /// #[tokio::test] async fn root_store_should_return_queries() { let TestData { repository, mut root, aggregate_type_name, consumer_root_name, aggregate_type_id, pool, .. } = bootstrap_test(false).await; // // Append events for root 1 // root.handle(OrderCommand::Create) .expect("Should be able to submit command"); repository .commit_orderly(&mut root) .await .expect("Should be able to commit root"); // // Append events for root 2 // let store = EventStoreBuilder::new(DSN) .build::(&aggregate_type_name) .await .expect("store to be created"); let repository = Repository::new(store); let mut root2 = OrderAggregate::root(); root2 .handle(OrderCommand::Create) .expect("Should be able to submit command"); repository .commit_orderly(&mut root2) .await .expect("Should be able to commit root"); let store = RootConsumerStore::::new( root2.id(), aggregate_type_id, consumer_root_name.into(), pool.clone(), ); let events_after_offset = store.events_after_offset().collect::>().await; assert_eq!( events_after_offset.len(), 1, "should only return events after offset 2" ); let event = events_after_offset[0].as_ref().unwrap(); let offset = event.id(); // // Append more events for root 1 // root.handle(OrderCommand::Cancel).unwrap(); repository.commit_orderly(&mut root).await.unwrap(); let events_after = store.events_after(offset).collect::>().await; assert_eq!( events_after.len(), 0, "only one event for root2 has been stored" ); // // Append more events for root 2 // root2.handle(OrderCommand::Cancel).unwrap(); repository.commit_orderly(&mut root2).await.unwrap(); let events_after = store.events_after(offset).collect::>().await; assert_eq!(events_after.len(), 1, "should return event after first one"); let events_range = store .events_range(offset, Utc::now().timestamp_nanos() as u64) .collect::>() .await; assert_eq!( events_range.len(), 1, "should return range after first one for root only" ); } #[tokio::test] async fn should_consume_prior_root_events_only() { let TestData { repository, mut root, aggregate_type_name, consumer_root_name, .. } = bootstrap_test(false).await; // // Append events for root 1 // root.handle(OrderCommand::Create) .expect("Should be able to submit command"); repository .commit_orderly(&mut root) .await .expect("Should be able to commit root"); // // Append events for root 2 // let store = EventStoreBuilder::new(DSN) .build::(&aggregate_type_name) .await .expect("store to be created"); let repository = Repository::new(store); let mut root2 = OrderAggregate::root(); root2 .handle(OrderCommand::Create) .expect("Should be able to submit command"); repository .commit_orderly(&mut root2) .await .expect("Should be able to commit root"); // // Should only consume root2 events // let consumer = ConsumerBuilder::new(DSN) .aggregate_root_build::( consumer_root_name.clone().into(), &aggregate_type_name, root2.id(), ) .await .unwrap(); let mut stream = consumer.resume().await.expect("should resume consumer"); let message = stream.next().await.unwrap().unwrap(); assert_eq!(message.aggregate_id(), root2.id()); } #[tokio::test] async fn should_get_notified_for_root_events_only() { let TestData { repository, mut root, aggregate_type_name, consumer_root_name, .. } = bootstrap_test(false).await; let mut root2 = OrderAggregate::root(); root2 .handle(OrderCommand::Create) .expect("Should be able to submit command"); let consumer = ConsumerBuilder::new(DSN) .aggregate_root_build::( consumer_root_name.clone().into(), &aggregate_type_name, root2.id(), ) .await .unwrap(); let mut stream = consumer.resume().await.expect("should resume consumer"); // // Append events for root 1 // root.handle(OrderCommand::Create) .expect("Should be able to submit command"); repository .commit_orderly(&mut root) .await .expect("Should be able to commit root"); // // Append events for root 2 // repository .commit_orderly(&mut root2) .await .expect("Should be able to commit root"); // // Should only consume root2 events // let first_message = stream.next().await.unwrap().unwrap(); assert_eq!(first_message.aggregate_id(), root2.id()); } /// /// CHECKPOINTS /// #[test_case(false ; "aggregate")] #[test_case(true ; "aggregate_root")] #[tokio::test] async fn handles_checkpoints_ok(root_case: bool) { let TestData { repository, mut root, consumer_root, consumer, consumer_name, consumer_root_name, pool, aggregate_type_name, .. } = bootstrap_test(false).await; let mut stream = if root_case { consumer_root .resume() .await .expect("should resume consumer") } else { consumer.resume().await.expect("should resume consumer") }; let consumer_name = if root_case { consumer_root_name } else { consumer_name }; // // Append event // root.handle(OrderCommand::Create) .expect("Should be able to submit command"); repository .commit_orderly(&mut root) .await .expect("Should be able to commit root"); let message = stream.next().await.unwrap().unwrap(); if root_case { consumer_root.ack(message.id()).await.unwrap(); } else { consumer.ack(message.id()).await.unwrap(); } let saved_offset = sqlx::query_file_scalar!("tests/queries/consumer_offset.sql", &consumer_name) .fetch_one(&pool) .await .unwrap(); assert_eq!(message.id(), saved_offset.unwrap() as u64); // // Simulate another consumer // let consumer_2 = ConsumerBuilder::new(DSN) .aggregate_build::(consumer_name.clone().into(), &aggregate_type_name) .await .unwrap(); let mut stream_2 = consumer_2.resume().await.expect("should resume consumer"); // // Append another event // root.handle(OrderCommand::Cancel) .expect("Should be able to submit command"); repository .commit_orderly(&mut root) .await .expect("Should be able to commit root"); let message = stream_2.next().await.unwrap().unwrap().into_data(); assert_eq!( message, OrderEvent::Cancelled, "should deliver next message after offset" ); } #[test_case(false ; "aggregate")] #[test_case(true ; "aggregate_root")] #[tokio::test] async fn it_should_save_last_checkpoint(root_case: bool) { let TestData { repository, mut root, consumer, consumer_root, consumer_name, consumer_root_name, pool, .. } = bootstrap_test(false).await; let stream = if root_case { consumer_root .resume() .await .expect("should resume consumer") } else { consumer.resume().await.expect("should resume consumer") }; let consumer_name = if root_case { consumer_root_name } else { consumer_name }; // // Append event // root.handle(OrderCommand::Create) .expect("Should be able to submit command"); root.handle(OrderCommand::Cancel) .expect("Should be able to submit command"); repository .commit_orderly(&mut root) .await .expect("Should be able to commit root"); let messages = stream.take(2).collect::>().await; let last_message = messages.last().unwrap().as_ref().unwrap(); if root_case { consumer_root.ack(last_message.id()).await.unwrap(); } else { consumer.ack(last_message.id()).await.unwrap(); } let saved_offset = sqlx::query_file_scalar!("tests/queries/consumer_offset.sql", &consumer_name) .fetch_one(&pool) .await .unwrap(); assert_eq!(last_message.id(), saved_offset.unwrap() as u64); } #[test_case(false ; "aggregate")] #[test_case(true ; "aggregate_root")] #[tokio::test] async fn it_should_handle_first_try_ack(root_case: bool) { let TestData { repository, mut root, consumer, consumer_root, consumer_name, consumer_root_name, pool, .. } = bootstrap_test(false).await; let mut stream = if root_case { consumer_root .resume() .await .expect("should resume consumer") } else { consumer.resume().await.expect("should resume consumer") }; let consumer_name = if root_case { consumer_root_name } else { consumer_name }; // // Append event // root.handle(OrderCommand::Create) .expect("Should be able to submit command"); repository .commit_orderly(&mut root) .await .expect("Should be able to commit root"); let message = stream.next().await.unwrap().unwrap(); if root_case { consumer_root.ack(message.id()).await.unwrap(); } else { consumer.ack(message.id()).await.unwrap(); } let saved_offset = sqlx::query_file_scalar!("tests/queries/consumer_offset.sql", &consumer_name) .fetch_one(&pool) .await .unwrap(); assert_eq!(message.id(), saved_offset.unwrap() as u64); } #[test_case(false ; "aggregate")] #[test_case(true ; "aggregate_root")] #[tokio::test] #[should_panic(expected = "checkpoint out of order")] async fn it_should_return_err_for_out_of_order_try_ack(root_case: bool) { let TestData { repository, mut root, consumer, consumer_root, .. } = bootstrap_test(false).await; let stream = if root_case { consumer_root .resume() .await .expect("should resume consumer") } else { consumer.resume().await.expect("should resume consumer") }; // // Append event // root.handle(OrderCommand::Create) .expect("Should be able to submit command"); root.handle(OrderCommand::Cancel) .expect("Should be able to submit command"); repository .commit_orderly(&mut root) .await .expect("Should be able to commit root"); let messages = stream.take(2).collect::>().await; let last_message = messages.last().unwrap().as_ref().unwrap(); consumer.try_ack(last_message.id()).await.unwrap(); } #[test_case(false ; "aggregate")] #[test_case(true ; "aggregate_root")] #[tokio::test] #[should_panic(expected = "checkpoint behind current offset")] async fn it_should_return_err_for_already_submitted_checkpoint(root_case: bool) { let TestData { repository, mut root, consumer, consumer_root, .. } = bootstrap_test(false).await; let mut stream = if root_case { consumer_root .resume() .await .expect("should resume consumer") } else { consumer.resume().await.expect("should resume consumer") }; // // Append event // root.handle(OrderCommand::Create) .expect("Should be able to submit command"); root.handle(OrderCommand::Cancel) .expect("Should be able to submit command"); repository .commit_orderly(&mut root) .await .expect("Should be able to commit root"); let first = stream.next().await.unwrap().unwrap(); let last = stream.next().await.unwrap().unwrap(); consumer.try_ack(first.id()).await.unwrap(); consumer.try_ack(last.id()).await.unwrap(); consumer.try_ack(first.id()).await.unwrap(); } #[test_case(false ; "aggregate")] #[test_case(true ; "aggregate_root")] #[tokio::test] #[should_panic( expected = "update or delete on table aggregate_consumer violates foreign key constraint" )] async fn it_should_return_err_for_non_existent_offset_when_trying_checkpoint(root_case: bool) { let TestData { repository, mut root, consumer, consumer_root, .. } = bootstrap_test(false).await; let mut stream = if root_case { consumer_root .resume() .await .expect("should resume consumer") } else { consumer.resume().await.expect("should resume consumer") }; root.handle(OrderCommand::Create) .expect("Should be able to submit command"); repository .commit_orderly(&mut root) .await .expect("Should be able to commit root"); let event = stream.next().await.unwrap().unwrap(); consumer.try_ack(event.id() + 1).await.unwrap(); }