#![allow(dead_code)]
// Shared fixtures for the integration tests: a `TestData` bundle plus two
// bootstrap functions (one for the order aggregate, one for the user
// aggregate from the `user_cbor` module).
//
// NOTE(review): several generic argument lists look like they were lost from
// this text (`TestData` declares bounds on `A` and `S` without a parameter
// list, `Pool`/`Repository`/`AggregateRoot` have no arguments, `Consumer>` and
// `TestData>` have unbalanced brackets, `.build::(` has an empty turbofish).
// Restore them from the upstream file before compiling; they are left
// untouched here because they cannot be recovered with certainty.
use chrono::Utc;
use sqlx::postgres::PgPoolOptions;
use sqlx::{Pool, Postgres};
use timesource::aggregate::UncommittedEvent;
use timesource::consumer::{AggregateConsumerStore, Consumer, ConsumerBuilder, RootConsumerStore};
use timesource::repository::Repository;
use timesource::store::{EventStore, EventStoreBuilder, TimescaleStore};
use timesource::{Aggregate, AggregateRoot};
use tracing_subscriber::prelude::*;
use tracing_subscriber::EnvFilter;
use uuid::Uuid;
use crate::common::order::OrderItem;
use crate::common::user_cbor::UserCommand;
use super::order::{OrderAggregate, OrderCommand, OrderEvent};
use super::user_cbor::{UserAggregate, UserEvent};

/// Postgres connection string handed to the pool, the event store builder and
/// both consumer builders in the bootstrap functions below.
pub const DSN: &str = "postgres://postgres@localhost/timesource";
use gabble::Gab;
use rand::{thread_rng, Rng};

/// Everything a test needs after bootstrapping: identifiers, the database
/// pool, the repository, the aggregate root and two consumers.
///
/// `A` is the aggregate type and `S` the event store type, per the `where`
/// clause.
// NOTE(review): the `<A, S>` parameter list and the field type arguments are
// missing in this text — see the note at the top of the file.
pub struct TestData where A: Aggregate + 'static, S: EventStore, {
    /// Id of `root`, copied from `root.id()` at construction time.
    pub aggregate_id: Uuid,
    /// Copied from the store's `aggregate_type_id` field.
    pub aggregate_type_id: i32,
    /// Randomly generated name passed to the store and consumer builders.
    pub aggregate_type_name: String,
    /// Pool connected to [`DSN`].
    pub pool: Pool,
    /// Repository wrapping the event store built during bootstrap.
    pub repository: Repository,
    /// Aggregate root obtained from the aggregate's `root()` constructor.
    pub root: AggregateRoot,
    /// Randomly generated name given to `consumer`.
    pub consumer_name: String,
    /// Consumer built with `aggregate_build` for `aggregate_type_name`.
    pub consumer: Consumer>,
    /// Consumer built with `aggregate_root_build` for `aggregate_type_name`
    /// and the id of `root`.
    pub consumer_root: Consumer>,
    /// Randomly generated name given to `consumer_root`.
    pub consumer_root_name: String,
}

/// Builds a [`TestData`] around `OrderAggregate`.
///
/// When `with_tracing` is true, a tracing subscriber is installed first; its
/// filter is read from the default environment variable and falls back to
/// `"timesource=debug"`. A failure of `try_init` is ignored.
///
/// # Panics
///
/// Panics if the fallback filter is invalid, if the pool cannot connect, or if
/// the store or either consumer cannot be built.
// NOTE(review): the `'a` lifetime is not used anywhere in the visible
// signature; it may belong to the stripped return type arguments.
pub async fn bootstrap_test<'a>(
    with_tracing: bool,
) -> TestData> {
    if with_tracing {
        let filter_layer = EnvFilter::try_from_default_env()
            .or_else(|_| EnvFilter::try_new("timesource=debug"))
            .unwrap();
        // The init error is discarded on purpose — presumably because another
        // test may already have installed a global subscriber.
        tracing_subscriber::registry()
            .with(filter_layer)
            .with(tracing_subscriber::fmt::layer())
            .try_init()
            .unwrap_or_else(|_| {});
    }
    // Fresh random names on every call, presumably so that separate tests do
    // not share an aggregate type or consumer.
    let mut rng = thread_rng();
    let aggregate_type_name: Gab = rng.gen();
    let aggregate_type_name = aggregate_type_name.to_string();
    let consumer_name: Gab = rng.gen();
    let consumer_name = consumer_name.to_string();
    let consumer_root_name: Gab = rng.gen();
    let consumer_root_name = consumer_root_name.to_string();
    let pool = PgPoolOptions::new()
        .connect(DSN)
        .await
        .expect("being able to create Pg pool");
    let store = EventStoreBuilder::new(DSN)
        .build::(&aggregate_type_name)
        .await
        .expect("store to be created");
    let consumer =
        ConsumerBuilder::new(DSN)
        .aggregate_build(consumer_name.clone().into(), &aggregate_type_name)
        .await
        .unwrap();
    let root = OrderAggregate::root();
    let consumer_root = ConsumerBuilder::new(DSN)
        .aggregate_root_build(
            consumer_root_name.clone().into(),
            &aggregate_type_name,
            root.id(),
        )
        .await
        .unwrap();
    // Read the type id before `store` is moved into the repository.
    let aggregate_type_id = store.aggregate_type_id;
    let repository = Repository::new(store);
    TestData {
        aggregate_id: root.id(),
        aggregate_type_id,
        aggregate_type_name,
        repository,
        root,
        pool,
        consumer_name,
        consumer,
        consumer_root,
        consumer_root_name,
    }
}

