// This Source Code Form is subject to the terms of the Mozilla Public // License, v2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. use std::{ffi::OsString, num::NonZeroU8, process::Command}; use concurrent_interner::serial; use concurrent_interner::ConcurrentInterner; use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; use crossbeam::thread::Scope; use std::hash::{BuildHasher, BuildHasherDefault}; use walkdir::WalkDir; fn go<'a, RS: Send + Sync + Clone + BuildHasher>( nthreads: u8, scope: &&Scope<'a>, interner: &'a ConcurrentInterner, path: &str, ) { let mut senders = vec![]; for _ in 0..nthreads { let (sender, receiver) = std::sync::mpsc::channel(); senders.push(sender); let _ = scope.spawn(move |_| { let mut m = interner.get_member(); loop { match receiver.recv() { Ok(Run::Continue(pb)) => { let buf = std::fs::read_to_string(pb).unwrap(); for s in buf.split_whitespace() { m.intern(s); } } Ok(Run::Stop) | Err(_) => break, } } }); } let mut rs = OsString::new(); rs.push("rs"); let mut idx = 0; for pb in WalkDir::new(path).into_iter().filter_map(|e| { let pb = e.ok()?.into_path(); if pb.extension() == Some(&rs) { Some(pb) } else { None } }) { senders[idx] .send(Run::Continue(pb)) .expect("failed to send continue message"); idx = (idx + 1) % (nthreads as usize); } for idx in 0..nthreads { senders[idx as usize] .send(Run::Stop) .expect("failed to send stop message"); } } fn do_stuff_multi(nthreads: u8, path: &str) { let interner = ConcurrentInterner::::new(NonZeroU8::new(nthreads).unwrap(), Default::default()); let _ = crossbeam::scope(|scope| { go(nthreads, &scope, &interner, path); }); } fn do_stuff(path: &str) { let arena = typed_arena::Arena::new(); let mut basic_interner = serial::Interner::::new(&arena); find_rs_files(path, &mut basic_interner); } fn warm_cache(path: &str) { for _ in 0..10 { do_stuff::(path); } } #[cfg(target_os = "macos")] #[allow(unused)] fn clear_cache() { Command::new("sudo") .arg("sync") .output() .expect("sync failed"); Command::new("sudo") .arg("purge") .output() .expect("purge failed"); } #[cfg(target_os = "linux")] #[allow(unused)] fn clear_cache() { Command::new("sudo") .arg("sync") .output() .expect("sync failed"); Command::new("sudo") .arg("sysctl") .arg("vm.drop_caches=1") .output() .expect("sysctl vm.drop_caches=1 failed"); } fn criterion_benchmark(c: &mut Criterion) { criterion_benchmark_generic::(c, "ahash"); criterion_benchmark_generic::>(c, "fxhash"); } fn criterion_benchmark_generic( c: &mut Criterion, hash_name: &'static str, ) { let benchmark_name = |n: u8, cache_state: &'static str| { format!( "intern {}: ({}/{}, n = {})", if n == 1 { "serial" } else { "parallel" }, cache_state, hash_name, n, ) }; let nthreads = match std::env::var("BENCH_NTHREADS") { Ok(s) => s .split_whitespace() .map(|x| x.parse::()) .collect::, _>>() .expect("BENCH_NTHREADS should be a whitespace-separated list of u8s"), Err(_) => (1..=8).collect(), }; let path = "./benchmarks/data"; let mut run_benchmark = |name| { for n in nthreads.iter() { c.bench_function(&benchmark_name(*n, name), |b| { b.iter_batched( if name == "warm" { || {} } else { clear_cache }, |_| { if *n == 1 { do_stuff::(path) } else { do_stuff_multi::(*n, path) } }, BatchSize::NumIterations(1), ); }); } }; match std::env::var("BENCH_COLD_CACHE") { Ok(_) => { clear_cache(); run_benchmark("cold"); } Err(_) => {} } warm_cache(path); run_benchmark("warm"); } criterion_group!(benches, criterion_benchmark); criterion_main!(benches); // Benchmark: // // A. Walk over nested directories with lots of files: // - For each file: -> move file to thread // -> keep tokenizing and adding to interner // // B. Walk over nested directories with lots of files: // - For each file: // -> keep tokenizing and add to simple interner // https://www.reddit.com/r/rust/comments/fn1jxf/blog_post_fast_and_simple_rust_interner/fl7xvkt?utm_source=share&utm_medium=web2x&context=3 // // Combinations: // (1) With warm disk buffer cache // (2) With cold disk buffer cache fn find_rs_files<'a, RS: Default + BuildHasher>( path: &str, interner: &mut serial::Interner<'a, RS>, ) { let mut rs = OsString::new(); rs.push("rs"); for pb in WalkDir::new(path).into_iter().filter_map(|e| { let pb = e.ok()?.into_path(); if pb.extension() == Some(&rs) { Some(pb) } else { None } }) { let buf = std::fs::read_to_string(&pb).unwrap(); for str in buf.split_whitespace() { interner.intern(str); } } } enum Run { Continue(T), Stop, }