use eventually_core::store::{EventStore, Expected, Select}; use eventually_core::subscription::EventSubscriber; use eventually_core::versioning::Versioned; use eventually_redis::{EventStore as RedisEventStore, EventStoreBuilder}; use futures::stream::TryStreamExt; use serde::{Deserialize, Serialize}; use testcontainers::core::Docker; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] enum Event { A, B, C, } #[tokio::test] async fn it_works() { let docker = testcontainers::clients::Cli::default(); let redis_image = testcontainers::images::redis::Redis::default(); let node = docker.run(redis_image); let dsn = format!("redis://127.0.0.1:{}/", node.get_host_port(6379).unwrap()); let client = redis::Client::open(dsn).expect("failed to connect to Redis"); let builder = EventStoreBuilder::new(client).stream_page_size(128); let builder_clone = builder.clone(); tokio::spawn(async move { builder_clone .build_subscriber::("test-stream") .subscribe_all() .await .unwrap() .inspect_err(|err| eprintln!("Failed: {}", err)) .try_for_each(|event| async { Ok(println!( "Seq.no: {}, Version: {}, Event: {:?}", event.sequence_number(), event.version(), event.take(), )) }) .await .unwrap(); }); let mut store: RedisEventStore = builder .build_store("test-stream") .await .expect("failed to create redis connection"); for chunk in 0..1000 { store .append( "test-source-1".to_owned(), Expected::Any, vec![Event::A, Event::B, Event::C], ) .await .unwrap(); store .append( "test-source-2".to_owned(), Expected::Exact(chunk * 2), vec![Event::B, Event::A], ) .await .unwrap(); } let now = std::time::SystemTime::now(); assert_eq!( 5000usize, store .stream_all(Select::All) .await .unwrap() .try_fold(0usize, |acc, _x| async move { Ok(acc + 1) }) .await .unwrap() ); println!("Stream $all took {:?}", now.elapsed().unwrap()); assert_eq!( 3000usize, store .stream("test-source-1".to_owned(), Select::All) .await .unwrap() .try_fold(0usize, |acc, _x| async move { Ok(acc + 1) }) .await .unwrap() ); assert_eq!( 2000usize, store .stream("test-source-2".to_owned(), Select::All) .await .unwrap() .try_fold(0usize, |acc, _x| async move { Ok(acc + 1) }) .await .unwrap() ); }