use crate::{instantiate_mcai_worker_description, instantiate_mcai_worker_tests}; use assert_matches::assert_matches; use mcai_worker_sdk::prelude::*; use serde::Deserialize; #[test] fn processor() { struct Worker {} #[derive(Clone, Debug, Deserialize, JsonSchema)] pub struct WorkerParameters {} instantiate_mcai_worker_tests!(); impl McaiWorker for Worker { instantiate_mcai_worker_description!(); fn init(&mut self) -> Result<()> { println!("Initialize processor test worker!"); Ok(()) } fn process( &self, channel: Option, _parameters: WorkerParameters, job_result: JobResult, ) -> Result where Self: std::marker::Sized, { assert!(channel.is_some()); loop { std::thread::sleep(std::time::Duration::from_millis(100)); if Self::is_current_job_stopped(&channel) { return Ok(job_result.with_status(JobStatus::Stopped)); } } } } let (internal_local_exchange, mut external_local_exchange) = LocalExchange::create(); let internal_local_exchange = Arc::new(Mutex::new(internal_local_exchange)); let worker = Worker {}; let worker_configuration = WorkerConfiguration::new("", &worker, "instance_id").unwrap(); let worker = Arc::new(Mutex::new(worker)); std::thread::spawn(move || { let processor = Processor::new(internal_local_exchange, worker_configuration); assert!(processor.run(worker, Arc::new(Mutex::new(None))).is_ok()); }); // Check if the worker is created successfully assert_matches!( external_local_exchange.next_response(), Ok(Some(ResponseMessage::WorkerCreated(_))) ); external_local_exchange .send_order(OrderMessage::BlackList(BlackList::default())) .unwrap(); let job = Job::new(include_str!("../jobs/simple.json")).unwrap(); external_local_exchange .send_order(OrderMessage::InitProcess(job.clone())) .unwrap(); assert_matches!( external_local_exchange.next_response(), Ok(Some(ResponseMessage::WorkerInitialized(_))) ); external_local_exchange .send_order(OrderMessage::StartProcess(job.clone())) .unwrap(); assert_matches!( external_local_exchange.next_response(), Ok(Some(ResponseMessage::WorkerStarted(_))) ); assert_matches!( external_local_exchange.next_response(), Ok(Some(ResponseMessage::Feedback( Feedback::Progression { .. } ))) ); external_local_exchange .send_order(OrderMessage::StopProcess(job)) .unwrap(); assert_matches!( external_local_exchange.next_response(), Ok(Some(ResponseMessage::JobStopped(_))) ); external_local_exchange .send_order(OrderMessage::StopWorker) .unwrap(); assert_matches!( external_local_exchange.next_response(), Ok(Some(ResponseMessage::Feedback(Feedback::Status { .. }))) ); }