extern crate timely;

use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::{Input, Filter, Probe};

fn main() {

    let mut args = std::env::args();
    args.next();

    let rate: usize = args.next().expect("Must specify rate")
                          .parse().expect("Rate must be a usize");
    let duration_s: usize = args.next().expect("Must specify duration_s")
                                .parse().expect("duration_s must be a usize");

    timely::execute_from_args(args, move |worker| {

        let index = worker.index();
        let peers = worker.peers();

        // Re-synchronize all workers (account for start-up).
        timely::synchronization::Barrier::new(worker).wait();

        let timer = std::time::Instant::now();

        let mut input = InputHandle::new();
        let mut probe = ProbeHandle::new();

        // Create a dataflow that discards input data (just synchronizes).
        worker.dataflow(|scope| {
            scope
                .input_from(&mut input)     // read input.
                .filter(|_| false)          // do nothing.
                .probe_with(&mut probe);    // observe output.
        });

        let ns_per_request = 1_000_000_000 / rate;
        let mut insert_counter = index;     // counts up as we insert records.
        let mut retire_counter = index;     // counts up as we retire records.

        let mut inserted_ns = 0;

        // We repeatedly consult the elapsed time, and introduce any data now considered available.
        // At the same time, we observe the output and record which inputs are considered retired.

        let mut counts = vec![[0u64; 16]; 64];

        let counter_limit = rate * duration_s;
        while retire_counter < counter_limit {

            // Open-loop latency-throughput test, parameterized by offered rate `ns_per_request`.
            let elapsed = timer.elapsed();
            let elapsed_ns = elapsed.as_secs() * 1_000_000_000 + (elapsed.subsec_nanos() as u64);

            // Determine completed ns.
            let acknowledged_ns: u64 = probe.with_frontier(|frontier| frontier[0]);

            // Notice any newly-retired records.
            while ((retire_counter * ns_per_request) as u64) < acknowledged_ns && retire_counter < counter_limit {

                let requested_at = (retire_counter * ns_per_request) as u64;
                let latency_ns = elapsed_ns - requested_at;

                let count_index = latency_ns.next_power_of_two().trailing_zeros() as usize;
                let low_bits = (latency_ns >> (count_index - 5)) & 0xF;
                counts[count_index][low_bits as usize] += 1;

                retire_counter += peers;
            }

            // Now, should we introduce more records before stepping the worker?
            //
            // Three choices here:
            //
            //   1. Wait until previous batch acknowledged.
            //   2. Tick at most once every millisecond-ish.
            //   3. Geometrically increase outstanding batches.

            // Technique 1:
            // let target_ns = if acknowledged_ns >= inserted_ns { elapsed_ns } else { inserted_ns };

            // Technique 2:
            // let target_ns = elapsed_ns & !((1 << 20) - 1);

            // Technique 3:
            let target_ns = {
                let delta: u64 = inserted_ns - acknowledged_ns;
                let bits = ::std::mem::size_of::<u64>() * 8 - delta.leading_zeros() as usize;
                let scale = ::std::cmp::max((1 << bits) / 4, 1024);
                elapsed_ns & !(scale - 1)
            };

            // Common for each technique.
            if inserted_ns < target_ns {

                while ((insert_counter * ns_per_request) as u64) < target_ns {
                    input.send(insert_counter);
                    insert_counter += peers;
                }
                input.advance_to(target_ns);
                inserted_ns = target_ns;
            }

            worker.step();
        }

        // Report observed latency measurements.
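        // Each bucket `counts[i][j]` holds samples whose latency has leading
        // bit position `i` (rounded up to a power of two), with `j` the next
        // four bits below that. The report walks buckets from slowest to
        // fastest with a running tail count, so each printed fraction is the
        // share of samples slower than that bucket's lower bound: a
        // complementary CDF.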
        if index == 0 {

            let mut results = Vec::new();
            let total: u64 = counts.iter().map(|x| x.iter().sum::<u64>()).sum();
            let mut sum = 0;
            for index in (10 .. counts.len()).rev() {
                for sub in (0 .. 16).rev() {
                    if sum > 0 && sum < total {
                        let latency = (1 << (index - 1)) + (sub << (index - 5));
                        let fraction = (sum as f64) / (total as f64);
                        results.push((latency, fraction));
                    }
                    sum += counts[index][sub];
                }
            }
            for (latency, fraction) in results.drain(..).rev() {
                println!("{}\t{}", latency, fraction);
            }
        }

    }).unwrap();
}
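// Example invocation (illustrative only; assumes this file is built as a cargo
// example named `openloop` -- that name is not part of the source). Arguments
// after `rate` and `duration_s` are passed through to timely, e.g. `-w` to set
// the number of worker threads:
//
//     cargo run --release --example openloop -- 1000000 5 -w2
//
// This offers one million records per second (in aggregate) for five seconds,
// after which worker zero prints one `latency_ns<TAB>fraction` line per
// histogram bucket.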