use aes_gcm::aead::Aead; use aes_gcm::{KeyInit, Nonce}; use base64::engine::general_purpose; use base64::Engine; use chrono::{TimeDelta, TimeZone, Utc}; use ciborium::from_reader; use log::error; use std::collections::HashMap; use std::fs::File; use std::io::{Cursor, Read, Write}; use std::str; use std::{env, fs}; use antimatter_api::apis::configuration::Configuration; use antimatter_api::apis::{authentication_api, contexts_api, general_api}; use antimatter_api::models::set_data_policy_binding::DefaultAttachment; use antimatter_api::models::{ self, FactExpression, FactExpressionArgumentsInner, NewFactTypeDefinition, NewFactTypeDefinitionArgumentsInner, }; use antimatter_api::models::{ AddReadContext, AddWriteContext, CreatePeerDomain, DataPolicyClause, DataPolicyRuleChanges, DataPolicyRuleEffect, DomainAuthenticate, NewDataPolicy, NewDataPolicyRule, NewDomain, ReadContextParameter, ReadContextRequiredHook, SetDataPolicyBinding, SetDataPolicyBindingReadContextsInner, TagExpression, WriteContextConfigInfo, WriteContextConfigInfoRequiredHooksInner, }; use antimatter::capsule::common::{CapsuleTag, CellReader, Column, RowReader, SpanTag, TagType}; use antimatter::session::RUNTIME; use antimatter::session::session::{ recover_capsule, DomainIdentityToken, EncapsulateConfig, Session, SessionConf, }; use antimatter::session::api_helper::domains; use p256::elliptic_curve::sec1::ToEncodedPoint; use p256::{FieldBytes, PublicKey, Scalar, SecretKey}; use rand::{Rng, RngCore}; use sha3::{Digest, Sha3_256}; use url::Url; const ANTIMATTER_TEST_ADDRESS: &str = "test@antimatter.io"; const API_TARGET_VERSION: &str = "v2"; struct ScopeCall { c: Option, } impl Drop for ScopeCall { fn drop(&mut self) { self.c.take().unwrap()() } } macro_rules! expr { ($e: expr) => { $e }; } // tt hack macro_rules! defer { ($($data: tt)*) => ( let _scope_call = ScopeCall { c: Some(|| -> () { expr!({ $($data)* }) }) }; ) } fn antimatter_api_url() -> String { env::var("ANTIMATTER_TEST_API_URL") .unwrap_or_else(|_| "https://api.dev.antimatter.io".to_string()) } // decrypting_function is an example function that is intended to mirror what the DR cards are expected to do. fn decrypting_function( secret_key: &str, ) -> Box) -> Result, String> + Send> { let secret_bytes = hex::decode(secret_key).expect("Invalid hex string"); let secret_array: [u8; 32] = match secret_bytes.try_into() { Ok(arr) => arr, Err(vec) => { panic!("Error: secret_bytes has wrong length: {}", vec.len()); } }; let secret_field_bytes = FieldBytes::from(secret_array); let sk = match SecretKey::from_bytes(&secret_field_bytes) { Ok(sk) => sk, Err(e) => { panic!("Error: failed to construct secret key: {}", e); } }; let secret_scalar = Scalar::from(&sk); Box::new(move |token_bytes| { if token_bytes.len() < 33 + 12 { return Err("malformed token: token too short to contain key material".to_string()); } let public_key = PublicKey::from_sec1_bytes(token_bytes[..33].as_ref()) .map_err(|e| format!("failed to construct public key: {}", e))?; let public_affine_point = public_key.as_affine(); let shared_point = *public_affine_point * secret_scalar; let combined_point = shared_point.to_encoded_point(false); let combined_bytes = combined_point.as_bytes(); let mut hasher = Sha3_256::new(); hasher.update(combined_bytes); let hash_result = hasher.finalize(); let result = hash_result.as_slice(); let cipher = aes_gcm::Aes256Gcm::new_from_slice(result) .map_err(|e| format!("decrypt to generate cipher: {}", e))?; let nonce = Nonce::default(); let decoded_bytes = cipher .decrypt(&nonce, &token_bytes[33..]) .map_err(|e| format!("decrypt failed: {}", e))?; Ok(decoded_bytes) }) } #[test] fn get_admin_url() { let (domain_id, api_key, _) = create_domain(ANTIMATTER_TEST_ADDRESS).expect("failed to create domain"); env::set_var("ANTIMATTER_API_URL", antimatter_api_url()); let mut session = Session::new(domain_id.clone(), api_key).expect("failed to create session"); let company_name = "some_name".to_string(); let custom_lifetime = 900; let result = session .get_admin_url(&company_name, None, Some(custom_lifetime)) .expect("failed to get admin URL"); let url = Url::parse(&*result).unwrap(); let mut base = format!("{}://{}", url.scheme(), url.host_str().unwrap()); if let Some(port) = url.port() { base = format!("{}:{:?}", base, port); } assert_eq!(base, antimatter_api_url().replace("api", "app")); assert_eq!(url.path(), format!("/settings/{}/byok", domain_id)); let query_pairs = url.query_pairs(); let vendor = query_pairs .into_owned() .find(|(key, _)| key == "vendor") .map(|(_, value)| value) .unwrap_or_default(); assert_eq!(vendor, company_name); let token = query_pairs .into_owned() .find(|(key, _)| key == "token") .map(|(_, value)| value) .unwrap_or_default(); let base64_decoded_token = general_purpose::STANDARD.decode(&token).unwrap(); let domain_identity_token: DomainIdentityToken = from_reader(&mut Cursor::new(base64_decoded_token)).unwrap(); let ts_not_before = Utc .timestamp_opt(domain_identity_token.not_valid_before, 0) .unwrap(); let ts_not_after = Utc .timestamp_opt(domain_identity_token.not_valid_after, 0) .unwrap(); // We hack in 10 seconds to account tor drift, etc. THis is done by the server so we need to // mimic this here let adjusted_lifetime = (custom_lifetime + 10) as i64; // There can be a little variance here, so als long as it is in a range we pass: let ts_duration = ts_not_after - ts_not_before; let ts_lower_bound = ts_duration - TimeDelta::seconds(10); let ts_upper_bound = ts_duration + TimeDelta::seconds(10); if ts_lower_bound >= TimeDelta::seconds(adjusted_lifetime) && ts_upper_bound <= TimeDelta::seconds(adjusted_lifetime) { panic!( "time stamp is too different from expected. Wanted {}, got {}", TimeDelta::seconds(adjusted_lifetime), ts_duration ) } } #[test] fn get_admin_url_for_peer() { env::set_var("ANTIMATTER_API_URL", antimatter_api_url()); let res = domains::create_domain("test@antimatter.io").expect("failed to create session"); let mut session = res.0; session .new_peer_domain_link_all( Some(vec!["child".to_string()]), None, "child".to_string(), None, "childDisplay".to_string(), ) .expect("failed to create peer"); let company_name = "example".to_string(); let nickname = "child".to_string(); let custom_lifetime = 900; let result = session .get_admin_url(&company_name, Some(&nickname), Some(custom_lifetime)) .expect("failed to get admin URL"); let url = Url::parse(&*result).unwrap(); let mut base = format!("{}://{}", url.scheme(), url.host_str().unwrap()); if let Some(port) = url.port() { base = format!("{}:{:?}", base, port); } let domain_id = session .get_peer(None, Some("child")) .expect("failed to get peer domain ID") .id; assert_eq!(base, antimatter_api_url().replace("api", "app")); assert_eq!(url.path(), format!("/settings/{}/byok", domain_id)); let query_pairs = url.query_pairs(); let vendor = query_pairs .into_owned() .find(|(key, _)| key == "vendor") .map(|(_, value)| value) .unwrap_or_default(); assert_eq!(vendor, company_name); let token = query_pairs .into_owned() .find(|(key, _)| key == "token") .map(|(_, value)| value) .unwrap_or_default(); let base64_decoded_token = general_purpose::STANDARD.decode(&token).unwrap(); let domain_identity_token: DomainIdentityToken = from_reader(&mut Cursor::new(base64_decoded_token)).unwrap(); let ts_not_before = Utc .timestamp_opt(domain_identity_token.not_valid_before, 0) .unwrap(); let ts_not_after = Utc .timestamp_opt(domain_identity_token.not_valid_after, 0) .unwrap(); // We hack in 10 seconds to account tor drift, etc. THis is done by the server so we need to // mimic this here let adjusted_lifetime = (custom_lifetime + 10) as i64; // There can be a little variance here, so als long as it is in a range we pass: let ts_duration = ts_not_after - ts_not_before; let ts_lower_bound = ts_duration - TimeDelta::seconds(10); let ts_upper_bound = ts_duration + TimeDelta::seconds(10); if ts_lower_bound >= TimeDelta::seconds(adjusted_lifetime) && ts_upper_bound <= TimeDelta::seconds(adjusted_lifetime) { panic!( "time stamp is too different from expected. Wanted {}, got {}", TimeDelta::seconds(adjusted_lifetime), ts_duration ) } } #[test] fn test_serialize_session() { let (domain_id, api_key, _) = create_domain(ANTIMATTER_TEST_ADDRESS).expect("failed to create domain"); env::set_var("ANTIMATTER_API_URL", antimatter_api_url()); let mut temp_session = Session::new(domain_id, api_key).expect("failed to create session"); // now serialize and deserialize let serialized = temp_session.to_serialized().unwrap(); let (mut session, _) = Session::from_serialized(serialized).unwrap(); // use the resulting session to encrypt and decrypt a capsule let input_data = vec![vec!["row0, col0: example data element".as_bytes().to_vec()]]; let cell_readers = convert_to_readers(input_data.clone(), vec![vec![vec![]]]) .expect("failed to generate data"); let columns = vec![Column { name: "col0".to_string(), tags: vec![], skip_classification: false, }]; let cfg = EncapsulateConfig { write_context_name: "default".to_string(), extra: "some extra data".to_string(), subdomain: None, subdomain_from: None, create_subdomains: None, async_seal: false, }; // encapsulate the data let (mut reader, _meta) = session .encapsulate(columns, cell_readers, vec![], cfg) .expect("failed to encapsulate"); let mut capsule_data: Vec = Vec::new(); reader .read_to_end(&mut capsule_data) .expect("failed to read capsule data"); // reader seems to be linked to the session so if we don't drop it, we get // Err: cannot borrow `session` as mutable more than once at a time drop(reader); // open the sealed capsule let writer = Cursor::new(capsule_data); let mut iterator = session .open("default", HashMap::new(), HashMap::new(), writer) .expect("failed to open capsule"); let (_tags, output_data) = iterator.read_all(&[]).expect("failed to read all"); assert_eq!(input_data, output_data) } #[test] fn test_recover_old_capsule() { // A small test to ensure that we can recover capsule that have a more verbose DR token. let expected_data = vec![vec!["hello".as_bytes().to_vec()]]; let key = "4fc8f6d72999e72198ef5ec69fffbeadee60e4f4635578b48a8fc1c6d5e059c6"; // Validate that the capsule created using the old encoding of DR header works let capsule_old = general_purpose::STANDARD .decode( "gogY+RjYGIQYUxiQGMkCGGgDg0mYQ7ce5/d+40AbAAABkTEOWNP0hVhQgtnkoYMHAqBYRoxnRyHUPeNyjNRs\ lAsFuRBMH4lbLUx4M5aTcAkY36A2AlPpE/GJOK5tEUo/NynkSg8aL9wrS1mhqMyYewYEN5gfhBtgo9UASZhDtx7n937\ jQFEVnS2mIiepXlaCE1xJbZuJMJhbAhgjGDAY2AwYLxi6GEoYGhiYGM4YURjVGOIYIBhwGBkYXhg1GG8YnBhMGIMY9g\ oYgRgkGIQYqxigGD0YwRjZGHQY9Rg8GIcYjRiYGPYYsRiNGJkY/hcYGBiuGDgIGJkYfBhzGPUYfRgyGBgYLhinGNcYT\ Bh3GLcY3RjzGEsYdxiTGHUY3BiQGH4YfRhgFBjTGOsYlhhIGJAYXRgtGD0YfBg6GO8YVBjBGCQFGIgY7RcAAAAAAAAA\ AAAAAAAAAACFOkm+o2hBBPdQzks2DgXEPHv02970PiIAAAAAAAAAAAAAAAAAAAEkOZHhISWIuGdHAo4mWDW+g+oV1L8\ iAGGxBRrQ+v0L+hECEgAAAAAAAAAAAAAAAAAAAnIxAPDBZdoXGrdc6iLa7D321QAAAAD/AA==", ) .unwrap(); let writer = Cursor::new(capsule_old); let decrypting_fn = decrypting_function(key); let mut iterator = recover_capsule(writer, decrypting_fn).expect("failed to open capsule with DR key"); let (_tags, output_data) = iterator.read_all(&[]).expect("failed to read all"); assert_eq!(expected_data, output_data); // Validate that the capsule created using the new encoding of DR header works let capsule_new = general_purpose::STANDARD .decode( "gogY+RjYGIQYUxiQGMkCGGgDg0lGIl4DQSF2YgAbAAABkTEaxXr0hVhQgtnkoYMHAqBYRvSdI9EdUuCdApRa\ KKIS9OXeRThrSNfaQQ0I00ZOACwWV0vyzI0y9GiQGphqQYBYvQd9r+EdMd1ACHS7UtjFQ56rSw7quRoASUYiXgNBIXZ\ iAFEbCwepOdghujbdGioW8saycFhbAh0k8rn68euFMnt+SzuNhEmV061of+3jZWPEDBJtzmTrTY1GfZx4Mgn3wyTG+i\ dez/NYosvEvuvwrWpA10h27lYDdHvfNLejhrpSk6WQ6qEmYdR1K3wy2AIHKBcAAAAAAAAAAAAAAAAAAADCog4DJo+zg\ eLaI/IUIohM1NjZHD9WQSIAAAAAAAAAAAAAAAAAAAFDh44TH3yaV5U4qXlB93qxSNSAmnEsDG4VGwbheFslBsH1EgAA\ AAAAAAAAAAAAAAAAAmaLZHkXDN6rf5cI5Y0VT+BKeQAAAAD/AA==", ) .unwrap(); let writer = Cursor::new(capsule_new); let decrypting_fn = decrypting_function(key); let mut iterator = recover_capsule(writer, decrypting_fn).expect("failed to open capsule with DR key"); let (_tags, output_data) = iterator.read_all(&[]).expect("failed to read all"); assert_eq!(expected_data, output_data); } #[test] fn test_create_capsule() { let (domain_id, api_key, _) = create_domain(ANTIMATTER_TEST_ADDRESS).expect("failed to create domain"); env::set_var("ANTIMATTER_API_URL", antimatter_api_url()); let mut session = Session::new(domain_id, api_key).expect("failed to create session"); let input_data = vec![vec!["row0, col0: example data element".as_bytes().to_vec()]]; let cell_readers = convert_to_readers(input_data.clone(), vec![vec![vec![]]]) .expect("failed to generate data"); let columns = vec![Column { name: "col0".to_string(), tags: vec![], skip_classification: false, }]; let cfg = EncapsulateConfig { write_context_name: "default".to_string(), extra: "some extra data".to_string(), subdomain: None, subdomain_from: None, create_subdomains: None, async_seal: false, }; // encapsulate the data let (mut reader, _meta) = session .encapsulate(columns, cell_readers, vec![], cfg) .expect("failed to encapsulate"); let mut capsule_data: Vec = Vec::new(); reader .read_to_end(&mut capsule_data) .expect("failed to read capsule data"); // reader seems to be linked to the session so if we don't drop it, we get // Err: cannot borrow `session` as mutable more than once at a time drop(reader); // Check the error returned if we failed to open a capsule let writer = Cursor::new(capsule_data.clone()); match session.open("unknown", HashMap::new(), HashMap::new(), writer) { Ok(_) => panic!("expected an error, got a result"), Err(e) => assert_eq!( e.to_string().starts_with( "Error: failed to open capsule: APIError: open request failed (404 Not Found):" ), true ), } // open the sealed capsule let writer = Cursor::new(capsule_data); let mut iterator = session .open("default", HashMap::new(), HashMap::new(), writer) .expect("failed to open capsule"); let (_tags, output_data) = iterator.read_all(&[]).expect("failed to read all"); assert_eq!(input_data, output_data) } #[test] fn test_create_large_capsule() { let (domain_id, api_key, _) = create_domain(ANTIMATTER_TEST_ADDRESS).expect("failed to create domain"); env::set_var("ANTIMATTER_API_URL", antimatter_api_url()); let mut session = Session::new(domain_id, api_key).expect("failed to create session"); // Make a large data source such that: // - data.len % classifier_chunk_size < classifier_overlap // - data.len % classifier_chunk_size > 0 let data = ("AB".repeat(16 * 1024 * 2) + "additional data") .as_bytes() .to_vec(); let input_data = vec![vec![data]]; let cell_readers = convert_to_readers(input_data.clone(), vec![vec![vec![]]]) .expect("failed to generate data"); let columns = vec![Column { name: "col0".to_string(), tags: vec![], skip_classification: false, }]; let cfg = EncapsulateConfig { write_context_name: "default".to_string(), extra: "some extra data".to_string(), subdomain: None, subdomain_from: None, create_subdomains: None, async_seal: false, }; // encapsulate the data let (capsule_data, _meta) = session .encapsulate_to_bytes(columns, cell_readers, vec![], cfg) .expect("failed to encapsulate"); // open the sealed capsule let writer = Cursor::new(capsule_data); let mut iterator = session .open("default", HashMap::new(), HashMap::new(), writer) .expect("failed to open capsule"); let (_tags, output_data) = iterator.read_all(&[]).expect("failed to read all"); assert_eq!(input_data, output_data) } #[test] fn test_create_capsule_and_update() { let (domain_id, api_key, _) = create_domain(ANTIMATTER_TEST_ADDRESS).expect("failed to create domain"); env::set_var("ANTIMATTER_API_URL", antimatter_api_url()); let mut session = Session::new(domain_id.clone(), api_key).expect("failed to create session"); let input_data = vec![vec!["row0, col0: example data element".as_bytes().to_vec()]]; let cell_readers = convert_to_readers(input_data.clone(), vec![vec![vec![]]]) .expect("failed to generate data"); let columns = vec![Column { name: "col0".to_string(), tags: vec![], skip_classification: false, }]; let cfg = EncapsulateConfig { write_context_name: "default".to_string(), extra: "some extra data".to_string(), subdomain: None, subdomain_from: None, create_subdomains: None, async_seal: false, }; // encapsulate the data let mut file = File::create("/tmp/example.cap").expect("failed to create a file"); defer!({ fs::remove_file("/tmp/example.cap").expect("failed to remove file"); }); let mut cap = session .new_capsule(columns, vec![], cfg, &mut file) .expect("failed to create streaming capsule"); cap.add_rows(cell_readers).expect("failed to add data"); cap.finalize().expect("failed to finalize"); drop(cap); file.flush().expect("failed to flush file"); // open the sealed capsule let file = File::open("/tmp/example.cap").expect("failed to open file"); let mut iterator = session .open("default", HashMap::new(), HashMap::new(), file) .expect("failed to open capsule"); let (_tags, output_data) = iterator.read_all(&[]).expect("failed to read all"); assert_eq!(input_data, output_data) } #[test] fn test_create_capsule_with_dr() { let (domain_id, api_key, config) = create_domain(ANTIMATTER_TEST_ADDRESS).expect("failed to create domain"); env::set_var("ANTIMATTER_API_URL", antimatter_api_url()); let mut session = Session::new(domain_id.clone(), api_key.clone()).expect("failed to create session"); // secret_key is hex encoded, public key is base64 encoded // TODO: we props want to improve how keys are added. right now we rely on them being encoded correctly let secret_key = "4fc8f6d72999e72198ef5ec69fffbeadee60e4f4635578b48a8fc1c6d5e059c6"; let public_key = "2eShgwgBWCEDPXAv9x4R081qWJlnEBeQw5ejPH5kuPYosKDQhYO/EVE="; enable_disaster_recovery(&config, &domain_id.clone(), public_key); let dr_settings = RUNTIME .block_on(general_api::domain_get_disaster_recovery_settings( &config, &domain_id.clone(), )) .expect("failed to enable disaster recovery"); assert_eq!(dr_settings.clone().enable.is_some(), true); assert_eq!(dr_settings.clone().enable.unwrap(), true); let input_data = vec![vec!["row0, col0: example data element".as_bytes().to_vec()]]; let cell_readers = convert_to_readers(input_data.clone(), vec![vec![vec![]]]) .expect("failed to generate data"); let columns = vec![Column { name: "col0".to_string(), tags: vec![], skip_classification: false, }]; let cfg = EncapsulateConfig { write_context_name: "default".to_string(), extra: "some extra data".to_string(), subdomain: None, subdomain_from: None, create_subdomains: None, async_seal: false, }; // encapsulate the data let (mut reader, _meta) = session .encapsulate(columns, cell_readers, vec![], cfg) .expect("failed to encapsulate"); let mut capsule_data: Vec = Vec::new(); reader .read_to_end(&mut capsule_data) .expect("failed to read capsule data"); drop(reader); // open the sealed capsule let writer = Cursor::new(capsule_data.clone()); let mut iterator = session .open("default", HashMap::new(), HashMap::new(), writer) .expect("failed to open capsule"); let (_tags, output_data) = iterator.read_all(&[]).expect("failed to read all"); assert_eq!(input_data, output_data); // open the sealed capsule using the DR key let writer = Cursor::new(capsule_data); let decrypting_fn = decrypting_function(secret_key); let mut iterator = recover_capsule(writer, decrypting_fn).expect("failed to open capsule with DR key"); let (_tags, output_data) = iterator.read_all(&[]).expect("failed to read all"); assert_eq!(input_data, output_data) } #[test] fn test_create_capsule_with_redaction() { let (domain_id, api_key, config) = create_domain(ANTIMATTER_TEST_ADDRESS).expect("failed to create domain"); env::set_var("ANTIMATTER_API_URL", antimatter_api_url()); let mut session = Session::new(domain_id.clone(), api_key).expect("failed to create session"); add_write_ctx( &config, &domain_id.clone().to_string(), &"test_ctx".to_string(), ); add_read_ctx( &config, &domain_id.clone().to_string(), &"test_ctx".to_string(), ); add_redaction_rule(&mut session, &"test_ctx".to_string()); let input_data = vec![vec![ "User: John Smith".as_bytes().to_vec(), "Access: Basic".as_bytes().to_vec(), ]]; let cell_readers = convert_to_readers(input_data.clone(), vec![vec![vec![], vec![]]]) .expect("failed to generate data"); let columns = vec![ Column { name: "col0".to_string(), tags: vec![], skip_classification: false, }, Column { name: "col1".to_string(), tags: vec![], skip_classification: false, }, ]; let cfg = EncapsulateConfig { write_context_name: "test_ctx".to_string(), extra: "some extra data".to_string(), subdomain: None, subdomain_from: None, create_subdomains: None, async_seal: false, }; // encapsulate the data let (mut reader, _meta) = session .encapsulate(columns, cell_readers, vec![], cfg) .expect("failed to encapsulate"); let mut capsule_data: Vec = Vec::new(); reader .read_to_end(&mut capsule_data) .expect("failed to read capsule data"); // reader seems to be linked to the session so if we don't drop it, we get // Err: cannot borrow `session` as mutable more than once at a time drop(reader); // open the sealed capsule let writer = Cursor::new(capsule_data); let mut iterator = session .open("test_ctx", HashMap::new(), HashMap::new(), writer) .expect("failed to open capsule"); let (_tags, output_data) = iterator.read_all(&[]).expect("failed to read all"); let expected_output = vec![vec![ "User: John {redacted}".as_bytes().to_vec(), "Access: Basic".as_bytes().to_vec(), ]]; assert_eq!(output_data, expected_output) } #[test] fn test_create_large_capsule_with_redaction() { let (domain_id, api_key, config) = create_domain(ANTIMATTER_TEST_ADDRESS).expect("failed to create domain"); env::set_var("ANTIMATTER_API_URL", antimatter_api_url()); let mut session = Session::new(domain_id.clone(), api_key.clone()).expect("failed to create session"); add_write_ctx( &config, &domain_id.clone().to_string(), &"test_ctx".to_string(), ); add_read_ctx( &config, &domain_id.clone().to_string(), &"test_ctx".to_string(), ); add_redaction_rule(&mut session, &"test_ctx".to_string()); // Make a large data source such that: // - data.len % classifier_chunk_size < classifier_overlap // - data.len % classifier_chunk_size > 0 let base_data = "..".repeat(16 * 1024 * 2); let data = (base_data.clone() + " John Smith").as_bytes().to_vec(); let input_data = vec![vec![data, "Access: Basic".as_bytes().to_vec()]]; let cell_readers = convert_to_readers(input_data.clone(), vec![vec![vec![], vec![]]]) .expect("failed to generate data"); let columns = vec![ Column { name: "col0".to_string(), tags: vec![], skip_classification: false, }, Column { name: "col1".to_string(), tags: vec![], skip_classification: false, }, ]; let cfg = EncapsulateConfig { write_context_name: "test_ctx".to_string(), extra: "some extra data".to_string(), subdomain: None, subdomain_from: None, create_subdomains: None, async_seal: false, }; // encapsulate the data let (mut reader, _meta) = session .encapsulate(columns, cell_readers, vec![], cfg) .expect("failed to encapsulate"); let mut capsule_data: Vec = Vec::new(); reader .read_to_end(&mut capsule_data) .expect("failed to read capsule data"); // reader seems to be linked to the session so if we don't drop it, we get // Err: cannot borrow `session` as mutable more than once at a time drop(reader); // open the sealed capsule let writer = Cursor::new(capsule_data); let mut iterator = session .open("test_ctx", HashMap::new(), HashMap::new(), writer) .expect("failed to open capsule"); let (_tags, output_data) = iterator.read_all(&[]).expect("failed to read all"); let expected_output = vec![vec![ (base_data + " {redacted}").as_bytes().to_vec(), "Access: Basic".as_bytes().to_vec(), ]]; assert_eq!(output_data, expected_output) } #[test] fn test_create_capsule_with_deny_record() { let (domain_id, api_key, config) = create_domain(ANTIMATTER_TEST_ADDRESS).expect("failed to create domain"); env::set_var("ANTIMATTER_API_URL", antimatter_api_url()); let mut session = Session::new(domain_id.clone(), api_key).expect("failed to create session"); add_write_ctx( &config, &domain_id.clone().to_string(), &"test_ctx".to_string(), ); add_read_ctx( &config, &domain_id.clone().to_string(), &"test_ctx".to_string(), ); let policy_id = session .create_data_policy(NewDataPolicy { name: "testpolicy1".to_string(), description: "test policy".to_string(), }) .expect("failed to create data policy") .policy_id; let ft_name = "test_ft"; let _ = session.add_fact_type( ft_name, NewFactTypeDefinition { arguments: vec![NewFactTypeDefinitionArgumentsInner { name: "example".to_string(), description: "example".to_string(), }], description: "test fact type".to_string(), }, ); session .update_data_policy_rules( policy_id.as_str(), DataPolicyRuleChanges { delete_rules: None, new_rules: Some(vec![ NewDataPolicyRule { comment: None, clauses: vec![DataPolicyClause { operator: antimatter_api::models::data_policy_clause::Operator::AnyOf, capabilities: None, facts: Some(vec![FactExpression { r#type: ft_name.to_string(), operator: antimatter_api::models::fact_expression::Operator::NotExists, arguments: vec![FactExpressionArgumentsInner{ operator: antimatter_api::models::fact_expression_arguments_inner::Operator::Any, values: None }], variables: None, }]), read_parameters: None, tags: None, }], effect: DataPolicyRuleEffect::DenyRecord, token_scope: None, token_format: None, priority: Some(0), assign_priority: None, }, ]), }, ) .expect("failed to add data policy rule"); session.set_data_policy_binding( policy_id.as_str(), SetDataPolicyBinding{ read_contexts: Some(vec![SetDataPolicyBindingReadContextsInner{ name: "test_ctx".to_string(), configuration: antimatter_api::models::set_data_policy_binding_read_contexts_inner::Configuration::Attached, }]), default_attachment: antimatter_api::models::set_data_policy_binding::DefaultAttachment::Attached, }, ).expect("failed to set data policy binding"); let input_data = vec![vec![ "data".as_bytes().to_vec(), "The name is Adam Smith".as_bytes().to_vec(), ]]; let cell_readers = convert_to_readers(input_data.clone(), vec![vec![vec![], vec![]]]) .expect("failed to generate data"); let columns = vec![ Column { name: "some".to_string(), tags: vec![], skip_classification: false, }, Column { name: "name".to_string(), tags: vec![], skip_classification: false, }, ]; let cfg = EncapsulateConfig { write_context_name: "test_ctx".to_string(), extra: "some extra data".to_string(), subdomain: None, subdomain_from: None, create_subdomains: None, async_seal: false, }; // encapsulate the data let (mut reader, _meta) = session .encapsulate(columns, cell_readers, vec![], cfg) .expect("failed to encapsulate"); let mut capsule_data: Vec = Vec::new(); reader .read_to_end(&mut capsule_data) .expect("failed to read capsule data"); // reader seems to be linked to the session so if we don't drop it, we get // Err: cannot borrow `session` as mutable more than once at a time drop(reader); // open the sealed capsule let writer = Cursor::new(capsule_data); let mut iterator = session .open("test_ctx", HashMap::new(), HashMap::new(), writer) .expect("failed to open capsule"); let (_tags, output_data) = iterator.read_all(&[]).expect("failed to read all"); println!("{:?}", output_data); let expected_output: Vec>> = vec![]; assert_eq!(output_data, expected_output) } #[test] fn test_create_capsule_generate_subdomains() { let (domain_id, api_key, config) = create_domain(ANTIMATTER_TEST_ADDRESS).expect("failed to create domain"); env::set_var("ANTIMATTER_API_URL", antimatter_api_url()); let mut session = Session::new(domain_id.clone(), api_key.clone()).expect("failed to create session"); let (id_a, key_a) = create_subdomain(domain_id.clone(), api_key.clone(), "tenant_a") .expect("failed to create subdomain"); let (id_b, _) = create_subdomain(domain_id.clone(), api_key.clone(), "tenant_b") .expect("failed to create subdomain"); let (_id_c, _) = create_subdomain(domain_id.clone(), api_key.clone(), "tenant_c") .expect("failed to create subdomain"); let cfg = EncapsulateConfig { write_context_name: "default".to_string(), extra: "some extra data".to_string(), subdomain: None, subdomain_from: Some("tenant".to_string()), create_subdomains: None, async_seal: false, }; let columns = vec!["id", "tenant", "data1", "data2"] .into_iter() .map(|item| Column { name: item.to_string(), tags: vec![], skip_classification: false, }) .collect(); let tags = vec![ vec![vec![], vec![], vec![], vec![]], vec![vec![], vec![], vec![], vec![]], vec![vec![], vec![], vec![], vec![]], vec![vec![], vec![], vec![], vec![]], vec![vec![], vec![], vec![], vec![]], vec![vec![], vec![], vec![], vec![]], vec![vec![], vec![], vec![], vec![]], vec![vec![], vec![], vec![], vec![]], vec![vec![], vec![], vec![], vec![]], vec![vec![], vec![], vec![], vec![]], ]; let expected_data: Vec>> = vec![ vec!["0", "tenant_a", "foo", "bar"], vec!["1", "tenant_a", "foo", "bar"], vec!["2", "tenant_b", "foo", "bar"], vec!["3", "tenant_a", "foo", "bar"], vec!["4", "tenant_b", "foo", "bar"], vec!["5", "tenant_b", "foo", "bar"], vec!["6", "tenant_a", "foo", "bar"], vec!["7", "tenant_b", "foo", "bar"], vec!["8", "tenant_b", "foo", "bar"], vec!["9", "tenant_c", "foo", "bar"], ] .into_iter() .map(|row| { row.into_iter() .map(|item| item.as_bytes().to_vec()) .collect() }) .collect(); let tenant_a_data: Vec>> = vec![ vec!["0", "tenant_a", "foo", "bar"], vec!["1", "tenant_a", "foo", "bar"], vec!["3", "tenant_a", "foo", "bar"], vec!["6", "tenant_a", "foo", "bar"], ] .into_iter() .map(|row| { row.into_iter() .map(|item| item.as_bytes().to_vec()) .collect() }) .collect(); let expected_continue_data: Vec>> = vec![ vec!["3", "tenant_a", "foo", "bar"], vec!["4", "tenant_b", "foo", "bar"], vec!["5", "tenant_b", "foo", "bar"], vec!["6", "tenant_a", "foo", "bar"], vec!["7", "tenant_b", "foo", "bar"], vec!["8", "tenant_b", "foo", "bar"], vec!["9", "tenant_c", "foo", "bar"], ] .into_iter() .map(|row| { row.into_iter() .map(|item| item.as_bytes().to_vec()) .collect() }) .collect(); let data = convert_to_readers(expected_data.clone(), tags).expect("failed to convert to readers"); // encapsulate the data let (mut reader, _meta) = session .encapsulate(columns, data, vec![], cfg) .expect("failed to encapsulate"); let mut capsule_data: Vec = Vec::new(); reader .read_to_end(&mut capsule_data) .expect("failed to read capsule data"); // reader seems to be linked to the session so if we don't drop it, we get // Err: cannot borrow `session` as mutable more than once at a time drop(reader); // Check that interleaving is working. We expect one capsule per tenant, then one for metadata let capsules = session .list_capsules(None, None, None, None, None, None, None) .unwrap(); assert_eq!(capsules.results.len(), 4); // open the sealed capsule let writer = Cursor::new(capsule_data.clone()); let mut iterator = session .open("default", HashMap::new(), HashMap::new(), writer) .expect("failed to open capsule"); let (_tags, output_data) = iterator.read_all(&[]).expect("failed to read all"); assert_eq!(output_data, expected_data); // Next lets see if we can open the bundle as a subdomain let mut session_a = Session::new(id_a.clone(), key_a.clone()).expect("failed to create session"); // open the sealed capsule let writer = Cursor::new(capsule_data.clone()); let mut iterator = session_a .open("default", HashMap::new(), HashMap::new(), writer) .expect("failed to open capsule"); let (_tags, output_data) = iterator.read_all(&[]).expect("failed to read all"); assert_eq!(output_data, tenant_a_data); assert_eq!(iterator.open_failures().len(), 4); // Next let us try reading the bundle simulating an unreachable Cell with parent. // open the sealed capsule let writer = Cursor::new(capsule_data.clone()); let mut iterator = session .open("default", HashMap::new(), HashMap::new(), writer) .expect("failed to open capsule"); // Swap out the configured Cell address. session .set_configuration_defaults(Some("http://127.0.0.1:1337".parse().unwrap()), None, None) .expect("failed to reset session"); session .set_use_direct_address(false) .expect("failed to set use direct address false"); match iterator.read_all(&[]) { Ok((_, _)) => panic!("expected API error"), Err(e) => {} } // Emulate restoring connectivity and check we can continue reads from the // next capsule. session .set_use_direct_address(true) .expect("failed to set use direct address true"); let (_tags, output_data) = iterator.read_all(&[]).expect("failed to read all"); assert_eq!(output_data, expected_continue_data); // now let us remove the peering with tenant_b and confirm we cannot open // the capsule. In the future we would like to still open the capsule and // just skip over shards belonging to tenant_b remove_peering(&config, &domain_id.to_string(), &id_b.to_string()); let writer = Cursor::new(capsule_data); match session.open("default", HashMap::new(), HashMap::new(), writer) { Ok(_) => error!("open request should be rejected"), Err(_) => {} } } #[test] fn test_create_capsule_multirow() { let (domain_id, api_key, _) = create_domain(ANTIMATTER_TEST_ADDRESS).expect("failed to create domain"); env::set_var("ANTIMATTER_API_URL", antimatter_api_url()); let mut session = Session::new(domain_id, api_key).expect("failed to create session"); let columns = vec![ Column { name: "col0".to_string(), tags: vec![], skip_classification: false, }, Column { name: "col1".to_string(), tags: vec![], skip_classification: false, }, ]; let cfg = EncapsulateConfig { write_context_name: "default".to_string(), extra: "some extra data".to_string(), subdomain: None, subdomain_from: None, create_subdomains: None, async_seal: false, }; let mut file = File::create("/tmp/example_multirow.cap").expect("failed to create a file"); defer!({ fs::remove_file("/tmp/example_multirow.cap").expect("failed to remove file"); }); let mut cap = session .new_capsule(columns, vec![], cfg, &mut file) .expect("failed to create streaming capsule"); let mut input_data = vec![vec![ "row0, col0: example data element".as_bytes().to_vec(), "row0, col1: example data element".as_bytes().to_vec(), ]]; let cell_readers = readers_no_tags(input_data.clone()).expect("failed to generate data"); cap.add_rows(cell_readers).expect("failed to add rows"); let new_rows = vec![ vec![ "row1, col0: example data element".as_bytes().to_vec(), "row1, col1: example data element".as_bytes().to_vec(), ], vec![ "row2, col0: example data element".as_bytes().to_vec(), "row2, col1: example data element".as_bytes().to_vec(), ], vec![ "row3, col0: example data element".as_bytes().to_vec(), "row3, col1: example data element".as_bytes().to_vec(), ], ]; input_data.append(&mut new_rows.clone()); let cell_readers = readers_no_tags(new_rows).expect("failed to generate data"); cap.add_rows(cell_readers).expect("failed to add rows"); let new_rows = vec![ vec![ "row4, col0: example data element".as_bytes().to_vec(), "row4, col1: example data element".as_bytes().to_vec(), ], vec![ "row5, col0: example data element".as_bytes().to_vec(), "row5, col1: example data element".as_bytes().to_vec(), ], ]; input_data.append(&mut new_rows.clone()); let cell_readers = readers_no_tags(new_rows).expect("failed to generate data"); cap.add_rows(cell_readers).expect("failed to add rows"); cap.finalize().expect("failed to finalize"); drop(cap); file.flush().expect("failed to flush file"); // open the sealed capsule let file = File::open("/tmp/example_multirow.cap").expect("failed to open file"); let mut iterator = session .open("default", HashMap::new(), HashMap::new(), file) .expect("failed to open capsule"); let (_tags, output_data) = iterator.read_all(&[]).expect("failed to read all"); assert_eq!(input_data, output_data) } #[test] fn test_create_capsule_generate_subdomains_multirow() { let (domain_id, api_key, config) = create_domain(ANTIMATTER_TEST_ADDRESS).expect("failed to create domain"); env::set_var("ANTIMATTER_API_URL", antimatter_api_url()); let mut session = Session::new(domain_id.clone(), api_key.clone()).expect("failed to create session"); let (id_a, key_a) = create_subdomain(domain_id.clone(), api_key.clone(), "tenant_a") .expect("failed to create subdomain"); let (id_b, _) = create_subdomain(domain_id.clone(), api_key.clone(), "tenant_b") .expect("failed to create subdomain"); let (_id_c, _) = create_subdomain(domain_id.clone(), api_key.clone(), "tenant_c") .expect("failed to create subdomain"); let cfg = EncapsulateConfig { write_context_name: "default".to_string(), extra: "some extra data".to_string(), subdomain: None, subdomain_from: Some("tenant".to_string()), create_subdomains: None, async_seal: false, }; let columns = vec!["id", "tenant", "data1", "data2"] .into_iter() .map(|item| Column { name: item.to_string(), tags: vec![], skip_classification: false, }) .collect(); let mut file = File::create("/tmp/example_subdomains_multirow.cap").expect("failed to create a file"); defer!({ fs::remove_file("/tmp/example_subdomains_multirow.cap").expect("failed to remove file"); }); let mut cap = session .new_capsule(columns, vec![], cfg, &mut file) .expect("failed to create streaming capsule"); let mut input_data: Vec>> = vec![ vec!["0", "tenant_a", "foo", "bar"], vec!["1", "tenant_a", "foo", "bar"], ] .into_iter() .map(|row| { row.into_iter() .map(|item| item.as_bytes().to_vec()) .collect() }) .collect(); let cell_readers = readers_no_tags(input_data.clone()).expect("failed to generate data"); cap.add_rows(cell_readers).expect("failed to add rows"); let new_rows: Vec>> = vec![ vec!["2", "tenant_b", "foo", "bar"], vec!["3", "tenant_a", "foo", "bar"], vec!["4", "tenant_b", "foo", "bar"], ] .into_iter() .map(|row| { row.into_iter() .map(|item| item.as_bytes().to_vec()) .collect() }) .collect(); input_data.append(&mut new_rows.clone()); let cell_readers = readers_no_tags(new_rows).expect("failed to generate data"); cap.add_rows(cell_readers).expect("failed to add rows"); let new_rows: Vec>> = vec![ vec!["5", "tenant_b", "foo", "bar"], vec!["6", "tenant_a", "foo", "bar"], vec!["7", "tenant_b", "foo", "bar"], vec!["8", "tenant_b", "foo", "bar"], vec!["9", "tenant_c", "foo", "bar"], ] .into_iter() .map(|row| { row.into_iter() .map(|item| item.as_bytes().to_vec()) .collect() }) .collect(); input_data.append(&mut new_rows.clone()); let cell_readers = readers_no_tags(new_rows).expect("failed to generate data"); cap.add_rows(cell_readers).expect("failed to add rows"); cap.finalize().expect("failed to finalize"); drop(cap); // Check that interleaving is working. We expect one capsule per tenant, then one for metadata let capsules = session .list_capsules(None, None, None, None, None, None, None) .unwrap(); assert_eq!(capsules.results.len(), 4); // open the sealed capsule let file = File::open("/tmp/example_subdomains_multirow.cap").expect("failed to open file"); let mut iterator = session .open("default", HashMap::new(), HashMap::new(), file) .expect("failed to open capsule"); let (_tags, output_data) = iterator.read_all(&[]).expect("failed to read all"); assert_eq!(output_data, input_data); // Check the error returned if we failed to open a bundle let file = File::open("/tmp/example_subdomains_multirow.cap").expect("failed to open file"); match session.open("unknown", HashMap::new(), HashMap::new(), file) { Ok(_) => panic!("expected an error, got a result"), Err(e) => assert_eq!( e.to_string().starts_with( "Error: failed to open capsule: APIError: open request failed (404 Not Found):" ), true ), } // Next lets see if we can open the bundle as a subdomain let mut session_a = Session::new(id_a.clone(), key_a.clone()).expect("failed to create session"); // open the sealed capsule but only for tenant_a let tenant_a_data: Vec>> = vec![ vec!["0", "tenant_a", "foo", "bar"], vec!["1", "tenant_a", "foo", "bar"], vec!["3", "tenant_a", "foo", "bar"], vec!["6", "tenant_a", "foo", "bar"], ] .into_iter() .map(|row| { row.into_iter() .map(|item| item.as_bytes().to_vec()) .collect() }) .collect(); let file = File::open("/tmp/example_subdomains_multirow.cap").expect("failed to openfile"); let mut iterator = session_a .open("default", HashMap::new(), HashMap::new(), file) .expect("failed to open capsule"); let (_tags, output_data) = iterator.read_all(&[]).expect("failed to read all"); assert_eq!(output_data, tenant_a_data); // there will be 5 failures as we process every add_rows each time its called. // So even though there are subsequent rows to the same tenant, they were added // on different 'add_rows' calls, so were encapsulated separately. We should // consider an optimisation of maybe only doing the encapsulate after a known // number of rows are given, but we have no knowledge on the size of the sells // which makes this complicated. assert_eq!(iterator.open_failures().len(), 5); // now let us remove the peering with tenant_b and confirm we cannot open // the capsule. In the future we would like to still open the capsule and // just skip over shards belonging to tenant_b remove_peering(&config, &domain_id.to_string(), &id_b.to_string()); let file = File::open("/tmp/example_subdomains_multirow.cap").expect("failed to open file"); match session.open("default", HashMap::new(), HashMap::new(), file) { Ok(_) => error!("open request should be rejected"), Err(_) => {} } } #[test] fn test_base_address() { let (domain_id, api_key, _config) = create_domain(ANTIMATTER_TEST_ADDRESS).expect("failed to create domain"); env::set_var("ANTIMATTER_API_URL", antimatter_api_url()); let name = "company".to_string(); let mut conf = SessionConf { domain_id: domain_id.clone(), bearer_access_token: None, api_key: Some(api_key.clone()), read_cache_size: 0, engine_cache_size: 0, write_cache_size: 0, subdomain_cache_size: 0, buffered_seal: 0, buffered_seal_enabled: false, use_direct_address: false, current_base_path: None, act_for_domain: None, }; // Create a session that won't use the cell direct address if available let (mut session, _) = Session::from_config(conf.clone()).expect("failed to create session"); session .get_admin_url(&name, None, None) .expect("failed to construct an admin URL"); let serialised = session.to_serialized().expect("failed to serialise"); let indirect_conf: SessionConf = from_reader(&mut Cursor::new(&serialised)).unwrap(); conf.use_direct_address = true; // Create a session that will use the cell direct address if available let (mut session, _) = Session::from_config(conf).expect("failed to create session"); session .get_admin_url(&name, None, None) .expect("failed to construct an admin URL"); let serialised = session.to_serialized().expect("failed to serialise"); let direct_conf: SessionConf = from_reader(&mut Cursor::new(&serialised)).unwrap(); // validate that the new base addresses are different. One should be global and one should be cell let indirect_base_path = indirect_conf.current_base_path.expect("no base path"); let direct_base_path = direct_conf.current_base_path.expect("no base path"); assert_ne!(indirect_base_path, direct_base_path); } #[test] fn test_row_tags() { let (domain_id, api_key, _config) = create_domain(ANTIMATTER_TEST_ADDRESS).expect("failed to create domain"); env::set_var("ANTIMATTER_API_URL", antimatter_api_url()); let cfg = EncapsulateConfig { write_context_name: "default".to_string(), extra: "some extra data".to_string(), subdomain: None, subdomain_from: None, create_subdomains: None, async_seal: false, }; let mut session = Session::new(domain_id.clone(), api_key.clone()).expect("failed to create session"); let mut columns = Vec::new(); let mut rng = rand::thread_rng(); let mut rows: Vec = Vec::new(); for i in 0..4 { columns.push(Column { name: format!("col {}", i), tags: vec![], skip_classification: false, }) } // add 5 rows to the table for row in 0..5 { let mut row_data = RowReader { cells: vec![], tags: vec![CapsuleTag { name: format!("row_tag_{}", row), tag_type: TagType::Unary, value: format!("{}", row), source: "".to_string(), hook_version: (0, 0, 0), }], }; for _ in 0..4 { // ass variance to the cell size so it may span multiple chunks. // We want to confirm we only get one row tag even if there are // multiple cells and chunks. let mut data = vec![0; rng.gen_range(10000..100000)]; rng.fill_bytes(&mut data); row_data.cells.push(CellReader { data: Box::new(Cursor::new(data)), tags: vec![], }); } rows.push(row_data) } let (_, meta) = session .encapsulate_to_bytes(columns, rows, vec![], cfg) .expect("failed to create capsule"); assert_eq!(meta.capsule_ids.len(), 1); let info = session .get_capsule_info(meta.capsule_ids[0].as_str()) .expect("failed to fetch info"); for (idx, tag) in info.capsule_tags.iter().enumerate() { assert_eq!(format!("row_tag_{}", idx), tag.name); } } fn create_domain(email: &str) -> Result<(String, String, Configuration), String> { let mut config = Configuration { base_path: format!("{}/{}", antimatter_api_url(), API_TARGET_VERSION), user_agent: None, client: antimatter::session::http_client::HTTPClient::new() .expect("failed to create HTTP client") .client(), basic_auth: None, oauth_access_token: None, bearer_access_token: Some("TODO".to_string()), api_key: None, }; let response = RUNTIME .block_on(general_api::domain_add_new( &config, NewDomain { admin_email: email.to_string(), google_jwt: None, display_name: None, }, )) .map_err(|e| format!("Failed to create domain: {}", e))?; let auth = RUNTIME .block_on(authentication_api::domain_authenticate( &config, response.id.clone().as_str(), DomainAuthenticate { token: response.api_key.clone(), }, None, None, None, )) .expect("failed to auth parent"); config.bearer_access_token = Some(auth.token); Ok((response.id, response.api_key, config)) } fn create_subdomain( parent: String, parent_key: String, nickname: &str, ) -> Result<(String, String), String> { let mut config = Configuration { base_path: format!("{}/{}", antimatter_api_url(), API_TARGET_VERSION), user_agent: None, client: antimatter::session::http_client::HTTPClient::new() .expect("failed to create HTTP client") .client(), basic_auth: None, oauth_access_token: None, bearer_access_token: Some("TODO".to_string()), api_key: None, }; let auth = RUNTIME .block_on(authentication_api::domain_authenticate( &config, parent.as_str(), DomainAuthenticate { token: parent_key }, None, None, None, )) .expect("failed to auth parent"); config.bearer_access_token = Some(auth.token); let req = CreatePeerDomain { nicknames: Some(vec![nickname.to_string()]), import_alias_for_parent: None, import_alias_for_child: nickname.to_string(), display_name_for_parent: None, display_name_for_child: nickname.to_string(), link_all: Some(true), link_identity_providers: None, link_facts: None, link_read_contexts: None, link_write_contexts: None, link_capabilities: None, link_domain_policy: None, link_root_encryption_keys: None, link_capsule_access_log: None, link_control_log: None, link_capsule_manifest: None, link_data_policy: None, }; let response = RUNTIME .block_on(general_api::domain_add_peer_domain( &config, parent.as_str(), req, )) .map_err(|e| format!("Failed to create peered domain: {}", e))?; Ok((response.id, response.api_key)) } fn readers_no_tags(elements: Vec>>) -> Result, String> { let mut tags: Vec>> = Vec::new(); for row in &elements { let mut tag_row: Vec> = Vec::new(); for _ in row { tag_row.push(vec![]); } tags.push(tag_row); } convert_to_readers(elements, tags) } fn convert_to_readers( elements: Vec>>, tags: Vec>>, ) -> Result, String> { if elements.is_empty() { return Ok(Vec::new()); } let col_count = elements[0].len(); // Validate column consistency. if elements.iter().any(|row| row.len() != col_count) { return Err("column length inconsistency".to_string()); } let rows = elements .clone() .into_iter() .zip(tags.into_iter()) .map(|(row, tags)| { let mapped = row .clone() .into_iter() .zip(tags.clone().into_iter()) .map(|(item_a, item_b)| to_data_element(item_a, item_b)) .collect::, String>>() .unwrap(); RowReader { tags: vec![], cells: mapped, } }) .collect::>(); Ok(rows) } fn to_data_element(element: Vec, tags: Vec) -> Result { CellReader::new(tags, std::io::Cursor::new(element.clone())) .map_err(|e| format!("failed to create reader for element: {}", e)) } fn add_write_ctx(config: &Configuration, domain: &String, write_ctx: &String) { let request_config = models::AddWriteContext { summary: "".to_string(), description: "".to_string(), config: Box::new(models::WriteContextConfigInfo { key_reuse_ttl: Some(0), default_capsule_tags: None, required_hooks: vec![models::WriteContextConfigInfoRequiredHooksInner { hook: "fast-pii".to_string(), constraint: ">1.0.0".to_string(), mode: Default::default(), }], }), }; RUNTIME .block_on(contexts_api::domain_upsert_write_context( &config, domain, write_ctx, request_config, )) .expect("failed to create write context"); } fn remove_peering(config: &Configuration, domain: &String, peer: &String) { RUNTIME .block_on(general_api::domain_delete_peer(config, peer, domain)) .expect("failed to delete peer"); RUNTIME .block_on(general_api::domain_delete_peer(config, domain, peer)) .expect("failed to delete peer"); } fn add_redaction_rule(session: &mut Session, read_ctx: &String) { let policy_id = session .create_data_policy(NewDataPolicy { name: "testpolicy1".to_string(), description: "test policy".to_string(), }) .expect("failed to create data policy") .policy_id; session .update_data_policy_rules( policy_id.as_str(), DataPolicyRuleChanges { delete_rules: None, new_rules: Some(vec![ NewDataPolicyRule { comment: None, clauses: vec![DataPolicyClause { operator: antimatter_api::models::data_policy_clause::Operator::AnyOf, capabilities: None, facts: None, read_parameters: None, tags: Some(vec![TagExpression { name: "tag.antimatter.io/pii/name".to_string(), values: None, operator: antimatter_api::models::tag_expression::Operator::Exists, variables: None, }]), }], effect: DataPolicyRuleEffect::Redact, token_scope: None, token_format: None, priority: Some(0), assign_priority: None, }, NewDataPolicyRule { comment: None, clauses: vec![DataPolicyClause { operator: antimatter_api::models::data_policy_clause::Operator::AnyOf, capabilities: None, facts: None, read_parameters: None, tags: Some(vec![TagExpression { name: "tag.antimatter.io/pii/sin".to_string(), values: None, operator: antimatter_api::models::tag_expression::Operator::Exists, variables: None, }]), }], effect: DataPolicyRuleEffect::Allow, token_scope: None, token_format: None, priority: Some(0), assign_priority: None, }, ]), }, ) .expect("failed to add data policy rule"); session.set_data_policy_binding( policy_id.as_str(), SetDataPolicyBinding{ read_contexts: Some(vec![SetDataPolicyBindingReadContextsInner{ name: read_ctx.to_string(), configuration: antimatter_api::models::set_data_policy_binding_read_contexts_inner::Configuration::Attached, }]), default_attachment: antimatter_api::models::set_data_policy_binding::DefaultAttachment::Attached, }, ).expect("failed to set data policy binding"); } fn add_read_ctx(config: &Configuration, domain: &str, read_ctx: &str) { let request_config = models::AddReadContext { summary: "example".to_string(), description: "example read context for testing".to_string(), disable_read_logging: None, key_cache_ttl: None, required_hooks: Some(vec![models::ReadContextRequiredHook { hook: "fast-pii".to_string(), constraint: ">1.0.0".to_string(), write_context: None, }]), read_parameters: None, }; RUNTIME .block_on(contexts_api::domain_upsert_read_context( &config, domain, read_ctx, request_config, )) .expect("failed to create read context"); } fn enable_disaster_recovery(config: &Configuration, domain: &str, secret_key: &str) { RUNTIME .block_on(general_api::domain_put_disaster_recovery_settings( &config, domain, models::DisasterRecoverySettings { enable: Some(true), public_key: Some(secret_key.to_string()), }, )) .expect("failed to enable disaster recovery"); } #[test] fn test_classify_and_redact() { let (domain_id, api_key, _) = create_domain(ANTIMATTER_TEST_ADDRESS).expect("failed to create domain"); env::set_var("ANTIMATTER_API_URL", antimatter_api_url()); let mut session = Session::new(domain_id, api_key).expect("failed to create session"); session .classify_and_redact( vec![Column { name: "col0".to_string(), tags: vec![], skip_classification: false, }], vec![RowReader { cells: vec![ CellReader::new(vec![], std::io::Cursor::new("test".to_string())) .expect("failed to create CellReader"), ], tags: vec![], }], vec![], "default".to_string(), "default", HashMap::new(), ) .expect("classify_and_redact returned an error") .read_all(&[]) .expect("read_all returned an error"); } #[test] fn test_deny_record() { let (domain_id, api_key, _) = create_domain(ANTIMATTER_TEST_ADDRESS).expect("failed to create domain"); env::set_var("ANTIMATTER_API_URL", antimatter_api_url()); let mut session = Session::new(domain_id, api_key).expect("failed to create session"); session .add_read_context( "my_read_ctx", AddReadContext { summary: "sample read context".to_string(), description: "sample description".to_string(), required_hooks: Some(vec![ReadContextRequiredHook { hook: "fast-pii".to_string(), constraint: ">1.0.0".to_string(), write_context: None, }]), read_parameters: Some(vec![ReadContextParameter { key: Some("key".to_string()), required: Some(true), description: Some("description".to_string()), }]), key_cache_ttl: None, disable_read_logging: None, }, ) .expect("failed to create read context"); session .add_write_context( "my_write_ctx", AddWriteContext { summary: "sample write context".to_string(), description: "sample description".to_string(), config: Box::new(WriteContextConfigInfo { key_reuse_ttl: None, default_capsule_tags: None, required_hooks: vec![WriteContextConfigInfoRequiredHooksInner { hook: "fast-pii".to_string(), constraint: ">1.0.0".to_string(), mode: antimatter_api::models::write_context_config_info_required_hooks_inner::Mode::Sync, }], }), }, ) .expect("failed to create write context"); let data_policy = session .create_data_policy(NewDataPolicy { description: "sample description".to_string(), name: "my_data_policy".to_string(), }) .expect("failed to create data policy"); session .update_data_policy_rules( &data_policy.policy_id, DataPolicyRuleChanges { delete_rules: None, new_rules: Some(vec![NewDataPolicyRule { comment: Some("deny".to_string()), effect: DataPolicyRuleEffect::DenyRecord, token_scope: None, token_format: None, priority: Some(10), assign_priority: None, clauses: vec![DataPolicyClause { operator: antimatter_api::models::data_policy_clause::Operator::AnyOf, capabilities: None, facts: None, read_parameters: None, tags: Some(vec![TagExpression { name: "tag.antimatter.io/pii/name".to_string(), values: None, operator: antimatter_api::models::tag_expression::Operator::Exists, variables: None, }]), }], }]), }, ) .expect("failed to update data policy rules"); session .set_data_policy_binding( &data_policy.policy_id, SetDataPolicyBinding { read_contexts: None, default_attachment: DefaultAttachment::Attached, }, ) .expect("failed to bind data policy"); //encapsulate, open, read_all let cfg = EncapsulateConfig { write_context_name: "my_write_ctx".to_string(), extra: "some extra data".to_string(), subdomain: None, subdomain_from: Some("tenant".to_string()), create_subdomains: Some(true), async_seal: false, }; let columns = vec!["tenant", "age", "name"] .into_iter() .map(|item| Column { name: item.to_string(), tags: vec![], skip_classification: false, }) .collect(); let tags = vec![ vec![vec![], vec![], vec![]], vec![vec![], vec![], vec![]], vec![vec![], vec![], vec![]], vec![vec![], vec![], vec![]], vec![vec![], vec![], vec![]], vec![vec![], vec![], vec![]], vec![vec![], vec![], vec![]], vec![vec![], vec![], vec![]], vec![vec![], vec![], vec![]], vec![vec![], vec![], vec![]], ]; let input: Vec>> = vec![ vec!["smiths", "22", "Adam Smith"], vec!["smiths", "11", "Bobby Smith"], vec!["smiths", "42", "Haley Smith"], vec!["cooks", "55", "Captain Cook"], vec!["hues", "32", "Amber Hue"], vec!["hues", "30", "Steven Hue"], vec!["grangers", "12", "Harry Granger"], vec!["grangers", "28", "Kim Granger"], vec!["grangers", "30", "Jess Granger"], ] .into_iter() .map(|row| { row.into_iter() .map(|item| item.as_bytes().to_vec()) .collect() }) .collect(); let data = convert_to_readers(input, tags).expect("failed to convert to readers"); // encapsulate the data let (mut data, _meta) = session .encapsulate_to_bytes(columns, data, vec![], cfg) .expect("failed to encapsulate"); let mut reader = session .open( "my_read_ctx", HashMap::new(), HashMap::new(), std::io::Cursor::new(data), ) .expect("failed to open"); let (_, data) = reader.read_all(&[]).expect("failed to read_all"); assert_eq!( data, vec![vec!["cooks", "55", "Captain Cook"]] .into_iter() .map(|row| { row.into_iter() .map(|item| item.as_bytes().to_vec()) .collect::>() }) .collect::>() ); }