//! A simple example of parallel reading from file. //! Data is read by producer threads and sent to consumer threads which pass the //! data to a client callback for consumption. //! //! Input: //! //! * input file name //! * number of producer threads //! * number of consumer threads //! * number of chunks (== tasks) per producer //! * number of buffers per producer //! //! Usage: //! ```ignore //! cargo run --example example_parallel_read 12 4 3 2 //! ``` use par_io::read::read_file; pub fn main() { let filename = std::env::args().nth(1).expect("Missing file name"); let len = std::fs::metadata(&filename) .expect("Error reading file size") .len(); let num_producers: u64 = std::env::args() .nth(2) .expect("Missing num producers") .parse() .unwrap(); let num_consumers: u64 = std::env::args() .nth(3) .expect("Missing num consumers") .parse() .unwrap(); let chunks_per_producer: u64 = std::env::args() .nth(4) .expect("Missing num chunks per producer") .parse() .unwrap(); let num_buffers_per_producer: u64 = if let Some(p) = std::env::args().nth(5) { p.parse().expect("Wrong num tasks format") } else { 2 }; let consume = |buffer: &[u8], data: &String, chunk_id: u64, num_chunks: u64, _offset: u64| -> Result { std::thread::sleep(std::time::Duration::from_secs(1)); println!( "Consumer: {}/{} {} buffer length: {}", chunk_id, num_chunks, data, buffer.len() ); Ok(buffer.len()) }; let tag = "TAG".to_string(); match read_file( &filename, num_producers, num_consumers, chunks_per_producer, std::sync::Arc::new(consume), tag, num_buffers_per_producer, ) { Ok(v) => { let bytes_consumed = v .iter() .fold(0, |acc, x| if let (_, Ok(b)) = x { acc + b } else { acc }); assert_eq!(bytes_consumed, len as usize); } Err(err) => { use par_io::read::ReadError; match err { ReadError::IO(err) => { eprintln!("IO error: {:?}", err); } ReadError::Send(err) => { eprintln!("Send error: {:?}", err); } ReadError::Other(err) => { eprintln!("Error: {:?}", err); } } } } }