use crate::{
    client::AmqpConnection, instantiate_mcai_worker_description, instantiate_mcai_worker_tests,
};
use mcai_worker_sdk::prelude::*;
use mockito::mock;
use serde::Deserialize;
use serial_test::serial;
use std::{
    sync::{
        mpsc::{self, Sender},
        Arc, Mutex,
    },
    time::Duration,
};

// Integration test: runs a media worker behind RabbitMQ and checks that a
// started job can be stopped, and that the very same job can then be
// initialized, started and stopped a second time.
//
// Outline of what the body does:
//   1. asks `super::ffmpeg::create_xdcam_sample_file` for an input file at
//      `file_path`, passing `nb_frames` (500);
//   2. declares a local `Worker` implementing `McaiWorker`;
//   3. registers three mockito endpoints and stores the mock server URL in
//      the `BACKEND_HOSTNAME` environment variable;
//   4. starts one AMQP consumer per queue of interest, handing each an mpsc
//      sender, and waits on the matching receivers (60 s timeout each);
//   5. runs a `Processor` on a background thread and sends it orders.
//
// The test returns `Ok(())` without asserting anything further when the
// connection to RabbitMQ fails with an I/O error (see the "Skip test" branch).
//
// `#[serial]` comes from `serial_test`; presumably needed because the test
// mutates a process-wide environment variable and uses shared broker queues
// -- TODO confirm.
//
// NOTE(review): as written, `Arc>`, `Arc>>`, `Result>`, `-> Result {` and
// `mpsc::channel::()` are missing their generic arguments and are not
// well-formed Rust. They look like they were lost when this copy of the file
// was produced; restore them from the upstream source before compiling.
//
// NOTE(review): `std::fs::remove_file(file_path)` only runs on the success
// path. The early "Skip test" return, any `?` error and any failed assertion
// all leave the sample file on disk.
#[async_std::test]
#[serial]
async fn stop_job() -> Result<()> {
    let file_path = "./test_rabbitmq_media_processor_stop_job.mxf";
    let nb_frames = 500;
    super::ffmpeg::create_xdcam_sample_file(file_path, nb_frames).unwrap();

    // Test-only worker; it carries no state.
    struct Worker {}

    // Job parameters the worker is deserialized with. `init_process` below
    // ignores its `_parameters` argument, so neither field is read here --
    // hence the `dead_code` allowances.
    #[derive(Clone, Debug, Deserialize, JsonSchema)]
    pub struct WorkerParameters {
        #[allow(dead_code)]
        source_path: String,
        #[allow(dead_code)]
        destination_path: String,
    }

    instantiate_mcai_worker_tests!();

    impl McaiWorker for Worker {
        instantiate_mcai_worker_description!();

        // Only logs; there is nothing to set up.
        fn init(&mut self) -> Result<()> {
            log::info!("Initialize processor test worker!");
            Ok(())
        }

        // Builds the list of stream descriptors for the opened input:
        //   - video streams get a resize filter to 200x70;
        //   - audio streams get a format filter (16000 sample rate, "mono", "s16");
        //   - subtitle and data streams are registered as data streams;
        //   - every other stream type is logged and left out.
        // `_parameters` and `_result` are unused.
        fn init_process(
            &mut self,
            _parameters: WorkerParameters,
            format_context: Arc>,
            _result: Arc>>,
        ) -> Result> {
            let mut stream_descriptors = vec![];

            // The guard is held for the whole loop; `unwrap` panics if the mutex is poisoned.
            let format_context = format_context.lock().unwrap();
            for stream_index in 0..format_context.get_nb_streams() {
                let stream_type = format_context.get_stream_type(stream_index as isize);
                info!(
                    "Handle stream #{} with type: {:?}",
                    stream_index, stream_type
                );

                match stream_type {
                    AVMediaType::AVMEDIA_TYPE_VIDEO => {
                        let filters = vec![VideoFilter::Resize(Scaling {
                            width: Some(200),
                            height: Some(70),
                        })];
                        stream_descriptors.push(StreamDescriptor::new_video(stream_index as usize, filters))
                    }
                    AVMediaType::AVMEDIA_TYPE_AUDIO => {
                        let channel_layouts = vec!["mono".to_string()];
                        let sample_formats = vec!["s16".to_string()];
                        let sample_rates = vec![16000];

                        let filters = vec![AudioFilter::Format(AudioFormat {
                            sample_rates,
                            channel_layouts,
                            sample_formats,
                        })];
                        stream_descriptors.push(StreamDescriptor::new_audio(stream_index as usize, filters))
                    }
                    AVMediaType::AVMEDIA_TYPE_SUBTITLE => {
                        stream_descriptors.push(StreamDescriptor::new_data(stream_index as usize))
                    }
                    AVMediaType::AVMEDIA_TYPE_DATA => {
                        stream_descriptors.push(StreamDescriptor::new_data(stream_index as usize))
                    }
                    _ => info!("Skip stream #{}", stream_index),
                };
            }
            Ok(stream_descriptors)
        }

        // Ignores the frames it is given: logs and returns a JSON result
        // built from an empty string.
        fn process_frames(
            &mut self,
            _job_result: JobResult,
            _stream_index: usize,
            _frames: &[ProcessFrame],
        ) -> Result {
            log::info!("Process batch");
            Ok(ProcessResult::new_json(""))
        }
    }

    // Mocked HTTP endpoints. Each `let _m` shadows the previous binding;
    // shadowing does not drop the earlier value, so all three mocks stay
    // alive until the end of the function.
    let _m = mock("POST", "/sessions")
        .with_header("content-type", "application/json")
        .with_body(r#"{"access_token": "fake_access_token"}"#)
        .create();

    let _m = mock("GET", "/step_flow")
        .with_header("content-type", "application/json")
        .with_body(r#"{"application": "step_flow_test", "version": "1.7.0"}"#)
        .create();

    let _m = mock("GET", "/step_flow/blacklist")
        .with_header("content-type", "application/json")
        .with_body(r#"{"data": [], "total": 0}"#)
        .create();

    // Presumably read by the SDK to locate the backend -- TODO confirm.
    std::env::set_var("BACKEND_HOSTNAME", mockito::server_url());

    // Upper bound for every `recv_timeout` below.
    let receiver_timeout = Duration::from_secs(60);

    // One channel per queue the test listens to; the sender goes to the
    // consumer, the receiver is polled by the assertions further down.
    let (created_sender, created_receiver) = mpsc::channel::();
    let (status_sender, status_receiver) = mpsc::channel::();
    let (initialized_sender, initialized_receiver) = mpsc::channel::();
    let (started_sender, started_receiver) = mpsc::channel::();
    let (progression_sender, progression_receiver) = mpsc::channel::();
    let (stopped_sender, stopped_receiver) = mpsc::channel::();

    let amqp_connection = AmqpConnection::new().unwrap();

    // The meaning of the trailing integer (1 or 2) is defined by
    // `AmqpConnection::start_consumer`, which is not visible here.
    amqp_connection.start_consumer(QUEUE_WORKER_CREATED, created_sender, 1);
    amqp_connection.start_consumer(QUEUE_WORKER_STATUS, status_sender, 2);
    amqp_connection.start_consumer(QUEUE_WORKER_INITIALIZED, initialized_sender, 2);
    amqp_connection.start_consumer(QUEUE_WORKER_STARTED, started_sender, 2);
    amqp_connection.start_consumer(QUEUE_JOB_PROGRESSION, progression_sender, 2);
    amqp_connection.start_consumer(QUEUE_JOB_STOPPED, stopped_sender, 2);

    // Identifier used both to configure the worker and to address orders to it.
    let instance_id = "stop_job_9876543210";
    let worker = Worker {};
    let worker_configuration = WorkerConfiguration::new("", &worker, instance_id).unwrap();
    let rabbitmq_exchange = RabbitmqExchange::new(&worker_configuration).await;

    // No broker reachable: report it and end the test successfully instead of failing.
    if let Err(MessageError::Amqp(lapin::Error::IOError(error))) = rabbitmq_exchange {
        eprintln!("Connection to RabbitMQ failure: {error}. Skip test.");
        return Ok(());
    }

    // Any other error panics here.
    let rabbitmq_exchange = Arc::new(Mutex::new(rabbitmq_exchange.unwrap()));

    let worker = Arc::new(Mutex::new(worker));

    // Run the processor on its own thread. The returned `JoinHandle` is
    // discarded, so the thread is never joined and a failed assertion inside
    // it does not, by itself, fail this test.
    std::thread::spawn(move || {
        let mut processor = Processor::new(rabbitmq_exchange, worker_configuration);
        processor.disable_ctrl_c_handler();
        assert!(processor.run(worker, Arc::new(Mutex::new(None))).is_ok());
    });

    // Wait for the "created" message before sending any order.
    assert!(created_receiver.recv_timeout(receiver_timeout).is_ok());

    amqp_connection.send_order(vec![instance_id], &OrderMessage::Status)?;
    assert!(status_receiver.recv_timeout(receiver_timeout).is_ok());

    // The job's source path is the same file as `file_path` above.
    let job = Job::new(
        r#"{ "job_id": 999, "parameters": [ { "id": "source_path", "type": "string", "value": "./test_rabbitmq_media_processor_stop_job.mxf" }, { "id": "destination_path", "type": "string", "value": "./test_rabbitmq_media_processor_stop_job.json" } ] }"#,
    )
    .unwrap();

    // First run: init -> start -> at least one progression message -> stop.
    amqp_connection.send_order(vec![instance_id], &OrderMessage::InitProcess(job.clone()))?;
    assert!(initialized_receiver.recv_timeout(receiver_timeout).is_ok());

    amqp_connection.send_order(vec![instance_id], &OrderMessage::StartProcess(job.clone()))?;
    assert!(started_receiver.recv_timeout(receiver_timeout).is_ok());

    assert!(progression_receiver.recv_timeout(receiver_timeout).is_ok());

    amqp_connection.send_order(vec![instance_id], &OrderMessage::StopProcess(job.clone()))?;
    let stopped_message = stopped_receiver.recv_timeout(receiver_timeout);
    assert!(stopped_message.is_ok());

    // NOTE(review): blocking sleep inside an async fn; presumably gives the
    // worker time to settle after the stop -- TODO confirm the reason.
    std::thread::sleep(std::time::Duration::from_millis(2000));

    // The worker must still answer a status order after being stopped.
    log::info!("Get the status of the worker");
    amqp_connection.send_order(vec![instance_id], &OrderMessage::Status)?;
    assert!(status_receiver.recv_timeout(receiver_timeout).is_ok());

    // Second run with the same job: init -> start -> progression -> stop.
    log::info!("Second time same job");
    amqp_connection.send_order(vec![instance_id], &OrderMessage::InitProcess(job.clone()))?;
    assert!(initialized_receiver.recv_timeout(receiver_timeout).is_ok());

    amqp_connection.send_order(vec![instance_id], &OrderMessage::StartProcess(job.clone()))?;
    assert!(started_receiver.recv_timeout(receiver_timeout).is_ok());

    assert!(progression_receiver.recv_timeout(receiver_timeout).is_ok());

    // Last use of `job`, so it is moved instead of cloned.
    amqp_connection.send_order(vec![instance_id], &OrderMessage::StopProcess(job))?;
    let stopped_message = stopped_receiver.recv_timeout(receiver_timeout);
    assert!(stopped_message.is_ok());

    std::thread::sleep(std::time::Duration::from_millis(2000));
    log::info!("RabbitMQ stop job test done!");

    // Remove the sample file created at the top of the test.
    std::fs::remove_file(file_path).unwrap();

    Ok(())
}