mod common; #[cfg(test)] mod tests { use std::sync::Arc; use derivative::Derivative; use postgres_types::Kind; use proptest::collection::vec; use proptest::prelude::*; use proptest::test_runner::{TestCaseResult, TestRunner}; use runtime::Builder; use testcontainers::{clients, Docker}; use testcontainers::images::postgres::Postgres; use tokio::runtime; use tokio::runtime::Runtime; use uuid::Uuid; use pooly::AppContext; use pooly::models::payloads::{QueryRequest, QueryResponse, QuerySuccessResponse, tx_bulk_query_response, TxBulkQueryParams, TxBulkQueryRequest, TxBulkQueryRequestBody, TxBulkQuerySuccessResponse, ValueWrapper}; use pooly::models::payloads::query_response::Payload; use pooly::models::payloads::value_wrapper::Value; use pooly::services::queries::QueryService; use pooly::services::updatable::UpdatableService; use crate::common; extern crate derivative; const MAX_VALUES_PER_ACTION: usize = 5; #[test] fn test_write_read_types() { pretty_env_logger::try_init().unwrap(); let namespace = Uuid::new_v4().to_string(); let app_context = common::build_and_initialize_services(&namespace); let docker = clients::Cli::default(); let container = docker .run(Postgres::default().with_env_vars(common::build_env_vars())); let pg_host = container.get_host_port(common::INTERNAL_PG_PORT).unwrap(); app_context.connection_config_service .create(common::build_config(pg_host)) .expect("Could not create config."); let query_service = app_context.query_service.clone(); let mut runner = TestRunner::default(); let runtime = Arc::new( Builder::new_current_thread() .enable_all() .build() .expect("Could not build runtime.") ); let result = runner.run( &values_test_action_strategy(query_service, runtime), |action| { action.test(); TestCaseResult::Ok(()) } ); common::cleanup(app_context, &namespace); assert_eq!(true, result.is_ok(), "Got result: {:?}", result); } fn values_test_action_strategy(query_service: Arc, runtime: Arc) -> impl Strategy { vec(value_strategy(), MAX_VALUES_PER_ACTION) .prop_map( move |values| TestValuesAction::new( query_service.clone(), values, runtime.clone())) } fn value_strategy() -> impl Strategy { prop_oneof![ any::().prop_map(Value::Bool), any::>().prop_map(Value::Bytes), any::().prop_map(Value::Double), any::().prop_map(Value::Float), any::().prop_map(Value::Int4), any::().prop_map(Value::Int8), any::().prop_map(Value::String), ] } #[derive(Derivative)] #[derivative(Debug)] struct TestValuesAction { #[derivative(Debug="ignore")] service: Arc, runtime: Arc, nullable_queries: TestValueQueries, non_nullable_queries: TestValueQueries, values: Vec, } impl TestValuesAction { fn new(service: Arc, values: Vec, runtime: Arc) -> Self { TestValuesAction { service, runtime, nullable_queries: TestValueQueries::new(&values, true), non_nullable_queries: TestValueQueries::new(&values, false), values } } fn test(&self) { self.do_test(&self.non_nullable_queries, self.values .iter() .map(|value| ValueWrapper { value: Some(value.clone()) } ) .collect()); self.do_test(&self.nullable_queries, self.values .iter() .enumerate() .map(|(idx, value)| { let value_maybe = { if idx % 2 == 0 { Some(value.clone()) } else { None } }; ValueWrapper { value: value_maybe} }) .collect()); } fn do_test(&self, queries: &TestValueQueries, params: Vec) { self.do_test_single_queries(queries, ¶ms); self.do_test_tx_bulk_queries(queries, ¶ms); } fn do_test_tx_bulk_queries(&self, queries: &TestValueQueries, params: &Vec) { self.execute_single_query(&queries.create_table, Vec::new()); let insert_responses = self.execute_tx_bulk_query(vec![ (queries.insert_query.clone(), vec![ params.clone(), params.clone(), params.clone() ]), (queries.insert_query.clone(), vec![ params.clone(), params.clone() ]) ]).responses; assert_eq!(2, insert_responses.len()); assert_eq!(3, insert_responses[0].row_groups.len()); assert_eq!(2, insert_responses[1].row_groups.len()); let mut insert_rows = insert_responses[0].row_groups[0].rows.clone(); insert_rows.extend(insert_responses[0].row_groups[1].rows.clone()); insert_rows.extend(insert_responses[0].row_groups[2].rows.clone()); insert_rows.extend(insert_responses[1].row_groups[0].rows.clone()); insert_rows.extend(insert_responses[1].row_groups[1].rows.clone()); for row in &insert_rows { assert_eq!(params, &row.values); } let select_response = self.execute_single_query(&queries.select_query, Vec::new()); let select_rows = select_response.rows; assert_eq!(insert_rows, select_rows); self.execute_single_query(&queries.drop_table, Vec::new()); } fn do_test_single_queries(&self, queries: &TestValueQueries, params: &Vec) { self.execute_single_query(&queries.create_table, Vec::new()); self.execute_single_query(&queries.insert_query, params.clone()); let success_response = self.execute_single_query(&queries.select_query, Vec::new()); assert_eq!(&success_response.column_names, &queries.column_names); assert_eq!(success_response.rows.len(), 1); for row in success_response.rows { assert_eq!(params, &row.values); } self.execute_single_query(&queries.drop_table, Vec::new()); } fn execute_tx_bulk_query(&self, queries: Vec<(String, Vec>)>) -> TxBulkQuerySuccessResponse { let payload = self.runtime.block_on( self.service.bulk_tx( common::CLIENT_ID, &TxBulkQueryRequest { connection_id: common::CONNECTION_ID.to_string(), queries: queries .clone() .into_iter() .map(|(query, params)| TxBulkQueryRequestBody { query: query.clone(), params: params .into_iter() .map(|values| TxBulkQueryParams { values }) .collect() }) .collect() }, "tx_correlation_id_1" ) ).0.payload; match payload { Some(tx_bulk_query_response::Payload::Success(response)) => response, _ => panic!("Expected success query response, failed to execute: {:?}, got: {:?}", queries, payload) } } fn execute_single_query(&self, query: &str, params: Vec) -> QuerySuccessResponse { let payload = self.runtime.block_on( self.service.query(common::CLIENT_ID, &QueryRequest { connection_id: common::CONNECTION_ID.to_string(), query: query.into(), params }, query)).0.payload; match payload { Some(Payload::Success(response)) => response, _ => panic!("Expected success query response, failed to execute: {}, got: {:?}", query, payload) } } } #[derive(Debug)] struct TestValueQueries { column_names: Vec, create_table: String, drop_table: String, select_query: String, insert_query: String } impl TestValueQueries { fn new(values: &Vec, nullable: bool) -> Self { let table_name = "table_".to_string() + Uuid::new_v4().to_string().replace("-", "_").as_str(); let (columns_declaration, col_names) = Self::build_columns_declaration(&values, nullable); let create_table = format!( "CREATE TABLE {table_name} ({columns_declaration});", table_name=table_name, columns_declaration=columns_declaration ); let drop_table = format!("DROP TABLE {table_name};", table_name=table_name); let col_names_declaration = col_names.join(","); let select_query = format!("SELECT {col_names_declaration} FROM {table_name};", col_names_declaration=col_names_declaration, table_name=table_name); let insert_query = format!( "INSERT INTO {table_name}({col_names_declaration}) \ VALUES ({values_declaration}) \ RETURNING *;", table_name=table_name, col_names_declaration=col_names_declaration, values_declaration= (1..col_names.len() + 1) .map(|i| format!("${}", i)) .collect::>() .join(",")); TestValueQueries { column_names: col_names, create_table, drop_table, select_query, insert_query } } fn build_columns_declaration(values: &Vec, nullable: bool) -> (String, Vec) { let mut declaration = String::new(); let mut column_names = Vec::new(); for (idx, value) in values.iter().enumerate() { if !declaration.is_empty() { declaration += ", "; } match value { Value::Bool(_) => Self::col(idx, &mut declaration, &mut column_names, "boolean", nullable), Value::Bytes(_) => Self::col(idx, &mut declaration, &mut column_names, "bytea", nullable), Value::Double(_) => Self::col(idx, &mut declaration, &mut column_names, "double precision", nullable), Value::Float(_) => Self::col(idx, &mut declaration, &mut column_names, "real", nullable), Value::Int4(_) => Self::col(idx, &mut declaration, &mut column_names, "integer", nullable), Value::Int8(_) => Self::col(idx, &mut declaration, &mut column_names, "bigint", nullable), Value::String(_) => Self::col(idx, &mut declaration, &mut column_names, "varchar", nullable), Value::Json(_) => Self::col(idx, &mut declaration, &mut column_names, "jsonb", nullable), } } (declaration, column_names) } fn col(idx: usize, declaration: &mut String, col_names: &mut Vec, col_type: &str, nullable: bool) { let null_declaration = { if nullable { "null" } else { "not null" } }; let col_name = format!("{}_{}_col", col_type.replace(" ", "_"), idx); declaration.push_str( format!("{} {} {}", &col_name, col_type, null_declaration).as_str()); col_names.push(col_name); } } }