use crate::{instantiate_mcai_worker_description, instantiate_mcai_worker_tests}; use assert_matches::assert_matches; use mcai_worker_sdk::prelude::*; use serde::Deserialize; use std::sync::{mpsc::Sender, Arc, Mutex}; #[test] fn processor() { let file_path = "./test_media_processor.mxf"; let nb_frames = 5; super::ffmpeg::create_xdcam_sample_file(file_path, nb_frames).unwrap(); struct Worker {} #[derive(Clone, Debug, Deserialize, JsonSchema)] pub struct WorkerParameters { #[allow(dead_code)] source_path: String, #[allow(dead_code)] destination_path: String, } instantiate_mcai_worker_tests!(); impl McaiWorker for Worker { instantiate_mcai_worker_description!(); fn init(&mut self) -> Result<()> { log::info!("Initialize processor test worker!"); Ok(()) } fn init_process( &mut self, _parameters: WorkerParameters, format_context: Arc>, _result: Arc>>, ) -> Result> { let mut stream_descriptors = vec![]; let format_context = format_context.lock().unwrap(); for stream_index in 0..format_context.get_nb_streams() { let stream_type = format_context.get_stream_type(stream_index as isize); info!( "Handle stream #{} with type: {:?}", stream_index, stream_type ); match stream_type { AVMediaType::AVMEDIA_TYPE_VIDEO => { let filters = vec![VideoFilter::Resize(Scaling { width: Some(200), height: Some(70), })]; stream_descriptors.push(StreamDescriptor::new_video(stream_index as usize, filters)) } AVMediaType::AVMEDIA_TYPE_AUDIO => { let channel_layouts = vec!["mono".to_string()]; let sample_formats = vec!["s16".to_string()]; let sample_rates = vec![16000]; let filters = vec![AudioFilter::Format(AudioFormat { sample_rates, channel_layouts, sample_formats, })]; stream_descriptors.push(StreamDescriptor::new_audio(stream_index as usize, filters)) } AVMediaType::AVMEDIA_TYPE_SUBTITLE => { stream_descriptors.push(StreamDescriptor::new_data(stream_index as usize)) } AVMediaType::AVMEDIA_TYPE_DATA => { stream_descriptors.push(StreamDescriptor::new_data(stream_index as usize)) } _ => info!("Skip stream #{}", stream_index), }; } Ok(stream_descriptors) } fn process_frames( &mut self, _job_result: JobResult, _stream_index: usize, _frames: &[ProcessFrame], ) -> Result { log::info!("Process frames"); Ok(ProcessResult::new_json("")) } } let (internal_local_exchange, mut external_local_exchange) = LocalExchange::create(); let internal_local_exchange = Arc::new(Mutex::new(internal_local_exchange)); let worker = Worker {}; let worker_configuration = WorkerConfiguration::new("", &worker, "instance_id").unwrap(); let worker = Arc::new(Mutex::new(worker)); std::thread::spawn(move || { let mut processor = Processor::new(internal_local_exchange, worker_configuration); processor.disable_ctrl_c_handler(); assert!(processor.run(worker, Arc::new(Mutex::new(None))).is_ok()); }); // Check if the worker is created successfully assert_matches!( external_local_exchange.next_response(), Ok(Some(ResponseMessage::WorkerCreated(_))) ); let job = Job::new(include_str!("../jobs/simple_media.json")).unwrap(); external_local_exchange .send_order(OrderMessage::InitProcess(job.clone())) .unwrap(); assert_matches!( external_local_exchange.next_response(), Ok(Some(ResponseMessage::WorkerInitialized(_))) ); external_local_exchange .send_order(OrderMessage::StartProcess(job.clone())) .unwrap(); assert_matches!( external_local_exchange.next_response(), Ok(Some(ResponseMessage::WorkerStarted(JobResult { .. }))) ); assert_matches!( external_local_exchange.next_response(), Ok(Some(ResponseMessage::Feedback( Feedback::Progression { .. } ))) ); external_local_exchange .send_order(OrderMessage::Status) .unwrap(); assert_matches!( external_local_exchange.next_response(), Ok(Some(ResponseMessage::Feedback(Feedback::Status { .. }))) ); for index in 1..5 { let response = external_local_exchange.next_response(); println!("{index:?}"); assert_matches!( response, Ok(Some(ResponseMessage::Feedback( Feedback::Progression { .. } ))) ); } assert_matches!( external_local_exchange.next_response(), Ok(Some(ResponseMessage::Completed(_))) ); // Second job, stop during execution external_local_exchange .send_order(OrderMessage::InitProcess(job.clone())) .unwrap(); assert_matches!( external_local_exchange.next_response(), Ok(Some(ResponseMessage::WorkerInitialized(_))) ); external_local_exchange .send_order(OrderMessage::StartProcess(job.clone())) .unwrap(); assert_matches!( external_local_exchange.next_response(), Ok(Some(ResponseMessage::WorkerStarted(JobResult { .. }))) ); external_local_exchange .send_order(OrderMessage::StopProcess(job)) .unwrap(); loop { let response = external_local_exchange.next_response(); match response { Ok(Some(ResponseMessage::Feedback(Feedback::Progression { .. }))) => {} Ok(Some(ResponseMessage::JobStopped(JobResult { .. }))) => { break; } _ => panic!(), } } external_local_exchange .send_order(OrderMessage::StopWorker) .unwrap(); assert_matches!( external_local_exchange.next_response(), Ok(Some(ResponseMessage::WorkerTerminated(_))) ); std::fs::remove_file(file_path).unwrap(); }