#[macro_use] extern crate serde_derive; use mcai_worker_sdk::{default_rust_mcai_worker_description, prelude::*}; use std::{thread::sleep, time::Duration}; #[cfg(feature = "media")] use std::sync::{mpsc::Sender, Arc, Mutex}; #[derive(Debug, Deserialize, JsonSchema)] #[serde(deny_unknown_fields)] struct WorkerParameters { action: Option, #[allow(dead_code)] source_path: Option, #[allow(dead_code)] destination_path: Option, // Updatable parameter #[cfg(feature = "media")] updatable: Option, /// Option sleep time in milliseconds /// /// For not media, it will sleep until a stop is received /// /// For media it will be between each frame sleep: Option, } #[derive(Debug, Default)] struct WorkerContext { #[cfg(feature = "media")] result: Option>>>, #[cfg(feature = "media")] updatable: Option, #[cfg(feature = "media")] sleep: Option, } default_rust_mcai_worker_description!(); impl McaiWorker for WorkerContext { fn init(&mut self) -> Result<()> { Ok(()) } #[cfg(feature = "media")] fn init_process( &mut self, parameters: WorkerParameters, format_context: Arc>, result: Arc>>, ) -> Result> { self.result = Some(result); self.sleep = parameters.sleep; self.updatable = Some("basic".to_string()); let mut stream_descriptors = vec![]; let format_context = format_context.lock().unwrap(); for stream_index in 0..format_context.get_nb_streams() { let stream_type = format_context.get_stream_type(stream_index as isize); info!( "Handle stream #{} with type: {:?}", stream_index, stream_type ); match stream_type { AVMediaType::AVMEDIA_TYPE_VIDEO => { let filters = vec![VideoFilter::Resize(Scaling { width: Some(200), height: Some(70), })]; stream_descriptors.push(StreamDescriptor::new_video(stream_index as usize, filters)) } AVMediaType::AVMEDIA_TYPE_AUDIO => { let channel_layouts = vec!["mono".to_string()]; let sample_formats = vec!["s16".to_string()]; let sample_rates = vec![16000]; let filters = vec![AudioFilter::Format(AudioFormat { sample_rates, channel_layouts, sample_formats, })]; stream_descriptors.push(StreamDescriptor::new_audio(stream_index as usize, filters)) } AVMediaType::AVMEDIA_TYPE_SUBTITLE => { stream_descriptors.push(StreamDescriptor::new_data(stream_index as usize)) } AVMediaType::AVMEDIA_TYPE_DATA => { stream_descriptors.push(StreamDescriptor::new_data(stream_index as usize)) } _ => info!("Skip stream #{}", stream_index), }; } Ok(stream_descriptors) } #[cfg(feature = "media")] fn process_frames( &mut self, job_result: JobResult, stream_index: usize, frames: &[ProcessFrame], ) -> Result { for decoded_frame in frames { match decoded_frame { ProcessFrame::AudioVideo(frame) => { unsafe { let width = (*frame.frame).width; let height = (*frame.frame).height; let sample_rate = (*frame.frame).sample_rate; let channels = (*frame.frame).channels; let nb_samples = (*frame.frame).nb_samples; if width != 0 && height != 0 { info!( target: &job_result.get_str_job_id(), "Stream {} - PTS: {}, image size: {}x{}", stream_index, frame.get_pts(), width, height ); } else { info!( target: &job_result.get_str_job_id(), "Stream {} - PTS: {}, sample_rate: {}Hz, channels: {}, nb_samples: {}", stream_index, frame.get_pts(), sample_rate, channels, nb_samples, ); } } if let Some(duration) = self.sleep { sleep(Duration::from_millis(duration)); } } _ => break, } } Ok(ProcessResult::new_json("")) } #[cfg(feature = "media")] fn update_process(&mut self, parameters: WorkerParameters) -> Result<()> { self.updatable = parameters.updatable; Ok(()) } #[cfg(feature = "media")] fn ending_process(&mut self) -> Result<()> { log::info!("Ending process"); Ok(()) } /// Not called when the "media" feature is enabled fn process( &self, channel: Option, parameters: WorkerParameters, job_result: JobResult, ) -> Result { if let Some(duration) = parameters.sleep { let total = 10_u16; for count in 0..total { if Self::is_current_job_stopped(&channel) { return Ok(job_result.with_status(JobStatus::Stopped)); } log::debug!("sleep more ({}/{})...", count + 1, total); sleep(Duration::from_millis(duration)); publish_job_progression( channel.clone(), job_result.get_job_id(), ((count + 1) * 100 / total) as u8, )?; } } else { publish_job_progression(channel.clone(), job_result.get_job_id(), 50)?; } match parameters.action { Some(action_label) => match action_label.as_str() { "completed" => { publish_job_progression(channel, job_result.get_job_id(), 100)?; Ok(job_result.with_status(JobStatus::Completed)) } action_label => { let result = job_result.with_message(&format!("Unknown action named {action_label}")); Err(MessageError::ProcessingError(result)) } }, None => { let result = job_result.with_message("Unspecified action parameter"); Err(MessageError::ProcessingError(result)) } } } } fn main() { let worker_context = WorkerContext::default(); start_worker(worker_context); }