/* A pipeline of farm. There are generated multiple sequence of i from 1 to 20. The first stage is a farm that return simply the i-th number received. The second stage is a farm that compute the i-th Fibonacci number. The third stage is a farm that subtract 1 to the i-th Fibonacci number. */ use ppl::prelude::*; // Fibonacci function pub fn fib(n: usize) -> usize { match n { 0 | 1 => 1, _ => fib(n - 2) + fib(n - 1), } } // Source of the farm, it generates multiple sequence of i from 1 to 20. struct Source { streamlen: usize, } impl Out for Source { fn run(&mut self) -> Option { let mut ret = None; if self.streamlen > 0 { ret = Some((self.streamlen % 20) + 1); self.streamlen -= 1; } ret } } // Stage that return simply the i-th number received. #[derive(Clone)] struct WorkerA {} impl InOut for WorkerA { fn run(&mut self, input: usize) -> Option { Some(input) } fn number_of_replicas(&self) -> usize { 2 } } // Stage that computes the i-th Fibonacci number. #[derive(Clone)] struct WorkerB {} impl InOut for WorkerB { fn run(&mut self, input: usize) -> Option { Some(fib(input)) } fn number_of_replicas(&self) -> usize { 8 } } // Stage that subtract 1 to the i-th Fibonacci number. #[derive(Clone)] struct WorkerC {} impl InOut for WorkerC { fn run(&mut self, input: usize) -> Option { Some(input - 1) } fn number_of_replicas(&self) -> usize { 2 } } // Sink of the farm, it prints the i-th Fibonacci number. struct Sink { counter: usize, } impl In for Sink { fn run(&mut self, input: usize) { println!("{}", input); self.counter += 1; } fn finalize(self) -> Option { println!("End"); Some(self.counter) } } #[test] fn test_long_farm() { env_logger::init(); // Create a pipeline of farm. let mut p = pipeline!( Source { streamlen: 500000 }, WorkerA {}, WorkerB {}, WorkerC {}, Sink { counter: 0 } ); p.start(); let res = p.wait_end(); assert_eq!(res.unwrap(), 500000); }