use crate::common::generate_events; use eventstore::{Client, ProjectionClient}; use serde::Deserialize; // This is the state of the projection, see tests/fixtures/projection.js. #[derive(Deserialize, Debug)] struct State { #[serde(rename = "foo")] _foo: Foo, } #[derive(Deserialize, Debug)] struct Foo { #[serde(rename = "baz")] _baz: Baz, } #[derive(Deserialize, Debug)] struct Baz { #[serde(rename = "count")] _count: f64, } static PROJECTION_FILE: &'static str = include_str!("../fixtures/projection.js"); static PROJECTION_UPDATED_FILE: &'static str = include_str!("../fixtures/projection-updated.js"); async fn wait_until_projection_status_cc( client: &ProjectionClient, name: &str, last_status: &mut String, status: &str, ) -> eyre::Result<()> { loop { let result = client.get_status(name, &Default::default()).await?; if let Some(stats) = result { if stats.status.contains(status) { break; } *last_status = stats.status.clone(); } tokio::time::sleep(std::time::Duration::from_millis(100)).await; } Ok(()) } async fn wait_until_projection_status_is( client: &ProjectionClient, name: &str, status: &str, ) -> eyre::Result<()> { let mut last_status = "".to_string(); let result = tokio::time::timeout( std::time::Duration::from_secs(FIVE_MINS_IN_SECS), wait_until_projection_status_cc(client, name, &mut last_status, status), ) .await; if result.is_err() { error!( "Projection {} doesn't reach the expected status. Got {}, Expected {}", name, last_status, status ); } result? } async fn wait_until_state_ready(client: &ProjectionClient, name: &str) -> eyre::Result where A: serde::de::DeserializeOwned + Send, { tokio::time::timeout( std::time::Duration::from_secs(FIVE_MINS_IN_SECS), async move { loop { let result: serde_json::Result = client.get_state(name, &Default::default()).await?; if let Ok(a) = result { return Ok::(a); } } }, ) .await? } async fn wait_until_result_ready(client: &ProjectionClient, name: &str) -> eyre::Result where A: serde::de::DeserializeOwned + Send, { tokio::time::timeout( std::time::Duration::from_secs(FIVE_MINS_IN_SECS), async move { loop { let result: serde_json::Result = client.get_result(name, &Default::default()).await?; if let Ok(a) = result { return Ok::(a); } } }, ) .await? } const FIVE_MINS_IN_SECS: u64 = 5 * 60; async fn create_projection( client: &ProjectionClient, gen_name: &mut names::Generator<'_>, ) -> eyre::Result<()> { let name = gen_name.next().unwrap(); client .create( name.as_str(), PROJECTION_FILE.to_string(), &Default::default(), ) .await?; wait_until_projection_status_is(client, name.as_str(), "Running").await } // TODO - A projection must be stopped to be able to delete it. But Stop projection gRPC call doesn't exist yet. async fn delete_projection( client: &ProjectionClient, gen_name: &mut names::Generator<'_>, ) -> eyre::Result<()> { let name = gen_name.next().unwrap(); client .create( name.as_str(), PROJECTION_FILE.to_string(), &Default::default(), ) .await?; wait_until_projection_status_is(client, name.as_str(), "Running").await?; debug!("delete_projection: create_projection succeeded: {}", name); client.abort(name.as_str(), &Default::default()).await?; wait_until_projection_status_is(client, name.as_str(), "Aborted").await?; debug!("delete_projection: reading newly-created projection statistic succeeded"); let cloned_name = name.clone(); // There is a race-condition in the projection manager: https://github.com/EventStore/EventStore/issues/2938 let result = tokio::time::timeout(std::time::Duration::from_secs(10), async move { loop { let result = client .delete(cloned_name.as_str(), &Default::default()) .await; if result.is_ok() { break; } warn!("projection deletion failed with: {:?}. Retrying...", result); let _ = tokio::time::sleep(std::time::Duration::from_millis(500)).await; } }) .await; if result.is_err() { warn!("projection deletion didn't complete under test timeout. Not a big deal considering https://github.com/EventStore/EventStore/issues/2938"); } Ok(()) } async fn update_projection( client: &ProjectionClient, gen_name: &mut names::Generator<'_>, ) -> eyre::Result<()> { let name = gen_name.next().unwrap(); client .create( name.as_str(), PROJECTION_FILE.to_string(), &Default::default(), ) .await?; wait_until_projection_status_is(client, name.as_str(), "Running").await?; client .update( name.as_str(), PROJECTION_UPDATED_FILE.to_string(), &Default::default(), ) .await?; let stats = client .get_status(name.as_str(), &Default::default()) .await?; assert!(stats.is_some()); let stats = stats.unwrap(); assert_eq!(stats.name, name); assert_eq!(stats.version, 1); Ok(()) } async fn enable_projection( client: &ProjectionClient, gen_name: &mut names::Generator<'_>, ) -> eyre::Result<()> { let name = gen_name.next().unwrap(); client .create( name.as_str(), PROJECTION_FILE.to_string(), &Default::default(), ) .await?; wait_until_projection_status_is(client, name.as_str(), "Running").await?; client.enable(name.as_str(), &Default::default()).await?; wait_until_projection_status_is(client, name.as_str(), "Running").await?; Ok(()) } async fn disable_projection( client: &ProjectionClient, gen_name: &mut names::Generator<'_>, ) -> eyre::Result<()> { let name = gen_name.next().unwrap(); client .create( name.as_str(), PROJECTION_FILE.to_string(), &Default::default(), ) .await?; wait_until_projection_status_is(client, name.as_str(), "Running").await?; client.enable(name.as_str(), &Default::default()).await?; wait_until_projection_status_is(client, name.as_str(), "Running").await?; client.disable(name.as_str(), &Default::default()).await?; wait_until_projection_status_is(client, name.as_str(), "Stopped").await?; Ok(()) } async fn reset_projection( client: &ProjectionClient, gen_name: &mut names::Generator<'_>, ) -> eyre::Result<()> { let name = gen_name.next().unwrap(); client .create( name.as_str(), PROJECTION_FILE.to_string(), &Default::default(), ) .await?; wait_until_projection_status_is(client, name.as_str(), "Running").await?; client.enable(name.as_str(), &Default::default()).await?; client.reset(name.as_str(), &Default::default()).await?; Ok(()) } async fn projection_state( stream_client: &Client, client: &ProjectionClient, gen_name: &mut names::Generator<'_>, ) -> eyre::Result<()> { let events = generate_events("testing", 10); let stream_name = gen_name.next().unwrap(); stream_client .append_to_stream(stream_name, &Default::default(), events) .await?; let name = gen_name.next().unwrap(); client .create( name.as_str(), PROJECTION_FILE.to_string(), &Default::default(), ) .await?; wait_until_projection_status_is(client, name.as_str(), "Running").await?; client.enable(name.as_str(), &Default::default()).await?; let state = wait_until_state_ready::(client, name.as_str()).await?; debug!("{:?}", state); Ok(()) } async fn projection_result( stream_client: &Client, client: &ProjectionClient, gen_name: &mut names::Generator<'_>, ) -> eyre::Result<()> { let events = generate_events("testing", 10); let stream_name = gen_name.next().unwrap(); stream_client .append_to_stream(stream_name, &Default::default(), events) .await?; let name = gen_name.next().unwrap(); client .create( name.as_str(), PROJECTION_FILE.to_string(), &Default::default(), ) .await?; wait_until_projection_status_is(client, name.as_str(), "Running").await?; client.enable(name.as_str(), &Default::default()).await?; let result = wait_until_result_ready::(client, name.as_str()).await?; debug!("{:?}", result); Ok(()) } pub async fn tests(client: Client) -> eyre::Result<()> { let mut name_gen = names::Generator::default(); let stream_client = client.clone(); let client: ProjectionClient = client.into(); debug!("before create_projection..."); create_projection(&client, &mut name_gen).await?; debug!("passed"); debug!("before delete_projection..."); delete_projection(&client, &mut name_gen).await?; debug!("passed"); debug!("before update_projection..."); update_projection(&client, &mut name_gen).await?; debug!("passed"); debug!("before enable_projection..."); enable_projection(&client, &mut name_gen).await?; debug!("passed"); debug!("before disable_projection..."); disable_projection(&client, &mut name_gen).await?; debug!("passed"); debug!("before reset_projection..."); reset_projection(&client, &mut name_gen).await?; debug!("passed"); debug!("before projection_state..."); projection_state(&stream_client, &client, &mut name_gen).await?; debug!("passed"); debug!("before projection_result..."); projection_result(&stream_client, &client, &mut name_gen).await?; debug!("passed"); Ok(()) }