use crate::{ client::AmqpConnection, instantiate_mcai_worker_description, instantiate_mcai_worker_tests, }; use lapin::types::{AMQPValue, FieldTable}; use mcai_worker_sdk::prelude::*; use mockito::mock; use serde::Deserialize; use std::{collections::BTreeMap, sync::mpsc, time::Duration}; #[async_std::test] async fn rabbitmq_ttl_job() -> Result<()> { // Should not be called after the logger is already initialized // env_logger::init(); struct Worker {} #[derive(Clone, Debug, Deserialize, JsonSchema)] pub struct WorkerParameters {} instantiate_mcai_worker_tests!(); impl McaiWorker for Worker { instantiate_mcai_worker_description!(); fn init(&mut self) -> Result<()> { log::info!("Initialize processor test worker!"); Ok(()) } fn process( &self, channel: Option, _parameters: WorkerParameters, job_result: JobResult, ) -> Result where Self: std::marker::Sized, { assert!(channel.is_some()); return Ok(job_result.with_status(JobStatus::Completed)); } } let _m = mock("POST", "/sessions") .with_header("content-type", "application/json") .with_body(r#"{"access_token": "fake_access_token"}"#) .create(); let _m = mock("GET", "/step_flow") .with_header("content-type", "application/json") .with_body(r#"{"application": "step_flow_test", "version": "1.7.0"}"#) .create(); let _m = mock("GET", "/step_flow/blacklist") .with_header("content-type", "application/json") .with_body(r#"{"data": [], "total": 0}"#) .create(); std::env::set_var("BACKEND_HOSTNAME", mockito::server_url()); let receiver_timeout = Duration::from_secs(30); let (created_sender, created_receiver) = mpsc::channel::(); let (status_sender, status_receiver) = mpsc::channel::(); let (error_sender, error_receiver) = mpsc::channel::(); let amqp_connection = AmqpConnection::new().unwrap(); amqp_connection.start_consumer(QUEUE_WORKER_CREATED, created_sender, 1); amqp_connection.start_consumer(QUEUE_WORKER_STATUS, status_sender, 1); amqp_connection.start_consumer(QUEUE_JOB_ERROR, error_sender, 3); let instance_id = "908126354"; let worker = Worker {}; let worker_configuration = WorkerConfiguration::new("job_test", &worker, instance_id).unwrap(); let rabbitmq_exchange = RabbitmqExchange::new(&worker_configuration).await; if let Err(MessageError::Amqp(lapin::Error::IOError(error))) = rabbitmq_exchange { eprintln!( "Connection to RabbitMQ failure: {}. Skip test.", error.to_string() ); return Ok(()); } let rabbitmq_exchange = Arc::new(Mutex::new(rabbitmq_exchange.unwrap())); let cloned_worker_configuration = worker_configuration.clone(); let worker = Arc::new(Mutex::new(worker)); std::thread::spawn(move || { let processor = Processor::new(rabbitmq_exchange, cloned_worker_configuration); assert!(processor.run(worker, Arc::new(Mutex::new(None))).is_ok()); }); assert!(created_receiver.recv_timeout(receiver_timeout).is_ok()); amqp_connection.send_order(vec![instance_id], &OrderMessage::Status)?; assert!(status_receiver.recv_timeout(receiver_timeout).is_ok()); let job = Job::new( r#"{ "job_id": 123, "parameters": [ { "id":"requirements", "type":"requirements", "value": { "paths": [ "nonexistent_file" ] } } ] }"#, ) .unwrap(); amqp_connection.send_order_to_queue( "job_test", FieldTable::default(), &OrderMessage::ReachedExpiration(job.clone()), )?; assert!(error_receiver.recv_timeout(receiver_timeout).is_ok()); let mut map = FieldTable::from(BTreeMap::new()); let mut properties = FieldTable::from(BTreeMap::new()); properties.insert("count".into(), AMQPValue::LongLongInt(5)); map.insert( "x-death".into(), AMQPValue::FieldArray(vec![AMQPValue::FieldTable(properties)].into()), ); amqp_connection.send_order_to_queue( "job_test", map.clone(), &OrderMessage::StartProcess(job.clone()), )?; assert!(error_receiver.recv_timeout(receiver_timeout).is_ok()); amqp_connection.send_order_to_queue( "job_test", FieldTable::default(), &OrderMessage::Job(job.clone()), )?; assert!(error_receiver.recv_timeout(Duration::from_secs(60)).is_ok()); Ok(()) }