use std::sync::{Arc, Mutex}; use std::time::SystemTime; use sync42::wait_list::WaitList; //////////////////////////////////////////// biometrics //////////////////////////////////////////// static HEAD_PASSBACK_INITIATED: biometrics::Counter = biometrics::Counter::new("playground.clicker.head_passback_initiated"); static HEAD_PASSBACK_FINISHED: biometrics::Counter = biometrics::Counter::new("playground.clicker.head_passback_finished"); static INCREMENT_OUTSTANDING_CALLS: biometrics::Counter = biometrics::Counter::new("playground.clicker.increment_outstanding_calls"); static DECREMENT_OUTSTANDING_CALLS: biometrics::Counter = biometrics::Counter::new("playground.clicker.decrement_outstanding_calls"); static BLOCKING_ON_TICKET_ACQUISITION: biometrics::Counter = biometrics::Counter::new("playground.clicker.blocking_on_ticket_acquisition"); static ABLE_TO_ACQUIRE_TICKETS: biometrics::Counter = biometrics::Counter::new("playground.clicker.able_to_acquire_tickets"); pub fn register_biometrics(collector: &mut biometrics::Collector) { sync42::register_biometrics(collector); collector.register_counter(&HEAD_PASSBACK_INITIATED); collector.register_counter(&HEAD_PASSBACK_FINISHED); collector.register_counter(&INCREMENT_OUTSTANDING_CALLS); collector.register_counter(&DECREMENT_OUTSTANDING_CALLS); collector.register_counter(&BLOCKING_ON_TICKET_ACQUISITION); collector.register_counter(&ABLE_TO_ACQUIRE_TICKETS); } ////////////////////////////////////////////// Counter ///////////////////////////////////////////// struct Counter { count: Mutex, } impl Counter { const fn new() -> Self { Self { count: Mutex::new(1), } } fn count(&self, amount: u64) -> u64 { let mut count = self.count.lock().unwrap(); let alloc: u64 = *count; *count = count.wrapping_add(amount); std::thread::sleep(std::time::Duration::from_millis(1_000)); alloc } } ////////////////////////////////////////////// Clicker ///////////////////////////////////////////// #[derive(Clone, Debug, Default)] enum WaitState { #[default] Present, CallingCount, Counted(u64), } struct ClickerState { outstanding_calls: u64, head_passback: bool, } impl ClickerState { const fn new() -> Self { Self { outstanding_calls: 0, head_passback: false, } } } struct Clicker<'a> { concurrent_count_calls: u64, counter: &'a Counter, state: Mutex, wait_list: WaitList, } impl<'a> Clicker<'a> { fn new(concurrent_count_calls: u64, counter: &'a Counter) -> Self { Self { concurrent_count_calls, counter, state: Mutex::new(ClickerState::new()), wait_list: WaitList::new(), } } fn click(&self) -> u64 { let mut waiter = self.wait_list.link(WaitState::Present); let tickets = { let mut state = self.state.lock().unwrap(); #[allow(clippy::nonminimal_bool)] 'conditions: while !(state.outstanding_calls < self.concurrent_count_calls) && !(waiter.is_head() && state.head_passback) { match waiter.load() { WaitState::Present => { BLOCKING_ON_TICKET_ACQUISITION.click(); state = waiter.naked_wait(state); } WaitState::CallingCount => { panic!("CallingCount state achieved before allowed to call count"); } WaitState::Counted(_) => { break 'conditions; } } if state.outstanding_calls < self.concurrent_count_calls && !state.head_passback { HEAD_PASSBACK_INITIATED.click(); state.head_passback = true; INCREMENT_OUTSTANDING_CALLS.click(); state.outstanding_calls += 1; self.wait_list.notify_head(); } } if let WaitState::Counted(x) = waiter.load() { self.wait_list.unlink(waiter); self.wait_list.notify_head(); return x; } ABLE_TO_ACQUIRE_TICKETS.click(); let mut count = 0; for mut w in waiter.iter() { match w.load() { WaitState::Present => { count += 1; } WaitState::CallingCount | WaitState::Counted(_) => { break; } }; } waiter.store(WaitState::CallingCount); if waiter.is_head() && state.head_passback { HEAD_PASSBACK_FINISHED.click(); state.head_passback = false; } else { INCREMENT_OUTSTANDING_CALLS.click(); state.outstanding_calls += 1; } count }; let ticket_run_start = self.counter.count(tickets); for (mut w, ticket) in std::iter::zip(waiter.iter(), ticket_run_start..ticket_run_start + tickets) { w.store(WaitState::Counted(ticket)) } if let WaitState::Counted(x) = waiter.load() { self.wait_list.unlink(waiter); { let mut state = self.state.lock().unwrap(); if !state.head_passback { HEAD_PASSBACK_INITIATED.click(); state.head_passback = true; } else { state.outstanding_calls -= 1; DECREMENT_OUTSTANDING_CALLS.click(); } } self.wait_list.notify_head(); return x; } panic!("We gave everyone except ourselves a ticket."); } } /////////////////////////////////////////////// main /////////////////////////////////////////////// fn main() { // Setup the environment. std::thread::spawn(|| { let mut collector = biometrics::Collector::new(); register_biometrics(&mut collector); let fout = std::fs::File::create("/dev/stdout").unwrap(); let mut emit = biometrics::PlainTextEmitter::new(fout); loop { let now = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .expect("clock should never fail") .as_millis() .try_into() .expect("millis since epoch should fit u64"); if let Err(e) = collector.emit(&mut emit, now) { eprintln!("collector error: {}", e); } std::thread::sleep(std::time::Duration::from_millis(250)); } }); // Configure the experiment. const NUMBER_OF_THREADS: u64 = sync42::MAX_CONCURRENCY as u64 / 2; const CONCURRENT_COUNT: u64 = 1; // Setup the experiment. static COUNTER: Counter = Counter::new(); let clicker: Arc = Arc::new(Clicker::new(CONCURRENT_COUNT, &COUNTER)); // Spawn all threads. let mut threads = Vec::new(); for _ in 0..NUMBER_OF_THREADS { let c = Arc::clone(&clicker); threads.push(std::thread::spawn(move || loop { let ticket = c.click(); println!("{}", ticket); if ticket > 1_000_000 { break; } })); } // Join all threads. for thread in threads.into_iter() { thread.join().unwrap(); } }