//! Machinery for starting processing chains. use csv::Reader; use csv::{ByteRecord, ByteRecordsIntoIter}; use encoding::{DecoderTrap, EncodingRef, all::UTF_8}; use std::clone::Clone; use std::collections::VecDeque; use std::fs::File; use std::path::{Path, PathBuf}; use crate::{ Row, Headers, RowStream, error::{self, Error, RowResult}, }; fn decode(data: ByteRecord, encoding: EncodingRef) -> Row { let mut row = Row::with_capacity(data.as_slice().len(), data.len()); for item in data.iter() { row.push_field(&encoding.decode(item, DecoderTrap::Replace).unwrap()); } row } /// Represents a file as source of CSV data. /// /// Most of the time you won't need to deal with this struct, except when you /// need to customize the CSV options. pub struct ReaderSource { reader: Reader, path: PathBuf, } /// A source of data. Wraps a `csv::Reader`. /// /// You might not ever need to build one of this objects yourself since most of /// the time they're built for you inside `InputStream` constructors. impl ReaderSource { /// Build a ReaderSource from a `csv::Reader` and a path. /// /// the `path` is added to every record as a virtual column with name /// `_source`. pub fn from_reader>(reader: Reader, path: P) -> ReaderSource { ReaderSource { reader, path: path.as_ref().to_path_buf(), } } /// Build a `ReaderSource` from a path. /// /// If the file doesn't exist or it is otherwise inaccesible returns an /// error. pub fn from_path>(path: P) -> Result { csv::Reader::from_path(&path) .map(|reader| ReaderSource::from_reader(reader, path)) .map_err(|err| Error::Csv(err.to_string())) } fn headers(&mut self) -> ByteRecord { self.reader.byte_headers().unwrap().clone() } } impl From> for ReaderSource { fn from(reader: Reader) -> ReaderSource { ReaderSource { reader, // no path info, so here we improvise path: "From>".into(), } } } pub struct InputStreamBuilder { readers: Vec, encoding: EncodingRef, source_col: Option, } impl InputStreamBuilder { /// Start a transformation chain from a set of csv creaders. /// /// This is what you need in case your CSVs are not comma separated or you /// need to adjust some parameters like escape character. /// /// # Example /// /// ``` /// use csvsc::prelude::*; /// use csv::ReaderBuilder; /// use encoding::all::UTF_8; /// /// let mut chain = InputStreamBuilder::from_readers(vec![ /// ReaderBuilder::new() /// .delimiter(b';') /// .from_path("test/assets/1.csv") /// .unwrap().into(), /// ReaderBuilder::new() /// .delimiter(b';') /// .from_path("test/assets/2.csv") /// .unwrap().into(), /// ]).build().unwrap(); /// ``` pub fn from_readers(readers: I) -> InputStreamBuilder where I: IntoIterator, { InputStreamBuilder { readers: readers.into_iter().collect(), encoding: UTF_8, source_col: None, } } /// Start a transformation chain from a set of paths in the filesystem. /// /// This is probably the easiest way to get started. It assumes comma /// separated fields, double quotes as field wrap and some other sensible /// defaults from [csv /// crate](https://docs.rs/csv/1.1.5/csv/struct.ReaderBuilder.html) /// /// # Example /// /// ``` /// use csvsc::prelude::*; /// /// let mut chain = InputStreamBuilder::from_paths(&[ /// "test/assets/chicken_north.csv", /// "test/assets/chicken_south.csv", /// ]).unwrap().build().unwrap(); /// ``` /// /// From there you can chain the methods of the [`RowStream`](./trait.RowStream.html) /// trait. pub fn from_paths(paths: I) -> error::Result where I: IntoIterator, P: AsRef, { let paths: Vec<_> = paths.into_iter().collect(); let mut readers = Vec::with_capacity(paths.len()); for path in paths { readers.push(ReaderSource::from_path(path)?); } Ok(InputStreamBuilder { readers, encoding: UTF_8, source_col: None, }) } /// Use this encoding to decode the input files /// /// # Example /// /// ``` /// use csvsc::prelude::*; /// use encoding::all::WINDOWS_1252; /// /// let mut chain = InputStreamBuilder::from_paths(&[ /// "test/assets/windows1252/data.csv", /// ]).unwrap().with_encoding(WINDOWS_1252).build().unwrap(); /// ``` pub fn with_encoding(self, encoding: EncodingRef) -> Self { InputStreamBuilder { readers: self.readers, encoding, source_col: self.source_col, } } /// Add a column with this name to every row with the path to the file where /// it comes from. /// /// # Example /// /// ``` /// use csvsc::prelude::*; /// /// let mut chain = InputStreamBuilder::from_paths(&[ /// "test/assets/1.csv", /// ]).unwrap().with_source_col("source").build().unwrap().into_iter(); /// /// assert_eq!(chain.next().unwrap().unwrap(), Row::from(vec!["1", "3", "test/assets/1.csv"])); /// ``` pub fn with_source_col(self, colname: &str) -> Self { InputStreamBuilder { readers: self.readers, encoding: self.encoding, source_col: Some(colname.into()), } } /// Build the [`InputStream`] from the parameters of this builder and return /// it. pub fn build(self) -> error::Result { let mut iter = self.readers.into_iter(); let mut input_stream = if let Some(first) = iter.next() { InputStream::new(first, self.encoding, self.source_col)? } else { return Err(Error::NoSources); }; for item in iter { input_stream.add_source(item); } Ok(input_stream) } } /// A structure for creating a transformation chain from input files. /// /// Most of the time you'll start your processing chain using `.from_readers()` /// or `.from_paths()` and then chain the methods of the /// [`RowStream`](./trait.RowStream.html) trait. However there are more options /// for the cases where you need to customize how your input is read. pub struct InputStream { readers: VecDeque, current_records: ByteRecordsIntoIter, current_path: PathBuf, encoding: EncodingRef, headers: Headers, source_col: Option, } impl InputStream { fn new(mut reader: ReaderSource, encoding: EncodingRef, source_col: Option) -> error::Result { let mut header_row = reader.reader.byte_headers()?.clone(); if let Some(col) = source_col.as_ref() { header_row.push_field(col.as_bytes()); } Ok(InputStream { readers: VecDeque::new(), headers: Headers::from(decode(header_row, encoding)), source_col, current_records: reader.reader.into_byte_records(), current_path: reader.path, encoding, }) } fn add_source(&mut self, item: ReaderSource) { self.readers.push_back(item); } } impl IntoIterator for InputStream { type Item = RowResult; type IntoIter = IntoIter; fn into_iter(self) -> Self::IntoIter { IntoIter { readers: self.readers, headers: self.headers, source_col: self.source_col, current_records: self.current_records, current_path: self.current_path, encoding: self.encoding, } } } impl RowStream for InputStream { fn headers(&self) -> &Headers { &self.headers } } pub struct IntoIter { current_records: ByteRecordsIntoIter, source_col: Option, encoding: EncodingRef, current_path: PathBuf, headers: Headers, readers: VecDeque, } impl Iterator for IntoIter { type Item = RowResult; fn next(&mut self) -> Option { match self.current_records.next() { Some(Ok(reg)) => { let mut str_reg = decode(reg, self.encoding); if self.source_col.is_some() { str_reg.push_field(&self.current_path.to_string_lossy()); } if str_reg.len() != self.headers.len() { return Some(Err(Error::InconsistentSizeOfRows( self.current_path.clone(), ))); } Some(Ok(str_reg)) } Some(Err(e)) => Some(Err(Error::Csv(format!("{:?}", e)))), None => match self.readers.pop_front() { Some(mut rs) => { let mut new_headers = decode(rs.headers(), self.encoding); if let Some(col) = self.source_col.as_ref() { new_headers.push_field(col); } if new_headers != self.headers { return Some(Err(Error::InconsistentHeaders)); } self.current_records = rs.reader.into_byte_records(); self.current_path = rs.path; self.next() } None => None, }, } } } #[cfg(test)] mod tests { use super::{InputStreamBuilder, ReaderSource, Row, RowStream}; use crate::error::Error; use encoding::all::WINDOWS_1252; #[test] fn test_builder_from_paths() { let mut chain = InputStreamBuilder::from_paths(vec![ "test/assets/1.csv", "test/assets/2.csv", ]).unwrap().build().unwrap().into_iter(); assert_eq!(chain.next().unwrap().unwrap(), Row::from(vec!["1", "3"])); assert_eq!(chain.next().unwrap().unwrap(), Row::from(vec!["5", "2"])); assert_eq!(chain.next().unwrap().unwrap(), Row::from(vec!["2", "2"])); assert_eq!(chain.next().unwrap().unwrap(), Row::from(vec!["4", "3"])); assert!(chain.next().is_none()); } #[test] fn test_builder_from_readers() { let mut chain = InputStreamBuilder::from_readers(vec![ ReaderSource::from_path("test/assets/1.csv").unwrap(), ReaderSource::from_path("test/assets/2.csv").unwrap(), ]).build().unwrap().into_iter(); assert_eq!(chain.next().unwrap().unwrap(), Row::from(vec!["1", "3"])); assert_eq!(chain.next().unwrap().unwrap(), Row::from(vec!["5", "2"])); assert_eq!(chain.next().unwrap().unwrap(), Row::from(vec!["2", "2"])); assert_eq!(chain.next().unwrap().unwrap(), Row::from(vec!["4", "3"])); assert!(chain.next().is_none()); } #[test] fn test_builder_with_encoding() { let chain = InputStreamBuilder::from_paths(vec![ "test/assets/windows1252/data.csv", ]).unwrap().with_encoding(WINDOWS_1252).build().unwrap(); assert_eq!(chain.headers(), &Row::from(vec!["name"]).into()); let mut chain = chain.into_iter(); assert_eq!(chain.next().unwrap().unwrap(), Row::from(vec!["árbol"])); assert!(chain.next().is_none()); } #[test] fn test_builder_with_source_col() { let chain = InputStreamBuilder::from_paths(vec![ "test/assets/1.csv", "test/assets/2.csv", ]).unwrap() .with_source_col("source") .build().unwrap(); assert_eq!(chain.headers(), &Row::from(vec!["a", "b", "source"]).into()); let mut chain = chain.into_iter(); assert_eq!(chain.next().unwrap().unwrap(), Row::from(vec!["1", "3", "test/assets/1.csv"])); assert_eq!(chain.next().unwrap().unwrap(), Row::from(vec!["5", "2", "test/assets/1.csv"])); assert_eq!(chain.next().unwrap().unwrap(), Row::from(vec!["2", "2", "test/assets/2.csv"])); assert_eq!(chain.next().unwrap().unwrap(), Row::from(vec!["4", "3", "test/assets/2.csv"])); assert!(chain.next().is_none()); } #[test] fn test_read_concatenated() { let input_stream = InputStreamBuilder::from_paths( &["test/assets/1.csv", "test/assets/2.csv"] ).unwrap().build().unwrap(); assert_eq!( *input_stream.headers().as_row(), Row::from(vec!["a", "b"]) ); let mut input_stream = input_stream.into_iter(); assert_eq!( input_stream.next().unwrap().unwrap(), Row::from(vec!["1", "3"]) ); assert_eq!( input_stream.next().unwrap().unwrap(), Row::from(vec!["5", "2"]) ); assert_eq!( input_stream.next().unwrap().unwrap(), Row::from(vec!["2", "2"]) ); assert_eq!( input_stream.next().unwrap().unwrap(), Row::from(vec!["4", "3"]) ); } #[test] fn detects_inconsistent_headers() { let input_stream = InputStreamBuilder::from_paths( &["test/assets/1.csv", "test/assets/3.csv"] ).unwrap().build().unwrap(); let mut input_stream = input_stream.into_iter(); match input_stream.nth(2) { Some(Err(Error::InconsistentHeaders)) => { } x => unreachable!("{:?}", x), } } }