//! An example of how to build a three stage pipeline use std::fmt::{Display, Formatter}; use std::path::PathBuf; use anyhow::Error; use benchmark_rs::benchmarks::Benchmarks; use benchmark_rs::stopwatch::StopWatch; use rand::distributions::Alphanumeric; use rand::Rng; use simple_logger::SimpleLogger; use command_executor::command::Command; use command_executor::queue_type::QueueType; use command_executor::shutdown_mode::ShutdownMode; use command_executor::thread_pool::ThreadPool; use command_executor::thread_pool_builder::ThreadPoolBuilder; #[derive(Clone)] pub struct BenchmarkConfig { tasks: usize, queue_size: usize, queue_type: QueueType, commands: usize, string_length: usize, description: String, } impl BenchmarkConfig { pub fn new(tasks: usize, queue_size: usize, queue_type: QueueType, commands: usize, string_length: usize, description: &str) -> BenchmarkConfig { BenchmarkConfig { tasks, queue_size, queue_type, commands, string_length, description: description.to_string(), } } pub fn tasks(&self) -> usize { self.tasks } pub fn queue_size(&self) -> usize { self.queue_size } pub fn queue_type(&self) -> QueueType { self.queue_type } pub fn commands(&self) -> usize { self.commands } pub fn string_length(&self) -> usize { self.string_length } } impl Display for BenchmarkConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "tasks: {}, queue_size: {}, queue_type: {}, string_length: {}, description: {}", self.tasks, self.queue_size, self.queue_type, self.string_length, self.description, ) } } struct ProcessingCommand { count: usize, length: usize, } impl ProcessingCommand { fn new(count: usize, length: usize) -> ProcessingCommand { ProcessingCommand { count, length, } } } impl Command for ProcessingCommand { fn execute(&self) -> Result<(), Error> { let mut v: Vec = Vec::with_capacity(self.count); for _i in 0..self.count { v.push( rand::thread_rng() .sample_iter(&Alphanumeric) .take(self.length) .map(char::from) .collect::() ); } v.sort(); Ok(()) } } fn create_thread_pool(name: &str, tasks: usize, queue_size: usize, queue_type: QueueType) -> Result { Ok( ThreadPoolBuilder::new() .with_name_str(name) .with_tasks(tasks) .with_queue_type(queue_type) .with_queue_size(queue_size) .with_shutdown_mode(ShutdownMode::CompletePending) .build()? ) } fn shutdown(thread_pool: &mut ThreadPool) -> Result<(), anyhow::Error> { thread_pool.shutdown(); thread_pool.join() } #[test] fn queue_types_bench() -> Result<(), Error> { SimpleLogger::new().init().unwrap(); log::info!("Started increasing workloads benchmark."); for i in 0..5 { increasing_workloads(10_usize.pow(i), 4, 200, 25)? } log::info!("Finished increasing workloads benchmark."); Ok(()) } fn bench_name(base: &str, magnitude: usize) -> String { format!("{}-magnitude-{}", base, magnitude) } fn increasing_workloads(magnitude: usize, queue_size: usize, commands: usize, string_length: usize) -> Result<(), Error> { let mut benchmarks = Benchmarks::new(bench_name("blocking-queue-increasing-workloads", magnitude).as_str()); let work_points: Vec = (1..=20).into_iter().map(|i| i * magnitude).collect(); benchmarks.add( bench_name("blocking-queue-baseline", magnitude).as_str(), work0, BenchmarkConfig::new( 0, 0, QueueType::BlockingQueue, commands, string_length, "inline processing baseline", ), work_points.clone(), 3, 1, )?; benchmarks.add( bench_name("blocking-queue-1-tasks", magnitude).as_str(), work, BenchmarkConfig::new( 1, queue_size, QueueType::BlockingQueue, commands, string_length, "", ), work_points.clone(), 3, 1, )?; benchmarks.add( bench_name("crossbeam-blocking-queue-1-tasks", magnitude).as_str(), work, BenchmarkConfig::new( 1, queue_size, QueueType::CrossbeamBlockingQueue, commands, string_length, "", ), work_points.clone(), 3, 1, )?; benchmarks.add( bench_name("blocking-queue-2-tasks", magnitude).as_str(), work, BenchmarkConfig::new( 2, queue_size, QueueType::BlockingQueue, commands, string_length, "", ), work_points.clone(), 3, 1, )?; benchmarks.add( bench_name("crossbeam-blocking-queue-2-tasks", magnitude).as_str(), work, BenchmarkConfig::new( 2, queue_size, QueueType::CrossbeamBlockingQueue, commands, string_length, "", ), work_points.clone(), 3, 1, )?; benchmarks.add( bench_name("blocking-queue-4-tasks", magnitude).as_str(), work, BenchmarkConfig::new( 4, queue_size, QueueType::BlockingQueue, commands, string_length, "", ), work_points.clone(), 3, 1, )?; benchmarks.add( bench_name("crossbeam-blocking-queue-4-tasks", magnitude).as_str(), work, BenchmarkConfig::new( 4, queue_size, QueueType::CrossbeamBlockingQueue, commands, string_length, "", ), work_points.clone(), 3, 1, )?; benchmarks.run()?; benchmarks.save_to_csv(PathBuf::from("./target/benchmarks/"), true, true)?; benchmarks.save_to_json(PathBuf::from("./target/benchmarks/"))?; Ok(()) } fn work(_stop_watch: &mut StopWatch, config: BenchmarkConfig, work: usize) -> Result<(), anyhow::Error> { log::info!("start work: work: {}, commands: {}, tasks: {}, queue_size: {}", work, config.commands(), config.tasks(), config.queue_size()); let mut thread_pool = create_thread_pool( "trhead-pool", config.tasks(), config.queue_size(), config.queue_type(), )?; for _i in 0..config.commands() { thread_pool.submit(Box::new(ProcessingCommand::new(work, config.string_length()))); } shutdown(&mut thread_pool)?; log::info!("finish work"); Ok(()) } fn work0(_stop_watch: &mut StopWatch, config: BenchmarkConfig, work: usize) -> Result<(), Error> { log::info!("start work0: work: {}, commands: {}, tasks: {}", work, config.commands(), config.tasks()); for _i in 0..config.commands() { let mut v: Vec = Vec::with_capacity(work); for _i in 0..work { v.push( rand::thread_rng() .sample_iter(&Alphanumeric) .take(config.string_length()) .map(char::from) .collect::() ); } v.sort(); } log::info!("finish work0"); Ok(()) }