use crate::{
    client::AmqpConnection,
    instantiate_mcai_worker_description,
    instantiate_mcai_worker_tests,
};
use mcai_worker_sdk::prelude::*;
use mockito::mock;
use serde::Deserialize;
use std::{sync::mpsc, time::Duration};

/// Integration test: a job that is running on a worker can be stopped through
/// an AMQP `StopProcess` order.
///
/// The test spawns a `Processor` on a background thread, then sends `Status`,
/// `InitProcess`, `StartProcess` and `StopProcess` orders in that sequence,
/// asserting after each step that a message shows up on the matching queue
/// consumer within `receiver_timeout` (30 seconds).
///
/// If the RabbitMQ exchange cannot be created because of a
/// `lapin::Error::IOError`, the test prints a notice and returns `Ok(())`
/// instead of failing, so it is effectively skipped when no broker is reachable.
///
/// NOTE(review): several generic type arguments look like they were lost from
/// this file (`Option`, `-> Result`, and every `mpsc::channel::()` call have
/// none) -- confirm against version control before relying on this text.
#[async_std::test]
async fn rabbitmq_stop_job() -> Result<()> {
    // Minimal worker used only by this test; it carries no state.
    struct Worker {}

    // Parameter type for the worker; the test job is created with an empty
    // `parameters` list, and `process` ignores the value it receives.
    #[derive(Clone, Debug, Deserialize, JsonSchema)]
    pub struct WorkerParameters {}

    // Project macro; its expansion is not visible from this file.
    instantiate_mcai_worker_tests!();

    impl McaiWorker for Worker {
        // Project macro; presumably supplies the worker description methods
        // required by `McaiWorker` -- TODO confirm.
        instantiate_mcai_worker_description!();

        /// Logs an initialization message and reports success.
        fn init(&mut self) -> Result<()> {
            log::info!("Initialize processor test worker!");
            Ok(())
        }

        /// Simulates a job that never finishes on its own.
        ///
        /// Asserts that a channel was provided, then polls once per second
        /// until `is_current_job_stopped` reports a stop, at which point it
        /// returns `job_result` with its status set to `JobStatus::Stopped`.
        /// There is no other exit from the loop.
        fn process(
            &self,
            channel: Option,
            _parameters: WorkerParameters,
            job_result: JobResult,
        ) -> Result
        where
            Self: std::marker::Sized,
        {
            // The stop check below is given the channel, so a missing channel
            // is treated as a test setup error.
            assert!(channel.is_some());

            loop {
                // Blocking sleep between polls of the stop flag.
                std::thread::sleep(std::time::Duration::from_secs(1));

                if Self::is_current_job_stopped(&channel) {
                    log::warn!("Worker is stopped");
                    return Ok(job_result.with_status(JobStatus::Stopped));
                }
            }
        }
    }

    // Mock HTTP endpoints served by mockito. Each `_m` shadows the previous
    // binding; shadowing does not drop the earlier value, so all three mock
    // handles stay alive until the end of the function.
    // Presumably the SDK calls these backend routes while the worker starts
    // up -- TODO confirm which component issues the requests.
    let _m = mock("POST", "/sessions")
        .with_header("content-type", "application/json")
        .with_body(r#"{"access_token": "fake_access_token"}"#)
        .create();

    let _m = mock("GET", "/step_flow")
        .with_header("content-type", "application/json")
        .with_body(r#"{"application": "step_flow_test", "version": "1.7.0"}"#)
        .create();

    let _m = mock("GET", "/step_flow/blacklist")
        .with_header("content-type", "application/json")
        .with_body(r#"{"data": [], "total": 0}"#)
        .create();

    // Point the backend hostname at the mockito server. This mutates the
    // process-wide environment.
    std::env::set_var("BACKEND_HOSTNAME", mockito::server_url());

    // Upper bound for every blocking receive below.
    let receiver_timeout = Duration::from_secs(30);

    // One std mpsc channel per observed queue: the sender goes to the AMQP
    // consumer, the receiver is checked by the assertions further down.
    let (created_sender, created_receiver) = mpsc::channel::();
    let (status_sender, status_receiver) = mpsc::channel::();
    let (initialized_sender, initialized_receiver) = mpsc::channel::();
    let (started_sender, started_receiver) = mpsc::channel::();
    let (progression_sender, progression_receiver) = mpsc::channel::();
    let (stopped_sender, stopped_receiver) = mpsc::channel::();

    // Test-side AMQP client. `unwrap()` panics if it cannot be created, which
    // happens before the "skip when RabbitMQ is unavailable" check below.
    let amqp_connection = AmqpConnection::new().unwrap();

    // Attach a consumer to each queue of interest.
    // NOTE(review): the meaning of the trailing `1` argument is not visible
    // here -- check `AmqpConnection::start_consumer`.
    amqp_connection.start_consumer(QUEUE_WORKER_CREATED, created_sender, 1);
    amqp_connection.start_consumer(QUEUE_WORKER_STATUS, status_sender, 1);
    amqp_connection.start_consumer(QUEUE_WORKER_INITIALIZED, initialized_sender, 1);
    amqp_connection.start_consumer(QUEUE_WORKER_STARTED, started_sender, 1);
    amqp_connection.start_consumer(QUEUE_JOB_PROGRESSION, progression_sender, 1);
    amqp_connection.start_consumer(QUEUE_JOB_STOPPED, stopped_sender, 1);

    // Instance identifier used both in the worker configuration and as the
    // target of every order sent below.
    let instance_id = "9876543210";

    let worker = Worker {};
    // NOTE(review): the first argument is an empty string; what it denotes is
    // not shown in this file.
    let worker_configuration = WorkerConfiguration::new("", &worker, instance_id).unwrap();
    let rabbitmq_exchange = RabbitmqExchange::new(&worker_configuration).await;

    // Skip (rather than fail) only for an AMQP I/O error. Any other error
    // falls through and panics at the `unwrap()` just below.
    if let Err(MessageError::Amqp(lapin::Error::IOError(error))) = rabbitmq_exchange {
        eprintln!(
            "Connection to RabbitMQ failure: {}. Skip test.",
            error.to_string()
        );
        return Ok(());
    }

    // Wrap the exchange, configuration and worker so they can be moved into
    // the processor thread.
    let rabbitmq_exchange = Arc::new(Mutex::new(rabbitmq_exchange.unwrap()));

    let cloned_worker_configuration = worker_configuration.clone();

    let worker = Arc::new(Mutex::new(worker));

    // Run the processor on its own thread so this function can keep sending
    // orders. The join handle is discarded, so a panic raised by the assert
    // inside the closure is not propagated to the test thread.
    std::thread::spawn(move || {
        let processor = Processor::new(rabbitmq_exchange, cloned_worker_configuration);
        // NOTE(review): the purpose of the second argument (a shared `None`)
        // is not visible here.
        assert!(processor.run(worker, Arc::new(Mutex::new(None))).is_ok());
    });

    // A message on the worker-created queue is expected without any order
    // being sent first.
    assert!(created_receiver.recv_timeout(receiver_timeout).is_ok());

    // Status order -> a message on the worker-status queue.
    amqp_connection.send_order(vec![instance_id], &OrderMessage::Status)?;

    assert!(status_receiver.recv_timeout(receiver_timeout).is_ok());

    // Job with id 666 and no parameters; cloned for each order that needs it.
    let job = Job::new(r#"{ "job_id": 666, "parameters": [] }"#).unwrap();

    // Init order -> a message on the worker-initialized queue.
    amqp_connection.send_order(vec![instance_id], &OrderMessage::InitProcess(job.clone()))?;

    assert!(initialized_receiver.recv_timeout(receiver_timeout).is_ok());

    // Start order -> a message on the worker-started queue, followed by at
    // least one message on the job-progression queue.
    amqp_connection.send_order(vec![instance_id], &OrderMessage::StartProcess(job.clone()))?;

    assert!(started_receiver.recv_timeout(receiver_timeout).is_ok());

    assert!(progression_receiver.recv_timeout(receiver_timeout).is_ok());

    // Stop order (takes ownership of `job`) -> a message on the job-stopped
    // queue. This is the behavior the test exists to verify.
    amqp_connection.send_order(vec![instance_id], &OrderMessage::StopProcess(job))?;

    assert!(stopped_receiver.recv_timeout(receiver_timeout).is_ok());

    Ok(())
}