use crate::{instantiate_mcai_worker_description, instantiate_mcai_worker_tests}; use assert_matches::assert_matches; use mcai_worker_sdk::prelude::*; use serde::Deserialize; #[test] fn processor() { struct Worker {} #[derive(Clone, Debug, Deserialize, JsonSchema)] pub struct WorkerParameters {} instantiate_mcai_worker_tests!(); impl McaiWorker for Worker { instantiate_mcai_worker_description!(); fn init(&mut self) -> Result<()> { log::info!("Initialize processor test worker!"); Ok(()) } fn process( &self, channel: Option, _parameters: WorkerParameters, job_result: JobResult, ) -> Result where Self: std::marker::Sized, { assert!(channel.is_some()); Ok(job_result.with_message("OK")) } } let (internal_local_exchange, mut external_local_exchange) = LocalExchange::create(); let internal_local_exchange = Arc::new(Mutex::new(internal_local_exchange)); let worker = Worker {}; let worker_configuration = WorkerConfiguration::new("", &worker, "instance_id").unwrap(); let worker = Arc::new(Mutex::new(worker)); std::thread::spawn(move || { let mut processor = Processor::new(internal_local_exchange, worker_configuration); processor.disable_ctrl_c_handler(); assert!(processor.run(worker, Arc::new(Mutex::new(None))).is_ok()); }); // Check if the worker is created successfully assert_matches!( external_local_exchange.next_response(), Ok(Some(ResponseMessage::WorkerCreated(_))) ); external_local_exchange .send_order(OrderMessage::BlackList(BlackList::default())) .unwrap(); let job = Job::new(include_str!("../jobs/simple.json")).unwrap(); external_local_exchange .send_order(OrderMessage::Job(job)) .unwrap(); // let response = local_exchange.next_response().unwrap(); // assert_matches!(response.unwrap(), ResponseMessage::WorkerInitialized(_)); // // let response = local_exchange.next_response().unwrap(); // assert_matches!(response.unwrap(), ResponseMessage::WorkerStarted(_)); assert_matches!( external_local_exchange.next_response(), Ok(Some(ResponseMessage::Feedback(Feedback::Progression( JobProgression { job_id: 666, progression: 0, .. } )))) ); assert_matches!( external_local_exchange.next_response(), Ok(Some(ResponseMessage::Completed(_))) ); external_local_exchange .send_order(OrderMessage::StopWorker) .unwrap(); assert_matches!( external_local_exchange.next_response(), Ok(Some(ResponseMessage::Feedback(Feedback::Status( ProcessStatus { job: None, worker: WorkerStatus { activity: WorkerActivity::Idle, system_info: SystemInformation { .. }, .. } } )))) ); }