/// Builds a [`TestData`] around `UserAggregate`.
///
/// The visible steps are the same as in `bootstrap_test`, except that the root
/// comes from `UserAggregate::root()`.
///
/// # Panics
///
/// Panics if the fallback filter is invalid, if the pool cannot connect, or if
/// the store or either consumer cannot be built.
// NOTE(review): as shown, the only visible difference from `bootstrap_test` is
// the aggregate used for `root`; the stripped type arguments (return type and
// `build::<…>`) presumably differ as well — confirm against upstream.
pub async fn bootstrap_test_cbor<'a>(
    with_tracing: bool,
) -> TestData> {
    if with_tracing {
        let filter_layer = EnvFilter::try_from_default_env()
            .or_else(|_| EnvFilter::try_new("timesource=debug"))
            .unwrap();
        // The init error is discarded on purpose — presumably because another
        // test may already have installed a global subscriber.
        tracing_subscriber::registry()
            .with(filter_layer)
            .with(tracing_subscriber::fmt::layer())
            .try_init()
            .unwrap_or_else(|_| {});
    }
    let mut rng = thread_rng();
    let aggregate_type_name: Gab = rng.gen();
    let aggregate_type_name = aggregate_type_name.to_string();
    let consumer_name: Gab = rng.gen();
    let consumer_name = consumer_name.to_string();
    let consumer_root_name: Gab = rng.gen();
    let consumer_root_name = consumer_root_name.to_string();
    let pool = PgPoolOptions::new()
        .connect(DSN)
        .await
        .expect("being able to create Pg pool");
    let store = EventStoreBuilder::new(DSN)
        .build::(&aggregate_type_name)
        .await
        .expect("store to be created");
    let consumer = ConsumerBuilder::new(DSN)
        .aggregate_build(consumer_name.clone().into(), &aggregate_type_name)
        .await
        .unwrap();
    let root = UserAggregate::root();
    let consumer_root = ConsumerBuilder::new(DSN)
        .aggregate_root_build(
            consumer_root_name.clone().into(),
            &aggregate_type_name,
            root.id(),
        )
        .await
        .unwrap();
    // Read the type id before `store` is moved into the repository.
    let aggregate_type_id = store.aggregate_type_id;
    let repository = Repository::new(store);
    TestData {
        aggregate_id: root.id(),
        aggregate_type_id,
        aggregate_type_name,
        repository,
        root,
        pool,
        consumer_name,
        consumer,
        consumer_root,
        consumer_root_name,
    }
}
#[allow(dead_code)] pub fn sample_commands() -> Vec { vec![ OrderCommand::Create, OrderCommand::AddItem { item: OrderItem { item_sku: "sku-123".into(), quantity: 1, price: 12, }, }, OrderCommand::Empty("for no reason".to_string()), ] } #[allow(dead_code)] pub fn sample_cbor_commands() -> Vec { vec![ UserCommand::Create, UserCommand::SetName { name: "John".to_string(), surname: None, }, ] } #[allow(dead_code)] pub fn sample_cbor_events() -> Vec { vec![ UserEvent::Created, UserEvent::NameChanged { name: "John".to_string(), surname: None, }, ] } pub fn sample_events() -> Vec { vec![ OrderEvent::Created, OrderEvent::ItemAdded { item: OrderItem { item_sku: "sku-123".into(), quantity: 1, price: 12, }, }, OrderEvent::Emptied("for no reason".into()), ] } #[allow(dead_code)] pub fn sample_events_uncommitted() -> Vec> { sample_events() .into_iter() .map(|data| UncommittedEvent { utc: Utc::now(), data, }) .collect::>() }