//! An example of how to build a three stage pipeline use std::cell::RefCell; use std::fs::File; use std::io::{BufRead, BufReader, Write}; use std::io::Read; use std::path::PathBuf; use std::sync::Arc; use std::sync::RwLock; use anyhow::{anyhow, Context, Error}; use hex; use sha1::{Digest, Sha1}; use command_executor::command::Command; use command_executor::shutdown_mode::ShutdownMode; use command_executor::thread_pool::ThreadPool; use command_executor::thread_pool_builder::ThreadPoolBuilder; thread_local! { pub static NEXT_THREAD_POOL: RefCell>>> = RefCell::new(None); pub static RESULT_FILE: RefCell> = RefCell::new(None); } static RESULT_FILE_PATH: &str = "./target/read-process-write-example-result"; static SOURCE_FILE_PATH: &str = "./target/10-million-password-list-top-1000000.txt"; struct ProcessingCommand { payload: String, } impl ProcessingCommand { fn new(payload: String) -> ProcessingCommand { ProcessingCommand { payload, } } } impl Command for ProcessingCommand { fn execute(&self) -> Result<(), Error> { NEXT_THREAD_POOL.with( |tp| { let mut hasher = Sha1::new(); hasher.update(&self.payload); let result = hasher.finalize(); let hash = hex::encode(result); tp.borrow().as_ref().unwrap().read().unwrap().submit( Box::new(WritingCommand::new(self.payload.clone(), hash))); Ok(()) } ) } } struct WritingCommand { payload: String, hash: String, } impl WritingCommand { fn new(payload: String, hash: String) -> WritingCommand { WritingCommand { payload, hash, } } } impl Command for WritingCommand { fn execute(&self) -> Result<(), Error> { RESULT_FILE.with( |cell| { let mut file_opt = cell.borrow_mut(); match file_opt.as_mut() { None => { let mut f = File::create(PathBuf::from(RESULT_FILE_PATH)) .with_context(|| anyhow!("path: {}", RESULT_FILE_PATH))?; writeln!(f, "{}\t{}", self.hash, self.payload) .with_context(|| anyhow!("path: {}", RESULT_FILE_PATH))?; file_opt.replace(f); Ok(()) } Some(f) => { writeln!(f, "{}\t{}", self.hash, self.payload) .with_context(|| anyhow!("path: {}", RESULT_FILE_PATH)) } } } ) } } fn create_thread_pool(name: &str, tasks: usize, queue_size: usize) -> Result>, anyhow::Error> { Ok( Arc::new( RwLock::new( ThreadPoolBuilder::new() .with_name_str(name) .with_tasks(tasks) .with_queue_size(queue_size) .with_shutdown_mode(ShutdownMode::CompletePending) .build()? ) ) ) } fn set_next(thread_pool: Arc>, next: Arc>) -> Result<(), anyhow::Error> { let tp = thread_pool .write() .map_err(|e| anyhow!("failed to lock tread pool: {e}"))?; tp.set_thread_local(&NEXT_THREAD_POOL, Some(next.clone())); Ok(()) } fn shutdown(thread_pool: Arc>) -> Result<(), anyhow::Error> { let mut tp = thread_pool .write() .map_err(|e| anyhow!("failed to lock tread pool: {e}"))?; tp.shutdown(); tp.join() } fn fetch_file(url: &str, output: PathBuf) -> Result<(), Error> { if !output.exists() { println!("Downloading file: {} -> {:?}", url, output); let mut response = reqwest::blocking::get(url.clone()) .with_context(|| anyhow!("Failed to download the file from {:?}", url) )?; let mut body = Vec::new(); response.read_to_end(&mut body) .with_context(|| anyhow!("Failed to read the file from {:?}", url) )?; let mut file = File::create(&output) .with_context(|| anyhow!("Failed to create the file: {:?}", output) )?; file.write(body.as_slice()) .with_context(|| anyhow!("failed to write content to {:?}", output) )?; } else { println!("File exists at {:?}, skipping download", output); } Ok(()) } fn read(processing_stage: Arc>) -> Result<(), anyhow::Error> { fetch_file( "https://github.com/danielmiessler/SecLists/raw/master/Passwords/Common-Credentials/10-million-password-list-top-1000000.txt", PathBuf::from(SOURCE_FILE_PATH), )?; let f = File::open(PathBuf::from(SOURCE_FILE_PATH)) .with_context(|| anyhow!("{}", SOURCE_FILE_PATH))?; let reader = BufReader::new(f); let tp = processing_stage .read() .map_err(|e| anyhow!("failed to lock tread pool: {e}"))?; for line_result in reader.lines() { let line = line_result?; tp.submit(Box::new(ProcessingCommand::new(line))) } Ok(()) } pub fn main() -> Result<(), anyhow::Error> { let processing_stage = create_thread_pool("processing", 8, 1000)?; let writing_stage = create_thread_pool("writing", 1, 1000)?; set_next(processing_stage.clone(), writing_stage.clone())?; read(processing_stage.clone())?; shutdown(processing_stage.clone())?; shutdown(writing_stage.clone())?; Ok(()) }