use crate::{instantiate_mcai_worker_description, instantiate_mcai_worker_tests}; use assert_matches::assert_matches; use mcai_worker_sdk::prelude::*; use serde::Deserialize; use std::sync::{mpsc::Sender, Arc, Mutex}; #[test] pub fn batch_frame_processing() { let file_path = "./test_batch_media_processor.mxf"; let nb_frames = 7; super::ffmpeg::create_xdcam_sample_file(file_path, nb_frames).unwrap(); static mut COUNTER_BATCH: i64 = 0; struct Worker {} #[derive(Clone, Debug, Deserialize, JsonSchema)] pub struct WorkerParameters { #[allow(dead_code)] source_path: String, #[allow(dead_code)] destination_path: String, } instantiate_mcai_worker_tests!(); impl McaiWorker for Worker { instantiate_mcai_worker_description!(); fn init(&mut self) -> Result<()> { log::info!("Initialize processor test worker!"); Ok(()) } fn init_process( &mut self, _parameters: WorkerParameters, format_context: Arc>, _result: Arc>>, ) -> Result> { let mut stream_descriptors = vec![]; let format_context = format_context.lock().unwrap(); for stream_index in 0..format_context.get_nb_streams() { let stream_type = format_context.get_stream_type(stream_index as isize); info!( "Handle stream #{} with type: {:?}", stream_index, stream_type ); match stream_type { AVMediaType::AVMEDIA_TYPE_VIDEO => { let filters = vec![VideoFilter::Resize(Scaling { width: Some(200), height: Some(70), })]; stream_descriptors.push(StreamDescriptor::new_video(stream_index as usize, filters)) } AVMediaType::AVMEDIA_TYPE_AUDIO => { let channel_layouts = vec!["mono".to_string()]; let sample_formats = vec!["s16".to_string()]; let sample_rates = vec![16000]; let filters = vec![AudioFilter::Format(AudioFormat { sample_rates, channel_layouts, sample_formats, })]; stream_descriptors.push(StreamDescriptor::new_audio(stream_index as usize, filters)) } AVMediaType::AVMEDIA_TYPE_SUBTITLE => { stream_descriptors.push(StreamDescriptor::new_data(stream_index as usize)) } AVMediaType::AVMEDIA_TYPE_DATA => { stream_descriptors.push(StreamDescriptor::new_data(stream_index as usize)) } _ => info!("Skip stream #{}", stream_index), }; } Ok(stream_descriptors) } fn process_frames( &mut self, _job_result: JobResult, stream_index: usize, frames: &[ProcessFrame], ) -> Result { let mut counter_inside_batch = 0; log::debug!( "Entered process_batch with batch of size {}, and stream_index {}", frames.len(), stream_index ); for decoded_frame in frames { counter_inside_batch += 1; match decoded_frame { ProcessFrame::AudioVideo(_frame) => unsafe { log::debug!( "Processing frame {} of batch {}", counter_inside_batch, COUNTER_BATCH ); }, _ => break, } } let size = frames.len(); log::debug!("Batch size {}", size); unsafe { COUNTER_BATCH += 1 }; Ok(ProcessResult::new_json("")) } } let (internal_local_exchange, mut external_local_exchange) = LocalExchange::create(); let internal_local_exchange = Arc::new(Mutex::new(internal_local_exchange)); let worker = Worker {}; let worker_configuration = WorkerConfiguration::new("", &worker, "instance_id").unwrap(); let worker = Arc::new(Mutex::new(worker)); std::thread::spawn(move || { let mut processor = Processor::new(internal_local_exchange, worker_configuration); processor.disable_ctrl_c_handler(); assert!(processor.run(worker, Arc::new(Mutex::new(None))).is_ok()); }); // Check if the worker is created successfully assert_matches!( external_local_exchange.next_response(), Ok(Some(ResponseMessage::WorkerCreated(_))) ); let job = Job::new(include_str!("../jobs/batch_frames.json")).unwrap(); external_local_exchange .send_order(OrderMessage::InitProcess(job.clone())) .unwrap(); assert_matches!( external_local_exchange.next_response(), Ok(Some(ResponseMessage::WorkerInitialized(_))) ); external_local_exchange .send_order(OrderMessage::StartProcess(job)) .unwrap(); assert_matches!( external_local_exchange.next_response(), Ok(Some(ResponseMessage::WorkerStarted(JobResult { .. }))) ); assert_matches!( external_local_exchange.next_response(), Ok(Some(ResponseMessage::Feedback( Feedback::Progression { .. } ))) ); external_local_exchange .send_order(OrderMessage::Status) .unwrap(); assert_matches!( external_local_exchange.next_response(), Ok(Some(ResponseMessage::Feedback(Feedback::Status { .. }))) ); for _ in 0..3 { let response = external_local_exchange.next_response(); log::debug!("{:?}", response); assert_matches!( response, Ok(Some(ResponseMessage::Feedback( Feedback::Progression { .. } ))) ); } assert_matches!( external_local_exchange.next_response(), Ok(Some(ResponseMessage::Completed(_))) ); std::fs::remove_file(file_path).unwrap(); }