use std::sync::Arc; use eventually_core::store::{EventStore, Expected, Persisted, Select}; use eventually_postgres::EventStoreBuilder; use futures::stream::TryStreamExt; use serde::{Deserialize, Serialize}; use testcontainers::core::Docker; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] enum Event { A, B, C, } #[tokio::test] async fn stream_all_works() { let docker = testcontainers::clients::Cli::default(); let postgres_image = testcontainers::images::postgres::Postgres::default(); let node = docker.run(postgres_image); let dsn = format!( "postgres://postgres:postgres@localhost:{}/postgres", node.get_host_port(5432).unwrap() ); let (mut client, connection) = tokio_postgres::connect(&dsn, tokio_postgres::NoTls) .await .expect("could not connect to the docker container"); tokio::spawn(async move { connection .await .expect("connection with the database exited with error") }); let source_name = "stream_all_test"; let source_id_1 = "stream_all_test_1"; let source_id_2 = "stream_all_test_2"; let event_store_builder = EventStoreBuilder::migrate_database(&mut client) .await .expect("failed to run database migrations") .builder(Arc::new(client)); let mut event_store = event_store_builder .build::(source_name) .await .expect("failed to create event store"); event_store .append( source_id_1.to_owned(), Expected::Exact(0), vec![Event::A, Event::B, Event::C], ) .await .expect("failed while appending events"); event_store .append( source_id_2.to_owned(), Expected::Exact(0), vec![Event::C, Event::A, Event::B], ) .await .expect("failed while appending events"); // Select::All returns all the events. let events: Vec> = event_store .stream_all(Select::All) .await .expect("failed to create first stream") .try_collect() .await .expect("failed to collect events from subscription"); assert_eq!( vec![ Persisted::from(source_id_1.to_owned(), Event::A) .version(1) .sequence_number(0), Persisted::from(source_id_1.to_owned(), Event::B) .version(2) .sequence_number(1), Persisted::from(source_id_1.to_owned(), Event::C) .version(3) .sequence_number(2), Persisted::from(source_id_2.to_owned(), Event::C) .version(1) .sequence_number(3), Persisted::from(source_id_2.to_owned(), Event::A) .version(2) .sequence_number(4), Persisted::from(source_id_2.to_owned(), Event::B) .version(3) .sequence_number(5) ], events ); // Select::From returns a slice of the events by their sequence number, // in this case it will return only events coming from the second source. let events: Vec> = event_store .stream_all(Select::From(3)) .await .expect("failed to create second stream") .try_collect() .await .expect("failed to collect events from subscription"); assert_eq!( vec![ Persisted::from(source_id_2.to_owned(), Event::C) .version(1) .sequence_number(3), Persisted::from(source_id_2.to_owned(), Event::A) .version(2) .sequence_number(4), Persisted::from(source_id_2.to_owned(), Event::B) .version(3) .sequence_number(5) ], events ); } #[tokio::test] async fn stream_works() { let docker = testcontainers::clients::Cli::default(); let postgres_image = testcontainers::images::postgres::Postgres::default(); let node = docker.run(postgres_image); let dsn = format!( "postgres://postgres:postgres@localhost:{}/postgres", node.get_host_port(5432).unwrap() ); let (mut client, connection) = tokio_postgres::connect(&dsn, tokio_postgres::NoTls) .await .expect("could not connect to the docker container"); tokio::spawn(async move { connection .await .expect("connection with the database exited with error") }); let source_name = "stream_test"; let source_id = "stream_test"; let event_store_builder = EventStoreBuilder::migrate_database(&mut client) .await .expect("failed to run database migrations") .builder(Arc::new(client)); let mut event_store = event_store_builder .build::(source_name) .await .expect("failed to create event store"); event_store .append( source_id.to_owned(), Expected::Exact(0), vec![Event::A, Event::B, Event::C], ) .await .expect("failed while appending events"); // Select::All returns all the events. let events: Vec> = event_store .stream(source_id.to_owned(), Select::All) .await .expect("failed to create first stream") .try_collect() .await .expect("failed to collect events from subscription"); assert_eq!( vec![ Persisted::from(source_id.to_owned(), Event::A) .version(1) .sequence_number(0), Persisted::from(source_id.to_owned(), Event::B) .version(2) .sequence_number(1), Persisted::from(source_id.to_owned(), Event::C) .version(3) .sequence_number(2) ], events ); // Select::From returns a slice of the events by their version. let events: Vec> = event_store .stream(source_id.to_owned(), Select::From(3)) .await .expect("failed to create second stream") .try_collect() .await .expect("failed to collect events from subscription"); assert_eq!( vec![Persisted::from(source_id.to_owned(), Event::C) .version(3) .sequence_number(2)], events ); }