use bigdecimal::BigDecimal; use std::collections::HashMap; use std::ops::Add; use serial_test::serial; use time::{Duration, OffsetDateTime}; use common::*; use google_cloud_spanner::key::Key; use google_cloud_spanner::row::Row; use google_cloud_spanner::statement::Statement; use google_cloud_spanner::transaction_ro::ReadOnlyTransaction; mod common; #[ctor::ctor] fn init() { let filter = tracing_subscriber::filter::EnvFilter::from_default_env() .add_directive("google_cloud_spanner=trace".parse().unwrap()); let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init(); std::env::set_var("SPANNER_EMULATOR_HOST", "localhost:9010"); } async fn assert_read(tx: &mut ReadOnlyTransaction, user_id: &str, now: &OffsetDateTime, cts: &OffsetDateTime) { let reader = match tx.read("User", &user_columns(), Key::new(&user_id)).await { Ok(tx) => tx, Err(status) => panic!("read error {status:?}"), }; let mut rows = all_rows(reader).await.unwrap(); assert_eq!(1, rows.len(), "row must exists"); let row = rows.pop().unwrap(); assert_user_row(&row, user_id, now, cts); } async fn assert_query(tx: &mut ReadOnlyTransaction, user_id: &str, now: &OffsetDateTime, cts: &OffsetDateTime) { let mut stmt = Statement::new("SELECT * FROM User WHERE UserId = @UserID"); stmt.add_param("UserId", &user_id); let mut rows = execute_query(tx, stmt).await; assert_eq!(1, rows.len(), "row must exists"); let row = rows.pop().unwrap(); assert_user_row(&row, user_id, now, cts); } async fn execute_query(tx: &mut ReadOnlyTransaction, stmt: Statement) -> Vec { let reader = match tx.query(stmt).await { Ok(tx) => tx, Err(status) => panic!("query error {status:?}"), }; all_rows(reader).await.unwrap() } #[tokio::test] #[serial] async fn test_query_and_read() { //set up test data let now = OffsetDateTime::now_utc(); let data_client = create_data_client().await; let user_id_1 = "user_1"; let user_id_2 = "user_2"; let user_id_3 = "user_3"; let cr = data_client .apply(vec![ create_user_mutation(user_id_1, &now), create_user_mutation(user_id_2, &now), create_user_mutation(user_id_3, &now), ]) .await .unwrap(); //test let mut tx = data_client.read_only_transaction().await.unwrap(); let ts = cr.unwrap(); let ts = OffsetDateTime::from_unix_timestamp(ts.seconds) .unwrap() .replace_nanosecond(ts.nanos as u32) .unwrap(); assert_query(&mut tx, user_id_1, &now, &ts).await; assert_query(&mut tx, user_id_2, &now, &ts).await; assert_query(&mut tx, user_id_3, &now, &ts).await; assert_read(&mut tx, user_id_1, &now, &ts).await; assert_read(&mut tx, user_id_2, &now, &ts).await; assert_read(&mut tx, user_id_3, &now, &ts).await; } #[tokio::test] #[serial] async fn test_complex_query() { let now = OffsetDateTime::now_utc(); let data_client = create_data_client().await; let user_id_1 = "user_10"; let cr = data_client .apply(vec![ create_user_mutation(user_id_1, &now), create_user_item_mutation(user_id_1, 1), create_user_item_history_mutation(user_id_1, 1, &now), create_user_item_mutation(user_id_1, 2), create_user_item_history_mutation(user_id_1, 2, &now.add(Duration::seconds(-1))), create_user_item_history_mutation(user_id_1, 2, &now), create_user_character_mutation(user_id_1, 10), create_user_character_mutation(user_id_1, 20), ]) .await .unwrap(); let mut tx = data_client.read_only_transaction().await.unwrap(); let mut stmt = Statement::new( "SELECT *, ARRAY( SELECT AS STRUCT *, ARRAY(SELECT AS STRUCT * FROM UserItemHistory uih WHERE ui.UserId = uih.UserID AND ui.ItemId = uih.ItemId ORDER BY uih.UsedAt) as UserItemHistory FROM UserItem ui WHERE ui.UserId = p.UserId ORDER BY ui.ItemID ) as UserItem, ARRAY(SELECT AS STRUCT * FROM UserCharacter WHERE UserId = p.UserId ORDER BY CharacterID) as UserCharacter, FROM User p WHERE UserId = @UserId; ", ); stmt.add_param("UserId", &user_id_1); let mut rows = execute_query(&mut tx, stmt).await; assert_eq!(1, rows.len()); let row = rows.pop().unwrap(); // check UserTable let ts = cr.unwrap(); let ts = OffsetDateTime::from_unix_timestamp(ts.seconds) .unwrap() .replace_nanosecond(ts.nanos as u32) .unwrap(); assert_user_row(&row, user_id_1, &now, &ts); let mut user_items = row.column_by_name::>("UserItem").unwrap(); let first_item = user_items.pop().unwrap(); assert_eq!(first_item.user_id, user_id_1); assert_eq!(first_item.item_id, 2); assert_eq!(first_item.quantity, 100); assert_eq!(first_item.user_item_history.len(), 2); assert_ne!(OffsetDateTime::from(first_item.updated_at).to_string(), now.to_string()); assert_eq!(first_item.user_item_history[0].user_id, user_id_1); assert_eq!(first_item.user_item_history[0].item_id, first_item.item_id); assert_ne!( &(first_item.user_item_history[0].used_at).to_string(), &(first_item.user_item_history[1].used_at).to_string() ); assert_eq!(first_item.user_item_history[1].user_id, user_id_1); assert_eq!(first_item.user_item_history[1].item_id, first_item.item_id); assert_eq!(&(first_item.user_item_history[1].used_at).to_string(), &now.to_string()); let second_item = user_items.pop().unwrap(); assert_eq!(second_item.user_id, user_id_1); assert_eq!(second_item.item_id, 1); assert_eq!(second_item.quantity, 100); assert_eq!(second_item.user_item_history.len(), 1); assert_eq!(second_item.user_item_history[0].user_id, user_id_1); assert_eq!(second_item.user_item_history[0].item_id, second_item.item_id); assert_eq!(&(second_item.user_item_history[0].used_at).to_string(), &now.to_string()); assert_ne!(OffsetDateTime::from(second_item.updated_at).to_string(), now.to_string()); assert!(user_items.is_empty()); let mut user_characters = row.column_by_name::>("UserCharacter").unwrap(); let first_character = user_characters.pop().unwrap(); assert_eq!(first_character.user_id, user_id_1); assert_eq!(first_character.character_id, 20); assert_eq!(first_character.level, 1); assert_ne!(OffsetDateTime::from(first_character.updated_at).to_string(), now.to_string()); let second_character = user_characters.pop().unwrap(); assert_eq!(second_character.user_id, user_id_1); assert_eq!(second_character.character_id, 10); assert_eq!(second_character.level, 1); assert_ne!(OffsetDateTime::from(second_character.updated_at).to_string(), now.to_string()); assert!(user_characters.is_empty()); } #[tokio::test] #[serial] async fn test_batch_partition_query_and_read() { // set up test data let now = OffsetDateTime::now_utc(); let data_client = create_data_client().await; let user_id_1 = "user_1"; let user_id_2 = "user_2"; let user_id_3 = "user_3"; let cr = data_client .apply(vec![ create_user_mutation(user_id_1, &now), create_user_mutation(user_id_2, &now), create_user_mutation(user_id_3, &now), ]) .await .unwrap(); let many = (0..20000) .map(|x| create_user_mutation(&format!("user_partitionx_{x}"), &now)) .collect(); let cr2 = data_client.apply(many).await.unwrap(); // test let mut tx = data_client.batch_read_only_transaction().await.unwrap(); let ts = cr.unwrap(); let ts = OffsetDateTime::from_unix_timestamp(ts.seconds) .unwrap() .replace_nanosecond(ts.nanos as u32) .unwrap(); assert_partitioned_query(&mut tx, user_id_1, &now, &ts).await; assert_partitioned_query(&mut tx, user_id_2, &now, &ts).await; assert_partitioned_query(&mut tx, user_id_3, &now, &ts).await; assert_partitioned_read(&mut tx, user_id_1, &now, &ts).await; assert_partitioned_read(&mut tx, user_id_2, &now, &ts).await; assert_partitioned_read(&mut tx, user_id_3, &now, &ts).await; let stmt = Statement::new("SELECT * FROM User p WHERE p.UserId LIKE 'user_partitionx_%'"); let mut rows = execute_partitioned_query(&mut tx, stmt).await; assert_eq!(20000, rows.len()); let mut map = HashMap::::new(); while let Some(row) = rows.pop() { let user_id = row.column_by_name("UserId").unwrap(); map.insert(user_id, row); } let ts = cr2.unwrap(); let ts = OffsetDateTime::from_unix_timestamp(ts.seconds) .unwrap() .replace_nanosecond(ts.nanos as u32) .unwrap(); (0..20000).for_each(|x| { let user_id = format!("user_partitionx_{x}"); assert_user_row(map.get(&user_id).unwrap(), &user_id, &now, &ts) }); } async fn test_query(count: usize, prefix: &str) { let now = OffsetDateTime::now_utc(); let mutations = (0..count) .map(|x| create_user_mutation(&format!("user_{prefix}_{x}"), &now)) .collect(); let data_client = create_data_client().await; let cr = data_client.apply(mutations).await.unwrap(); let mut tx = data_client.read_only_transaction().await.unwrap(); let stmt = Statement::new(format!("SELECT *, Array[UserId,UserId,UserId,UserId,UserId] as UserIds, Array[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20] as NumArray FROM User p WHERE p.UserId LIKE 'user_{prefix}_%' ORDER BY UserId ")); let rows = execute_query(&mut tx, stmt).await; assert_eq!(count, rows.len()); let ts = cr.unwrap(); let ts = OffsetDateTime::from_unix_timestamp(ts.seconds) .unwrap() .replace_nanosecond(ts.nanos as u32) .unwrap(); let mut user_ids: Vec = (0..count).map(|x| format!("user_{prefix}_{x}")).collect(); user_ids.sort(); for (x, user_id) in user_ids.iter().enumerate() { let row = rows.get(x).unwrap(); assert_user_row(row, user_id, &now, &ts); let user_ids = row.column_by_name::>("UserIds").unwrap(); user_ids.iter().for_each(|u| assert_eq!(u, user_id)); let nums = row.column_by_name::>("NumArray").unwrap(); let mut start = 0_i64; assert_eq!(20, nums.len()); nums.iter().for_each(|u| { start += 1; assert_eq!(*u, start) }); } } #[tokio::test] #[serial] async fn test_few_records_value() { test_query(10, "few").await; } #[tokio::test] #[serial] async fn test_many_records_value() { test_query(40000, "many").await; } #[tokio::test] #[serial] async fn test_many_records_struct() { //set up test data let now = OffsetDateTime::now_utc(); let data_client = create_data_client().await; let user_id = "user_x_6"; let mutations = vec![create_user_mutation(user_id, &now)]; let _ = data_client.apply(mutations).await.unwrap(); let item_mutations = (0..4500).map(|x| create_user_item_mutation(user_id, x)).collect(); let _ = data_client.apply(item_mutations).await.unwrap(); let characters_mutations = (0..4500).map(|x| create_user_character_mutation(user_id, x)).collect(); let _ = data_client.apply(characters_mutations).await.unwrap(); //test let mut tx = data_client.read_only_transaction().await.unwrap(); let mut stmt = Statement::new( "SELECT *, ARRAY(SELECT AS STRUCT * FROM UserItem WHERE UserId = p.UserId) as UserItem, ARRAY(SELECT AS STRUCT * FROM UserCharacter WHERE UserId = p.UserId) as UserCharacter FROM User p WHERE UserId = @UserId;", ); stmt.add_param("UserId", &user_id); let mut rows = execute_query(&mut tx, stmt).await; assert_eq!(1, rows.len()); let row = rows.pop().unwrap(); let items = row.column_by_name::>("UserItem").unwrap(); assert_eq!(4500, items.len()); let characters = row.column_by_name::>("UserCharacter").unwrap(); assert_eq!(4500, characters.len()); } #[tokio::test] #[serial] async fn test_read_row() { //set up test data let now = OffsetDateTime::now_utc(); let user_id = "user_x_x"; let mutations = vec![create_user_mutation(user_id, &now)]; let data_client = create_data_client().await; let _ = data_client.apply(mutations).await.unwrap(); //test let mut tx = data_client.read_only_transaction().await.unwrap(); let row = tx.read_row("User", &["UserId"], Key::new(&user_id)).await.unwrap(); assert!(row.is_some()) } #[tokio::test] #[serial] async fn test_read_multi_row() { //set up test data let now = OffsetDateTime::now_utc(); let user_id = format!("user_x_{}", &now.second()); let user_id2 = format!("user_x_{}", &now.second() + 1); let mutations = vec![ create_user_mutation(&user_id, &now), create_user_mutation(&user_id2, &now), ]; let data_client = create_data_client().await; let _ = data_client.apply(mutations).await.unwrap(); // test let mut tx = data_client.read_only_transaction().await.unwrap(); let row = tx .read("User", &["UserId"], vec![Key::new(&user_id), Key::new(&user_id2)]) .await .unwrap(); assert_eq!(2, all_rows(row).await.unwrap().len()); } #[tokio::test] #[serial] async fn test_big_decimal() { let client = create_data_client().await; let mut tx = client.read_only_transaction().await.unwrap(); let stmt = Statement::new( "SELECT cast(\"-99999999999999999999999999999.999999999\" as numeric), cast(\"-99999999999999999999999999999\" as numeric), cast(\"-0.999999999\" as numeric), cast(\"0\" as numeric), cast(\"0.999999999\" as numeric), cast(\"99999999999999999999999999999\" as numeric), cast(\"99999999999999999999999999999.999999999\" as numeric)", ); let mut iter = tx.query(stmt).await.unwrap(); let row = iter.next().await.unwrap().unwrap(); assert_eq!( "-99999999999999999999999999999.999999999", row.column::(0).unwrap().to_string() ); assert_eq!( "-99999999999999999999999999999", row.column::(1).unwrap().to_string() ); assert_eq!("-0.999999999", row.column::(2).unwrap().to_string()); assert_eq!("0", row.column::(3).unwrap().to_string()); assert_eq!("0.999999999", row.column::(4).unwrap().to_string()); assert_eq!( "99999999999999999999999999999", row.column::(5).unwrap().to_string() ); assert_eq!( "99999999999999999999999999999.999999999", row.column::(6).unwrap().to_string() ); }