// Integration test `update_job`: checks that an `UpdateProcess` order changes the
// `updatable` field of a worker whose job has already been started.
//
// Sequence performed by the test (every wait uses `receiver_timeout`, 30 seconds):
//   1. Calls `create_xdcam_sample_file` to write a 500-frame sample at `file_path`.
//   2. Declares a local `Worker` / `WorkerParameters` pair. `init_process` stores
//      `parameters.updatable` and builds one stream descriptor per input stream:
//      video is resized to 200x70, audio is formatted to mono / s16 / 16000 Hz.
//      `update_process` overwrites `updatable` with the new parameter value.
//   3. Mocks three backend endpoints with mockito (`POST /sessions`, `GET /step_flow`,
//      `GET /step_flow/blacklist`) and sets `BACKEND_HOSTNAME` to the mock server URL.
//   4. Creates one mpsc channel per response queue and hands each sender to
//      `AmqpConnection::start_consumer`.
//      NOTE(review): the meaning of the trailing `1` argument is not visible here — confirm
//      against `client::AmqpConnection`.
//   5. Runs a `Processor` on a spawned thread, sharing the worker through `Arc<Mutex<_>>`,
//      and waits for the "worker created" message.
//   6. Sends `Status`, `InitProcess` and `StartProcess` orders, waiting for each matching
//      response and then for one progression message.
//   7. Sends `UpdateProcess` with `updatable = "updated"`, waits for the "updated" message
//      and asserts that the shared worker now holds `"updated"`.
//   8. Sends `StopProcess`, waits for the "stopped" message, sleeps 2000 ms and removes the
//      sample file.
//
// If `RabbitmqExchange::new` returns an AMQP I/O error, the test prints a message and returns
// `Ok(())`, so a missing broker skips the test instead of failing it.
//
// NOTE(review): the sample file created in step 1 is only removed on the success path. The
// early "skip" return, any failing `assert!`, and any `?` on `send_order` leave it on disk;
// a drop guard would make the cleanup unconditional.
// NOTE(review): `recv_timeout` and `std::thread::sleep` are blocking calls inside an `async fn`.
// Presumably acceptable because the test is `#[serial]` and owns its executor — confirm.
// NOTE(review): several generic argument lists look truncated in this listing (`Arc>`, `Arc>>`,
// `Result>`, the bare `Result` returned by `process_frames`, `mpsc::channel::()`, and possibly
// the `impl McaiWorker ... for Worker` header). Presumably lost during extraction — restore them
// from the original file before compiling.
use crate::{ client::AmqpConnection, instantiate_mcai_worker_description, instantiate_mcai_worker_tests, }; use mcai_worker_sdk::prelude::*; use mockito::mock; use serde::Deserialize; use serial_test::serial; use std::sync::{ mpsc::{self, Sender}, Arc, Mutex, }; use std::time::Duration; #[async_std::test] #[serial] async fn update_job() -> Result<()> { let file_path = "./test_rabbitmq_media_processor_update_job.mxf"; let nb_frames = 500; super::ffmpeg::create_xdcam_sample_file(file_path, nb_frames).unwrap(); struct Worker { updatable: String, } #[derive(Clone, Debug, Deserialize, JsonSchema)] pub struct WorkerParameters { #[allow(dead_code)] source_path: String, #[allow(dead_code)] destination_path: String, #[allow(dead_code)] updatable: String, } instantiate_mcai_worker_tests!(); impl McaiWorker for Worker { instantiate_mcai_worker_description!(); fn init(&mut self) -> Result<()> { log::info!("Update processor test worker!"); Ok(()) } fn init_process( &mut self, parameters: WorkerParameters, format_context: Arc>, _result: Arc>>, ) -> Result> { self.updatable = parameters.updatable; let mut stream_descriptors = vec![]; let format_context = format_context.lock().unwrap(); for stream_index in 0..format_context.get_nb_streams() { let stream_type = format_context.get_stream_type(stream_index as isize); info!( "Handle stream #{} with type: {:?}", stream_index, stream_type ); match stream_type { AVMediaType::AVMEDIA_TYPE_VIDEO => { let filters = vec![VideoFilter::Resize(Scaling { width: Some(200), height: Some(70), })]; stream_descriptors.push(StreamDescriptor::new_video(stream_index as usize, filters)) } AVMediaType::AVMEDIA_TYPE_AUDIO => { let channel_layouts = vec!["mono".to_string()]; let sample_formats = vec!["s16".to_string()]; let sample_rates = vec![16000]; let filters = vec![AudioFilter::Format(AudioFormat { sample_rates, channel_layouts, sample_formats, })]; stream_descriptors.push(StreamDescriptor::new_audio(stream_index as usize, filters)) } 
// Remaining arms of the `match stream_type` in `init_process`: subtitle and data streams are
// both registered with `StreamDescriptor::new_data`; any other stream type is only logged.
AVMediaType::AVMEDIA_TYPE_SUBTITLE => { stream_descriptors.push(StreamDescriptor::new_data(stream_index as usize)) } AVMediaType::AVMEDIA_TYPE_DATA => { stream_descriptors.push(StreamDescriptor::new_data(stream_index as usize)) } _ => info!("Skip stream #{}", stream_index), }; } Ok(stream_descriptors) } fn process_frames( &mut self, _job_result: JobResult, _stream_index: usize, _frames: &[ProcessFrame], ) -> Result { log::info!("Process batch"); Ok(ProcessResult::new_json("")) } fn update_process(&mut self, parameters: WorkerParameters) -> Result<()> { self.updatable = parameters.updatable; Ok(()) } } let _m = mock("POST", "/sessions") .with_header("content-type", "application/json") .with_body(r#"{"access_token": "fake_access_token"}"#) .create(); let _m = mock("GET", "/step_flow") .with_header("content-type", "application/json") .with_body(r#"{"application": "step_flow_test", "version": "1.7.0"}"#) .create(); let _m = mock("GET", "/step_flow/blacklist") .with_header("content-type", "application/json") .with_body(r#"{"data": [], "total": 0}"#) .create(); std::env::set_var("BACKEND_HOSTNAME", mockito::server_url()); let receiver_timeout = Duration::from_secs(30); let (created_sender, created_receiver) = mpsc::channel::(); let (status_sender, status_receiver) = mpsc::channel::(); let (initialized_sender, initialized_receiver) = mpsc::channel::(); let (started_sender, started_receiver) = mpsc::channel::(); let (updated_sender, updated_receiver) = mpsc::channel::(); let (progression_sender, progression_receiver) = mpsc::channel::(); let (stopped_sender, stopped_receiver) = mpsc::channel::(); let amqp_connection = AmqpConnection::new().unwrap(); amqp_connection.start_consumer(QUEUE_WORKER_CREATED, created_sender, 1); amqp_connection.start_consumer(QUEUE_WORKER_STATUS, status_sender, 1); amqp_connection.start_consumer(QUEUE_WORKER_INITIALIZED, initialized_sender, 1); amqp_connection.start_consumer(QUEUE_WORKER_STARTED, started_sender, 1); 
// Last three consumers (updated / progression / stopped). After that the worker is configured
// under `instance_id`, the processor is started on its own thread, and the initial job is sent
// with `updatable = "basic"`.
amqp_connection.start_consumer(QUEUE_WORKER_UPDATED, updated_sender, 1); amqp_connection.start_consumer(QUEUE_JOB_PROGRESSION, progression_sender, 1); amqp_connection.start_consumer(QUEUE_JOB_STOPPED, stopped_sender, 1); let instance_id = "update_job_9876543210"; let worker = Worker { updatable: "".to_string(), }; let worker_configuration = WorkerConfiguration::new("", &worker, instance_id).unwrap(); let rabbitmq_exchange = RabbitmqExchange::new(&worker_configuration).await; if let Err(MessageError::Amqp(lapin::Error::IOError(error))) = rabbitmq_exchange { eprintln!("Connection to RabbitMQ failure: {error}. Skip test."); return Ok(()); } let rabbitmq_exchange = Arc::new(Mutex::new(rabbitmq_exchange.unwrap())); let worker = Arc::new(Mutex::new(worker)); let cl_worker = Arc::clone(&worker); std::thread::spawn(move || { let processor = Processor::new(rabbitmq_exchange, worker_configuration); assert!(processor.run(cl_worker, Arc::new(Mutex::new(None))).is_ok()); }); assert!(created_receiver.recv_timeout(receiver_timeout).is_ok()); amqp_connection.send_order(vec![instance_id], &OrderMessage::Status)?; assert!(status_receiver.recv_timeout(receiver_timeout).is_ok()); let job = Job::new( r#"{ "job_id": 999, "parameters": [ { "id": "source_path", "type": "string", "value": "./test_rabbitmq_media_processor_update_job.mxf" }, { "id": "destination_path", "type": "string", "value": "./test_rabbitmq_media_processor_update_job.json" }, { "id": "updatable", "type": "string", "value": "basic" } ] }"#, ) .unwrap(); amqp_connection.send_order(vec![instance_id], &OrderMessage::InitProcess(job.clone()))?; assert!(initialized_receiver.recv_timeout(receiver_timeout).is_ok()); amqp_connection.send_order(vec![instance_id], &OrderMessage::StartProcess(job.clone()))?; assert!(started_receiver.recv_timeout(receiver_timeout).is_ok()); assert!(progression_receiver.recv_timeout(receiver_timeout).is_ok()); let updated_job = Job::new( r#"{ "job_id": 999, "parameters": [ { "id": "source_path", 
"type": "string", "value": "./test_rabbitmq_media_processor_update_job.mxf" }, { "id": "destination_path", "type": "string", "value": "./test_rabbitmq_media_processor_update_job.json" }, { "id": "updatable", "type": "string", "value": "updated" } ] }"#, ) .unwrap(); amqp_connection.send_order(vec![instance_id], &OrderMessage::UpdateProcess(updated_job))?; let updated_message = updated_receiver.recv_timeout(receiver_timeout); assert!(updated_message.is_ok()); let updated_param: String = worker.lock().unwrap().updatable.clone(); assert_eq!(updated_param, "updated".to_string()); amqp_connection.send_order(vec![instance_id], &OrderMessage::StopProcess(job))?; let stopped_message = stopped_receiver.recv_timeout(receiver_timeout); assert!(stopped_message.is_ok()); std::thread::sleep(std::time::Duration::from_millis(2000)); log::info!("RabbitMQ update test done!"); std::fs::remove_file(file_path).unwrap(); Ok(()) }