/** Matrix * Vector multiplication A: n*n matrix b: n vector */ use ppl::prelude::*; use rand::Rng; // Taken a matrix, this source emit a row of the matrix at a time. struct Source { matrix: Vec>, } impl Out> for Source { fn run(&mut self) -> Option> { if !self.matrix.is_empty() { return Some(self.matrix.remove(0)); } None } } // Given a vector, this stage multiply it by the vector stored in its state. #[derive(Clone)] struct Multiplication { vec: Vec, } impl InOut, i64> for Multiplication { fn run(&mut self, input: Vec) -> Option { Some(mult(input, &self.vec)) } // This stage is ordered because the multiplication is done in order. fn is_ordered(&self) -> bool { true } fn number_of_replicas(&self) -> usize { 6 } } // This sink collect the results of the multiplication. // It is possible to use an OrderedSinkVec instead of this custom sink. struct Sink { res: Vec, } impl In> for Sink { fn run(&mut self, input: i64) { self.res.push(input); } fn is_ordered(&self) -> bool { true } fn finalize(self) -> Option> { Some(self.res) } } fn mult(m: Vec, v: &[i64]) -> i64 { let mut res = 0; for i in 0..m.len() { res += m[i] * v[i]; } res } #[test] fn test_mxv() { env_logger::init(); let n = 1000; //rows let mut matrix = vec![vec![0; n]; n]; let mut vec = vec![0; n]; let mut correct_result = vec![0; n]; for i in 0..n { vec[i] = rand::thread_rng().gen_range(0..10); for j in 0..n { matrix[i][j] = rand::thread_rng().gen_range(0..10); } } // Build and start the spp graph let mut p = pipeline![ Source { matrix: matrix.clone() }, Multiplication { vec: vec.clone() }, Sink { res: vec![] } ]; p.start(); // Collect the results of the computation let res = p.wait_end().unwrap(); // Compute the "reference" result for i in 0..matrix.len() { (0..vec.len()).for_each(|j| { correct_result[i] += matrix[i][j] * vec[j]; }); } assert_eq!(res, correct_result); }