use std::fmt::Display; use crate::hashmap::HashMap; use crate::stack::Stack; use corundum::default::*; use corundum::sync::VWeak; use regex::Regex; use corundum::may_crash; type P = Allocator; struct ConsumerData { buf: PString, local: HashMap, active: bool, private_lines: Stack, } pub struct Consumer { pattern: PString, data: PMutex, lines: Parc>>, } impl Consumer { pub fn new( pattern: &str, lines: Parc>>, j: &Journal, ) -> Self { Self { pattern: PString::from_str(pattern, j), lines, data: PMutex::new( ConsumerData { buf: PString::new(), private_lines: Stack::new(), local: HashMap::new(j), active: true, } ), } } /// Starts processing `lines` and updating `words` pub fn start(slf: VWeak, isolated: bool) { loop { // Read from global buffer to the local buffer if !P::transaction(|j| { may_crash!(); if let Some(slf) = slf.promote(j) { may_crash!(); let mut this = slf.data.lock(j); may_crash!(); if this.buf.is_empty() { may_crash!(); let mut rem = 0; may_crash!(); let line = if !isolated { may_crash!(); let mut lines = slf.lines.lock(j); may_crash!(); rem = lines.len(); may_crash!(); lines.pop(j) } else { may_crash!(); this.private_lines.pop(j) }; if unsafe { crate::PRINT } { if !isolated { eprint!( "\r\x1b[?25lRemaining: {:<12} Memory usage: {:<9} bytes \x1b[?25h", rem, P::used() ); } else { eprint!( "\r\x1b[?25lMemory usage: {:<9} bytes \x1b[?25h", P::used() ); } } if let Some(line) = line { may_crash!(); this.buf = line; may_crash!(); true // Still working } else { may_crash!(); this.active } } else { may_crash!(); true } } else { false } }).unwrap() { return; } // counting words P::transaction(|j| { if let Some(slf) = slf.promote(j) { may_crash!(); let mut this = slf.data.lock(j); may_crash!(); if !this.buf.is_empty() { may_crash!(); let buf = this.buf.to_string(); may_crash!(); let re = Regex::new(slf.pattern.as_str()).unwrap(); may_crash!(); for cap in re.captures_iter(&buf) { may_crash!(); let w = cap.get(1).unwrap().as_str().to_pstring(j); may_crash!(); this.local.update_with(&w, j, |v| v + 1); may_crash!(); } this.buf.clear(); may_crash!(); } } }).unwrap(); } } pub fn collect(&self, words: Parc>>, j: &Journal) { let mut this = self.data.lock(j); let mut words = words.lock(j); this.local.foreach(|k, v| { words.update_with(k, j, |v0| v0 + v); }); this.local.clear(j); } pub fn stop_when_finished(&self) { P::transaction(|j| { let mut this = self.data.lock(j); this.active = false; }).unwrap(); } pub fn activate(&self) { P::transaction(|j| { let mut this = self.data.lock(j); this.active = true; }).unwrap(); } pub fn take_one(&self, lines: &mut Stack, j: &Journal) -> bool { let mut this = self.data.lock(j); if let Some(line) = lines.pop(j) { this.private_lines.push(line, j); true } else { false } } pub fn private_buf_size(&self) -> usize { P::transaction(|j| { let this = self.data.lock(j); this.private_lines.len() }).unwrap() } } impl Display for Consumer { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> { let s = P::transaction(move |j| { let data = self.data.lock(j); format!("local:\n\x1b[0;31m{}\x1b[0m", data.local) }).unwrap(); writeln!(f, "{}", s)?; Ok(()) } }