//! An example of how to build a three stage pipeline use std::cell::RefCell; use std::collections::HashSet; use std::fs::File; use std::io::{BufRead, BufReader, Write}; use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; use std::sync::RwLock; use anyhow::{anyhow, Context, Error}; use command_executor::command::Command; use command_executor::shutdown_mode::ShutdownMode; use command_executor::thread_pool::ThreadPool; use command_executor::thread_pool_builder::ThreadPoolBuilder; thread_local! { pub static NEXT_THREAD_POOL: RefCell>>> = RefCell::new(None); pub static RESULT_FILE: RefCell> = RefCell::new(None); } static RESULT_FILE_PATH: &str = "./target/pipeline-example-result"; struct FirstStageCommand { payload: i32, } impl FirstStageCommand { fn new(payload: i32) -> FirstStageCommand { FirstStageCommand { payload, } } } impl Command for FirstStageCommand { fn execute(&self) -> Result<(), Error> { NEXT_THREAD_POOL.with( |tp| { tp.borrow().as_ref().unwrap().read().unwrap().submit( Box::new(SecondStageCommand::new(self.payload)) ); Ok(()) } ) } } struct SecondStageCommand { payload: i32, } impl SecondStageCommand { fn new(payload: i32) -> SecondStageCommand { SecondStageCommand { payload, } } } impl Command for SecondStageCommand { fn execute(&self) -> Result<(), Error> { NEXT_THREAD_POOL.with( |tp| { tp.borrow().as_ref().unwrap().read().unwrap().submit( Box::new(ThirdStageCommand::new(self.payload)) ); Ok(()) } ) } } struct ThirdStageCommand { payload: i32, } impl ThirdStageCommand { fn new(payload: i32) -> ThirdStageCommand { ThirdStageCommand { payload, } } } impl Command for ThirdStageCommand { fn execute(&self) -> Result<(), Error> { RESULT_FILE.with( |cell| { let mut file_opt = cell.borrow_mut(); match file_opt.as_mut() { None => { let mut f = File::create(PathBuf::from(RESULT_FILE_PATH)) .with_context(|| anyhow!("path: {}", RESULT_FILE_PATH))?; writeln!(f, "{}", self.payload) .with_context(|| anyhow!("path: {}", RESULT_FILE_PATH))?; file_opt.replace(f); Ok(()) } Some(f) => { writeln!(f, "{}", self.payload) .with_context(|| anyhow!("path: {}", RESULT_FILE_PATH)) } } } ) } } fn create_thread_pool(name: &str, tasks: usize) -> Result>, anyhow::Error> { Ok( Arc::new( RwLock::new( ThreadPoolBuilder::new() .with_name_str(name) .with_tasks(tasks) .with_shutdown_mode(ShutdownMode::CompletePending) .build()? ) ) ) } fn set_next(thread_pool: Arc>, next: Arc>) -> Result<(), anyhow::Error> { let tp = thread_pool .write() .map_err(|e| anyhow!("failed to lock tread pool: {e}"))?; tp.set_thread_local(&NEXT_THREAD_POOL, Some(next.clone())); Ok(()) } fn shutdown(thread_pool: Arc>) -> Result<(), anyhow::Error> { let mut tp = thread_pool .write() .map_err(|e| anyhow!("failed to lock tread pool: {e}"))?; tp.shutdown(); tp.join() } pub fn main() -> Result<(), anyhow::Error> { let first_stage = create_thread_pool("first", 2)?; let second_stage = create_thread_pool("second", 2)?; let third_stage = create_thread_pool("third", 1)?; set_next(first_stage.clone(), second_stage.clone())?; set_next(second_stage.clone(), third_stage.clone())?; let mut source_set = HashSet::new(); for i in 0..256 { source_set.insert(i); let tp = first_stage .write() .map_err(|e| anyhow!("failed to lock tread pool: {e}"))?; tp.submit(Box::new(FirstStageCommand::new(i))) } shutdown(first_stage.clone())?; // First stage thread pool finished processing the last command shutdown(second_stage.clone())?; // Second stage thread pool finished processing the last command shutdown(third_stage.clone())?; // Third stage thread pool finished processing the last command let mut result_set = HashSet::new(); let reader = BufReader::new(File::open(PathBuf::from(RESULT_FILE_PATH))?); for line_result in reader.lines() { let line = line_result?; let i = i32::from_str(line.as_str())?; result_set.insert(i); } assert_eq!(source_set, result_set); std::fs::remove_file(PathBuf::from(RESULT_FILE_PATH))?; Ok(()) }