use std::iter::repeat_with;
use std::marker::PhantomData;
use std::sync::{Arc, Barrier};
use std::thread;
use std::thread::JoinHandle;
use std::time;
use vmcircbuffer::sync::Circular;
use vmcircbuffer::sync::Reader;
const MIN_ITEMS: usize = 16384;
struct VectorSource;
impl VectorSource {
#[allow(clippy::new_ret_no_self)]
pub fn new(
input: Vec,
) -> Source Option + Send + Sync + 'static, A>
where
A: Send + Sync + Clone + 'static,
{
let mut i = 0;
let n_samples = input.len();
Source::new(move |s: &mut [A]| -> Option {
if i < n_samples {
let len = std::cmp::min(s.len(), n_samples - i);
s[0..len].clone_from_slice(&input[i..i + len]);
i += len;
Some(len)
} else {
None
}
})
}
}
#[allow(clippy::type_complexity)]
struct Source Option + Send + Sync + 'static, A: Send + Sync + 'static>
{
f: Option,
_p: PhantomData,
}
impl Option + Send + Sync + 'static, A: Send + Sync> Source {
pub fn new(f: F) -> Source {
Source {
f: Some(f),
_p: PhantomData,
}
}
pub fn run(&mut self, barrier: Arc) -> (Reader, JoinHandle<()>) {
let mut w = Circular::with_capacity::(MIN_ITEMS).unwrap();
let r = w.add_reader();
let mut f = self.f.take().unwrap();
let handle = thread::spawn(move || {
barrier.wait();
loop {
let s = w.slice();
if let Some(n) = f(s) {
w.produce(n);
} else {
break;
}
}
});
(r, handle)
}
}
struct CopyBlock;
impl CopyBlock {
#[allow(clippy::new_ret_no_self)]
pub fn new() -> Middle
where
A: Send + Sync + Clone + 'static,
{
Middle::new(|input: &[A], output: &mut [A]| output.clone_from_slice(input))
}
}
#[allow(clippy::type_complexity)]
struct Middle
where
F: FnMut(&[A], &mut [B]) + Send + Sync + 'static,
A: Send + Sync + 'static,
B: Send + Sync + 'static,
{
f: Option,
_p1: PhantomData,
_p2: PhantomData,
}
impl Middle
where
F: FnMut(&[A], &mut [B]) + Send + Sync + 'static,
A: Send + Sync + 'static,
B: Send + Sync + 'static,
{
pub fn new(f: F) -> Middle {
Middle {
f: Some(f),
_p1: PhantomData,
_p2: PhantomData,
}
}
pub fn run(
&mut self,
mut reader: Reader,
barrier: Arc,
) -> (Reader, JoinHandle<()>) {
let mut w = Circular::with_capacity::(MIN_ITEMS).unwrap();
let r = w.add_reader();
let mut f = self.f.take().unwrap();
let handle = thread::spawn(move || {
barrier.wait();
while let Some(input) = reader.slice() {
let output = w.slice();
let n = std::cmp::min(input.len(), output.len());
f(&input[0..n], &mut output[0..n]);
reader.consume(n);
w.produce(n);
}
});
(r, handle)
}
}
struct Sink {
items: Option>,
}
impl Sink {
pub fn new(capacity: usize) -> Sink {
Sink {
items: Some(Vec::with_capacity(capacity)),
}
}
pub fn run(&mut self, mut r: Reader, barrier: Arc) -> JoinHandle> {
let mut items = self.items.take().unwrap();
thread::spawn(move || {
barrier.wait();
while let Some(s) = r.slice() {
items.extend_from_slice(s);
let l = s.len();
r.consume(l);
}
items
})
}
}
fn main() {
let n_samples = 20_000_000;
let input: Vec = repeat_with(rand::random::).take(n_samples).collect();
let n_copy = 200;
let barrier = Arc::new(Barrier::new(n_copy + 3));
let mut src = VectorSource::new(input.clone());
let (mut reader, _) = src.run(Arc::clone(&barrier));
for _ in 0..n_copy {
let mut cpy = CopyBlock::new::();
let (a, _) = cpy.run(reader, Arc::clone(&barrier));
reader = a;
}
let mut snk = Sink::new(n_samples);
let handle = snk.run(reader, Arc::clone(&barrier));
let now = time::Instant::now();
barrier.wait();
let output = handle.join().unwrap();
let elapsed = now.elapsed();
assert_eq!(input, output);
println!("data matches");
println!("runtime (in s): {}", elapsed.as_secs_f64());
}