use orx_concurrent_bag::ConcurrentBag; use orx_fixed_vec::FixedVec; use orx_pinned_vec::IntoConcurrentPinnedVec; use orx_split_vec::SplitVec; use test_case::test_matrix; #[test_matrix([ FixedVec::new(100), SplitVec::with_doubling_growth_and_fragments_capacity(16), SplitVec::with_linear_growth_and_fragments_capacity(10, 16) ], [ FixedVec::new(100), SplitVec::with_doubling_growth_and_fragments_capacity(16), SplitVec::with_linear_growth_and_fragments_capacity(10, 16) ])] fn concurrent_get_and_iter(pinned_i32: P, pinned_f32: Q) where P: IntoConcurrentPinnedVec + Clone, Q: IntoConcurrentPinnedVec, { // record measurements in (assume) random intervals let measurements: ConcurrentBag<_, _> = pinned_i32.clone().into(); let rf_measurements = &measurements; // collect sum of measurements every 50 milliseconds let sums: ConcurrentBag<_, _> = pinned_i32.into(); let rf_sums = &sums; // collect average of measurements every 50 milliseconds let averages: ConcurrentBag<_, _> = pinned_f32.into(); let rf_averages = &averages; std::thread::scope(|s| { s.spawn(move || { for i in 0..100 { std::thread::sleep(std::time::Duration::from_millis(i % 5)); rf_measurements.push(i as i32); } }); s.spawn(move || { for _ in 0..10 { let mut sum = 0; for i in 0..rf_measurements.len() { sum += unsafe { rf_measurements.get(i) }.copied().unwrap_or(0); } rf_sums.push(sum); std::thread::sleep(std::time::Duration::from_millis(50)); } }); s.spawn(move || { for _ in 0..10 { // better to have count & add as an atomic reduction for correctness // calling .len() and .iter().sum() separately might possibly give a false average fn count_and_add(len_and_sum: (usize, i32), x: &i32) -> (usize, i32) { (len_and_sum.0 + 1, len_and_sum.1 + x) } let (len, sum) = unsafe { rf_measurements.iter() }.fold((0, 0), count_and_add); let average = sum as f32 / len as f32; rf_averages.push(average); std::thread::sleep(std::time::Duration::from_millis(50)); } }); }); assert_eq!(measurements.len(), 100); assert_eq!(sums.len(), 10); assert_eq!(averages.len(), 10); }