use std::sync::Arc; use eventually_core::store::{EventStore, Expected, Persisted}; use eventually_core::subscription::{EventSubscriber as EventSubscriberTrait, Subscription}; use eventually_postgres::{EventStoreBuilder, EventSubscriber, PersistentBuilder}; use futures::stream::{StreamExt, TryStreamExt}; use serde::{Deserialize, Serialize}; use testcontainers::core::Docker; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] enum Event { A, B, C, } #[tokio::test] async fn subscribe_all_works() { let docker = testcontainers::clients::Cli::default(); let postgres_image = testcontainers::images::postgres::Postgres::default(); let node = docker.run(postgres_image); let dsn = format!( "postgres://postgres:postgres@localhost:{}/postgres", node.get_host_port(5432).unwrap() ); let (mut client, connection) = tokio_postgres::connect(&dsn, tokio_postgres::NoTls) .await .expect("could not connect to the docker container"); tokio::spawn(async move { connection .await .expect("connection with the database exited with error") }); let source_name = "subscriber_test"; let source_id = "subscriber_test"; let event_store_builder = EventStoreBuilder::migrate_database(&mut client) .await .expect("failed to run database migrations") .builder(Arc::new(client)); let mut event_store = event_store_builder .build::(source_name) .await .expect("failed to create event store"); let event_subscriber = EventSubscriber::::new(&dsn, source_name) .await .expect("failed to create event subscription"); let subscription = event_subscriber .subscribe_all() .await .expect("failed to create subscription from event subscriber"); event_store .append( source_id.to_owned(), Expected::Exact(0), vec![Event::A, Event::B, Event::C], ) .await .expect("failed while appending events"); let events: Vec> = subscription .take(3) .try_collect() .await .expect("failed to collect events from subscription"); assert_eq!( vec![ Persisted::from(source_id.to_owned(), Event::A) .version(1) .sequence_number(0), Persisted::from(source_id.to_owned(), Event::B) .version(2) .sequence_number(1), Persisted::from(source_id.to_owned(), Event::C) .version(3) .sequence_number(2) ], events ); } #[tokio::test] async fn persistent_subscription_works() { let docker = testcontainers::clients::Cli::default(); let postgres_image = testcontainers::images::postgres::Postgres::default(); let node = docker.run(postgres_image); let dsn = format!( "postgres://postgres:postgres@localhost:{}/postgres", node.get_host_port(5432).unwrap() ); let (mut client, connection) = tokio_postgres::connect(&dsn, tokio_postgres::NoTls) .await .expect("could not connect to the docker container"); tokio::spawn(async move { connection .await .expect("connection with the database exited with error") }); let source_name = "persistent_subscription_test"; let source_id = "persistent_subscription_test"; let event_store_builder = EventStoreBuilder::migrate_database(&mut client) .await .expect("failed to run database migrations") .builder(Arc::new(client)); let mut event_store = event_store_builder .build::(source_name) .await .expect("failed to create event store"); let event_subscriber = EventSubscriber::::new(&dsn, source_name) .await .expect("failed to create event subscription"); let (client, connection) = tokio_postgres::connect(&dsn, tokio_postgres::NoTls) .await .expect("could not connect to the docker container"); tokio::spawn(async move { connection .await .expect("connection with the database exited with error") }); let subscription = PersistentBuilder::new(Arc::new(client), event_store.clone(), event_subscriber) .get_or_create(source_name.to_owned()) .await .expect("failed to create persistent subscription"); event_store .append( source_id.to_owned(), Expected::Exact(0), vec![Event::A, Event::B, Event::C], ) .await .expect("failed while appending events"); let events: Vec> = subscription .resume() .await .expect("failed to resume subscription") .take(3) .try_collect() .await .expect("failed to collect events from subscription"); assert_eq!( vec![ Persisted::from(source_id.to_owned(), Event::A) .version(1) .sequence_number(0), Persisted::from(source_id.to_owned(), Event::B) .version(2) .sequence_number(1), Persisted::from(source_id.to_owned(), Event::C) .version(3) .sequence_number(2) ], events ); }