#![allow(dead_code)] //! # Word Count example with MapReduce model mod consumer; mod hashmap; mod producer; mod stack; use consumer::Consumer; use hashmap::*; use corundum::default::*; use corundum::open_flags::*; use producer::Producer; use stack::*; use std::env; use std::fs::File; use std::io::{BufRead, BufReader}; use std::thread; pub static mut PRINT: bool = true; type P = Allocator; fn help() { println!("usage: grep [OPTIONS] list-file"); println!(); println!("OPTIONS:"); println!(" -p num Search pattern (Default '\\w+')"); println!(" -r num Number of reader (producer) threads (Default 1)"); println!(" -c num Number of consumer threads (Default 1)"); println!(" -f file Pool filename (Default ./wc.pool)"); println!(" -N Do not print output (perf test)"); println!(" -D Distribute text partitions to consumers (used with -I)"); println!(" -I Continue counting in isolation (used with -D)"); println!(" -C Continue from the previous run (used with -P)"); println!(" -P Prepare only (do not run threads, used with -C)"); println!(" -h Display help"); println!(); println!("The input list-file should contain a list files to read and count words."); } fn main() { let args: Vec = env::args().collect(); if args.len() < 2 { println!("usage: {} [OPTIONS] filename", args[0]); return; } let mut r = 1; let mut c = 1; let mut pool = "wc.pool".to_string(); let mut filename = std::string::String::new(); let mut i = 1; let mut cont = false; let mut prep = false; let mut dist = false; let mut isld = false; let mut pattern = "(\\w+)".to_string(); while i < args.len() { let s = &args[i]; if s == "-h" { help(); return; } else if s == "-p" { if i == args.len() - 1 { panic!("-p requires an argument"); } i += 1; pattern = format!("({})", args[i]); } else if s == "-r" { if i == args.len() - 1 { panic!("-r requires an argument"); } i += 1; r = args[i].parse().expect("An integer expected"); if r < 1 { panic!("Number of reader threads cannot be less than 1"); } } else if s == "-c" { if i == args.len() - 1 { panic!("-c requires an argument"); } i += 1; c = args[i].parse().expect("An integer expected"); if c < 0 { panic!("Number of consumer threads cannot be less than 0"); } } else if s == "-f" { if i == args.len() - 1 { panic!("-f requires an argument"); } i += 1; pool = args[i].clone(); } else if s == "-C" { cont = true; } else if s == "-N" { unsafe {PRINT = false;} } else if s == "-D" { dist = true; } else if s == "-I" { isld = true; cont = true; } else if s == "-P" { prep = true; } else if filename.is_empty() { filename = s.clone(); } else { panic!("Unknown option `{}'", s); } i += 1; } struct Root { lines: Parc>>, words: Parc>>, producers: PRefCell>>, consumers: PRefCell>>, } impl RootObj

for Root { fn init(j: &Journal) -> Self { Root { lines: Parc::new(PMutex::new(Stack::new()), j), words: Parc::new(PMutex::new(HashMap::new(j)), j), producers: PRefCell::new(PVec::new()), consumers: PRefCell::new(PVec::new()), } } } let root = P::open::(&pool, O_CFNE | O_1GB).unwrap(); P::transaction(|j| { let mut producers = root.producers.borrow_mut(j); let mut consumers = root.consumers.borrow_mut(j); if !cont { producers.clear(); consumers.clear(); root.lines.lock(j).clear(); root.words.lock(j).clear(j); let mut files = vec![]; let f = BufReader::new( File::open(&filename).expect(&format!("cannot open `{}`", &filename)), ); for line in f.lines() { files.push(line.unwrap()); } let p = r.min(files.len()); let b = files.len() / p; for i in 0..p + 1 { if i * b < files.len() { producers.push( Parc::new( Producer::new( files[i * b..files.len().min((i + 1) * b)].to_vec(), root.lines.pclone(j), j, ), j, ), j, ); } } for _ in 0..c.max(1) { consumers.push( Parc::new( Consumer::new(&pattern, root.lines.pclone(j), j), j, ), j, ); } } }).unwrap(); eprintln!( "Total remaining from previous run: {} ", P::transaction(|j| root.lines.lock(j).len()).unwrap() ); if !prep { let producers = root.producers.borrow(); let consumers = root.consumers.borrow(); let mut p_threads = vec![]; let mut c_threads = vec![]; for p in &*producers { let p = p.demote(); p_threads.push(thread::spawn(move || Producer::start(p))) } if c == 0 { for thread in p_threads { thread.join().unwrap() } for consumer in &*consumers { consumer.activate(); } if !dist { for c in &*consumers { let c = c.demote(); c_threads.push(thread::spawn(move || Consumer::start(c, isld))) } } } else { for consumer in &*consumers { consumer.activate(); } if !dist { for c in &*consumers { let c = c.demote(); c_threads.push(thread::spawn(move || Consumer::start(c, isld))) } } for thread in p_threads { thread.join().unwrap() } } if !dist { // Notifying consumers that there is no more feeds for consumer in &*consumers { consumer.stop_when_finished(); } } else { P::transaction(|j| { let mut lines = root.lines.lock(j); let mut work = true; while work { for consumer in &*consumers { work = consumer.take_one(&mut lines, j); } } }).unwrap(); } for thread in c_threads { thread.join().unwrap() } eprintln!(); if dist { let mut i=0; for c in &*consumers { i += 1; eprintln!("c#{}.private_buf_size = {}", i, c.private_buf_size()); } } // Display results if unsafe {PRINT} { P::transaction(|j| { for c in &*consumers { c.collect(root.words.pclone(j), j); } let words = root.words.lock(j); println!("{}", words); }).unwrap(); } } println!("Memory usage = {} bytes", P::used()); }