#![allow(dead_code)] #![feature(min_const_generics)] //! # Word Count example with MapReduce model mod consumer; mod hashmap; mod producer; mod stack; use consumer::Consumer; use hashmap::*; use crndm::default::*; use producer::Producer; use stack::*; use std::env; use std::fs::File; use std::io::{BufRead, BufReader}; use std::thread; type P = BuddyAlloc; fn help() { println!("usage: grep [OPTIONS] list-file"); println!(); println!("OPTIONS:"); println!(" -p num Search pattern (Default '\\w+')"); println!(" -r num Number of reader threads (Default 1)"); println!(" -c num Number of consumer threads (Default 1)"); println!(" -f file Pool filename (Default ./wc.pool)"); println!(" -C Continue from the previous run"); println!(" -P Prepare only (do not run threads)"); println!(" -h Display help"); println!(); println!("The input list-file should contain a list files to read and count words."); } fn main() { let args: Vec = env::args().collect(); if args.len() < 2 { println!("usage: {} [OPTIONS] filename", args[0]); return; } let mut r = 1; let mut c = 1; let mut pool = "wc.pool".to_string(); let mut filename = std::string::String::new(); let mut i = 1; let mut cont = false; let mut prep = false; let mut pattern = "(\\w+)".to_string(); while i < args.len() { let s = &args[i]; if s == "-h" { help(); return; } else if s == "-p" { if i == args.len() - 1 { panic!("-p requires an argument"); } i += 1; pattern = format!("({})", args[i]); } else if s == "-r" { if i == args.len() - 1 { panic!("-r requires an argument"); } i += 1; r = args[i].parse().expect("An integer expected"); if r < 1 { panic!("Number of reader threads cannot be less than 1"); } } else if s == "-c" { if i == args.len() - 1 { panic!("-c requires an argument"); } i += 1; c = args[i].parse().expect("An integer expected"); if c < 1 { panic!("Number of consumer threads cannot be less than 1"); } } else if s == "-f" { if i == args.len() - 1 { panic!("-f requires an argument"); } i += 1; pool = args[i].clone(); } else if s == "-C" { cont = true; } else if s == "-P" { prep = true; } else if filename.is_empty() { filename = s.clone(); } else { panic!(format!("Unknown option `{}'", s)); } i += 1; } struct Root { lines: Parc>>, words: Parc>>, producers: PRefCell>>, consumers: PRefCell>>, } impl RootObj

for Root { fn init(j: &Journal) -> Self { Root { lines: Parc::new(PMutex::new(Stack::new(), j), j), words: Parc::new(PMutex::new(HashMap::new(j), j), j), producers: PRefCell::new(PVec::new(j), j), consumers: PRefCell::new(PVec::new(j), j), } } } let root = P::open::(&pool, O_CFNE | O_1GB).unwrap(); P::transaction(|j| { let mut producers = root.producers.borrow_mut(j); let mut consumers = root.consumers.borrow_mut(j); if !cont { producers.clear(); consumers.clear(); root.lines.lock(j).clear(); root.words.lock(j).clear(j); let mut files = vec![]; let f = BufReader::new( File::open(&filename).expect(&format!("cannot open `{}`", &filename)), ); for line in f.lines() { files.push(line.unwrap()); } let p = usize::min(r, files.len()); let b = files.len() / p; for i in 0..p + 1 { if i * b < files.len() { producers.push( Parc::new( Producer::new( files[i * b..usize::min(files.len(), (i + 1) * b)].to_vec(), root.lines.pclone(j), j, ), j, ), j, ); } } for _ in 0..c { consumers.push( Parc::new( Consumer::new(&pattern, root.lines.pclone(j), root.words.pclone(j), j), j, ), j, ); } } }) .unwrap(); eprintln!( "Total remaining from previous run: {} ", P::transaction(|j| root.lines.lock(j).len()).unwrap() ); if !prep { let producers = root.producers.borrow(); let consumers = root.consumers.borrow(); let mut p_threads = vec![]; let mut c_threads = vec![]; for i in 0..producers.len() { let producer = producers[i].volatile(); p_threads.push(thread::spawn(move || Producer::start(producer))) } for i in 0..consumers.len() { let consumer = consumers[i].volatile(); c_threads.push(thread::spawn(move || Consumer::start(consumer))) } for thread in p_threads { thread.join().unwrap() } // Notifying consumers that there is no more feeds let consumers = root.consumers.borrow(); for consumer in &*consumers { consumer.stop_when_finished(); } for thread in c_threads { thread.join().unwrap() } // Display results P::transaction(|j| { let words = root.words.lock(j); let mut vec = vec![]; words.foreach(|word, freq| { vec.push((word.to_string(), freq.clone())); }); vec.sort_by(|x, y| x.0.cmp(&y.0)); for (word, freq) in vec { println!("{:>32}: {}", word, freq); } }) .unwrap(); } println!("Memory usage = {} bytes", P::used()); }