use crate::{instantiate_mcai_worker_description, instantiate_mcai_worker_tests}; use assert_matches::assert_matches; use mcai_worker_sdk::prelude::*; use serde::Deserialize; use std::collections::HashMap; #[test] pub fn test_retrieving_vault_credentials() { const VAULT_TOKEN: &str = "VAULT_TOKEN"; if let Err(_error) = std::env::var(VAULT_TOKEN) { eprintln!("{} environment variable not set. Skip test.", VAULT_TOKEN); return; } struct Worker {} #[derive(Clone, Debug, Deserialize)] struct VaultCredentials { data: HashMap, } #[derive(Clone, Debug, Deserialize, JsonSchema)] pub struct WorkerParameters { credentials: HashMap, } instantiate_mcai_worker_tests!(); impl McaiWorker for Worker { instantiate_mcai_worker_description!(); fn init(&mut self) -> Result<()> { log::info!("Initialize Vault credentials test worker!"); Ok(()) } fn process( &self, _channel: Option, parameters: WorkerParameters, job_result: JobResult, ) -> Result where Self: std::marker::Sized, { // Read expected credentials from file let file = std::fs::File::open("./tests/functional/vault_credentials.json").unwrap(); let expected_credentials: VaultCredentials = serde_json::from_reader(file).unwrap(); for (expected_key, expected_value) in expected_credentials.data.iter() { let value = parameters.credentials.get(expected_key).ok_or_else(|| { MessageError::RuntimeError(format!( "Could not find expected credential: '{}'", expected_key )) })?; log::debug!("Found credentials: {}={}", expected_key, value); if value != expected_value { return Err(MessageError::RuntimeError(format!( "Invalid credential value: '{}' instead of '{}'", value, expected_value ))); } } Ok(job_result) } } let message = r#"{ "job_id": 666, "parameters": [ { "id": "credentials", "type": "string", "store": "VAULT", "value": "test" } ] }"#; let worker = Worker {}; let worker_configuration = WorkerConfiguration::new("", &worker, "instance_id").unwrap(); let worker = Arc::new(Mutex::new(worker)); let (internal_local_exchange, external_local_exchange) = LocalExchange::create(); let internal_local_exchange = Arc::new(Mutex::new(internal_local_exchange)); let external_local_exchange = Arc::new(Mutex::new(external_local_exchange)); std::thread::spawn(move || { let processor = Processor::new(internal_local_exchange, worker_configuration); assert!(processor.run(worker, Arc::new(Mutex::new(None))).is_ok()); }); fn next_message(external_local_exchange: Arc>) -> ResponseMessage { loop { std::thread::sleep(std::time::Duration::from_millis(20)); if let Ok(ref mut external_local_exchange) = external_local_exchange.try_lock() { return external_local_exchange.next_response().unwrap().unwrap(); } } } fn send_message( external_local_exchange: Arc>, message: OrderMessage, ) { loop { if let Ok(ref mut external_local_exchange) = external_local_exchange.try_lock() { external_local_exchange.send_order(message).unwrap(); return; } std::thread::sleep(std::time::Duration::from_millis(20)); } } assert_matches!( next_message(external_local_exchange.clone()), ResponseMessage::WorkerCreated(_) ); let job = Job::new(message).unwrap(); send_message(external_local_exchange.clone(), OrderMessage::Job(job)); assert_matches!( next_message(external_local_exchange.clone()), ResponseMessage::Feedback(Feedback::Progression(JobProgression { job_id: 666, progression: 0, .. })) ); assert_matches!( next_message(external_local_exchange.clone()), ResponseMessage::Completed(JobResult { .. }) ); }