// Integration tests that drive the public `csv_diff` API end to end.
//
// Every test diffs two small in-memory CSV documents and compares the produced
// `DiffByteRecord`s (or the reported headers / column count) with a hand-written
// expectation. Tests are gated on the crate features that select the threading
// backend, so only the subset matching the enabled features is compiled.
#[cfg(test)]
mod integration_test {
    use csv_diff::csv::Csv;
    #[cfg(not(feature = "rayon-threads"))]
    use csv_diff::csv_hash_task_spawner::{
        CsvHashTaskSpawnerBuilderStdThreads, CsvHashTaskSpawnerStdThreads,
    };
    #[cfg(not(feature = "rayon-threads"))]
    use csv_diff::diff_result::DiffByteRecords;
    use csv_diff::diff_row::{ByteRecordLineInfo, DiffByteRecord};
    use pretty_assertions::assert_eq;
    #[cfg(not(feature = "rayon-threads"))]
    use std::convert::TryInto;
    use std::{error::Error, io::Cursor};

    /// Default `CsvByteDiffLocal`, inputs wrapped in `Cursor`: a single changed
    /// third field must yield exactly one `Modify` record.
    #[cfg(feature = "rayon-threads")]
    #[test]
    fn create_default_instance_and_diff_with_cursor() -> Result<(), Box<dyn Error>> {
        let csv_diff = csv_diff::csv_diff::CsvByteDiffLocal::new()?;
        let csv_left = "\
                        header1,header2,header3\n\
                        a,b,c";
        let csv_right = "\
                        header1,header2,header3\n\
                        a,b,d";

        let mut diff_res = csv_diff.diff(
            Csv::with_reader_seek(Cursor::new(csv_left.as_bytes())),
            Csv::with_reader_seek(Cursor::new(csv_right.as_bytes())),
        )?;

        diff_res.sort_by_line();
        let diff_rows_actual = diff_res.as_slice();

        let diff_rows_expected = vec![DiffByteRecord::Modify {
            delete: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "b", "c"]), 2),
            add: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "b", "d"]), 2),
            field_indices: vec![2],
        }];

        assert_eq!(diff_rows_actual, diff_rows_expected.as_slice());
        Ok(())
    }

    /// Same scenario as the cursor variant, but the byte slices are handed to
    /// `Csv::with_reader_seek` directly.
    #[cfg(feature = "rayon-threads")]
    #[test]
    fn local_create_default_instance_and_diff_without_cursor() -> Result<(), Box<dyn Error>> {
        let csv_diff = csv_diff::csv_diff::CsvByteDiffLocal::new()?;
        let csv_left = "\
                        header1,header2,header3\n\
                        a,b,c";
        let csv_right = "\
                        header1,header2,header3\n\
                        a,b,d";

        let mut diff_res = csv_diff.diff(
            Csv::with_reader_seek(csv_left.as_bytes()),
            Csv::with_reader_seek(csv_right.as_bytes()),
        )?;

        diff_res.sort_by_line();
        let diff_rows_actual = diff_res.as_slice();

        let diff_rows_expected = vec![DiffByteRecord::Modify {
            delete: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "b", "c"]), 2),
            add: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "b", "d"]), 2),
            field_indices: vec![2],
        }];

        assert_eq!(diff_rows_actual, diff_rows_expected.as_slice());
        Ok(())
    }

    /// Default streaming `CsvByteDiff`: the diff is consumed as an iterator and
    /// collected into a `Vec` before comparison.
    #[cfg(feature = "rayon-threads")]
    #[test]
    fn streaming_create_default_instance_and_diff_without_cursor() -> Result<(), Box<dyn Error>> {
        let csv_diff = csv_diff::csv_diff::CsvByteDiff::new()?;
        let csv_left = "\
                        header1,header2,header3\n\
                        a,b,c";
        let csv_right = "\
                        header1,header2,header3\n\
                        a,b,d";

        let diff_res = csv_diff.diff(
            Csv::with_reader(csv_left.as_bytes()),
            Csv::with_reader(csv_right.as_bytes()),
        );

        // NOTE(review): assumes the iterator yields `csv::Result<DiffByteRecord>` - confirm.
        let diff_rows_actual: Vec<DiffByteRecord> = diff_res.collect::<csv::Result<Vec<_>>>()?;

        let diff_rows_expected = vec![DiffByteRecord::Modify {
            delete: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "b", "c"]), 2),
            add: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "b", "d"]), 2),
            field_indices: vec![2],
        }];

        assert_eq!(diff_rows_actual, diff_rows_expected);
        Ok(())
    }

    /// `CsvByteDiffLocalBuilder` with a borrowed rayon thread pool, inputs wrapped
    /// in `Cursor`.
    #[cfg(feature = "rayon-threads")]
    #[test]
    fn local_create_instance_with_builder_and_diff_with_cursor() -> Result<(), Box<dyn Error>> {
        let thread_pool = rayon::ThreadPoolBuilder::new().build()?;
        let csv_diff = csv_diff::csv_diff::CsvByteDiffLocalBuilder::new()
            .rayon_thread_pool(&thread_pool)
            .build()?;
        let csv_left = "\
                        header1,header2,header3\n\
                        a,b,c";
        let csv_right = "\
                        header1,header2,header3\n\
                        a,b,d";

        let mut diff_res = csv_diff.diff(
            Csv::with_reader_seek(Cursor::new(csv_left.as_bytes())),
            Csv::with_reader_seek(Cursor::new(csv_right.as_bytes())),
        )?;

        diff_res.sort_by_line();
        let diff_rows_actual = diff_res.as_slice();

        let diff_rows_expected = vec![DiffByteRecord::Modify {
            delete: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "b", "c"]), 2),
            add: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "b", "d"]), 2),
            field_indices: vec![2],
        }];

        assert_eq!(diff_rows_actual, diff_rows_expected.as_slice());
        Ok(())
    }

    /// `CsvByteDiffLocalBuilder` with a borrowed rayon thread pool, inputs passed
    /// as plain byte slices.
    #[cfg(feature = "rayon-threads")]
    #[test]
    fn local_create_instance_with_builder_and_diff_without_cursor() -> Result<(), Box<dyn Error>> {
        let thread_pool = rayon::ThreadPoolBuilder::new().build()?;
        let csv_diff = csv_diff::csv_diff::CsvByteDiffLocalBuilder::new()
            .rayon_thread_pool(&thread_pool)
            .build()?;
        let csv_left = "\
                        header1,header2,header3\n\
                        a,b,c";
        let csv_right = "\
                        header1,header2,header3\n\
                        a,b,d";

        let mut diff_res = csv_diff.diff(
            Csv::with_reader_seek(csv_left.as_bytes()),
            Csv::with_reader_seek(csv_right.as_bytes()),
        )?;

        diff_res.sort_by_line();
        let diff_rows_actual = diff_res.as_slice();

        let diff_rows_expected = vec![DiffByteRecord::Modify {
            delete: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "b", "c"]), 2),
            add: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "b", "d"]), 2),
            field_indices: vec![2],
        }];

        assert_eq!(diff_rows_actual, diff_rows_expected.as_slice());
        Ok(())
    }

    /// Streaming `CsvByteDiffBuilder` with an `Arc`-wrapped rayon thread pool.
    #[cfg(feature = "rayon-threads")]
    #[test]
    fn streaming_create_instance_with_builder_and_diff_without_cursor(
    ) -> Result<(), Box<dyn Error>> {
        use std::sync::Arc;

        let thread_pool = rayon::ThreadPoolBuilder::new().build()?;
        let csv_diff = csv_diff::csv_diff::CsvByteDiffBuilder::new()
            .rayon_thread_pool(Arc::new(thread_pool))
            .build()?;
        let csv_left = "\
                        header1,header2,header3\n\
                        a,b,c";
        let csv_right = "\
                        header1,header2,header3\n\
                        a,b,d";

        let diff_res = csv_diff.diff(
            Csv::with_reader(csv_left.as_bytes()),
            Csv::with_reader(csv_right.as_bytes()),
        );

        let diff_rows_actual = diff_res.collect::<csv::Result<Vec<_>>>()?;

        let diff_rows_expected = vec![DiffByteRecord::Modify {
            delete: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "b", "c"]), 2),
            add: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "b", "d"]), 2),
            field_indices: vec![2],
        }];

        assert_eq!(diff_rows_actual, diff_rows_expected);
        Ok(())
    }

    /// `CsvByteDiffLocalBuilder` with every option this test file uses set
    /// explicitly: thread pool and primary key column `0`.
    #[cfg(feature = "rayon-threads")]
    #[test]
    fn local_create_instance_with_builder_set_all_and_diff() -> Result<(), Box<dyn Error>> {
        let thread_pool = rayon::ThreadPoolBuilder::new().build()?;
        let csv_diff = csv_diff::csv_diff::CsvByteDiffLocalBuilder::new()
            .rayon_thread_pool(&thread_pool)
            .primary_key_columns(std::iter::once(0))
            .build()?;
        let csv_left = "\
                        header1,header2,header3\n\
                        a,b,c";
        let csv_right = "\
                        header1,header2,header3\n\
                        a,b,d";

        let mut diff_res = csv_diff.diff(
            Csv::with_reader_seek(Cursor::new(csv_left.as_bytes())),
            Csv::with_reader_seek(Cursor::new(csv_right.as_bytes())),
        )?;

        diff_res.sort_by_line();
        let diff_rows_actual = diff_res.as_slice();

        let diff_rows_expected = vec![DiffByteRecord::Modify {
            delete: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "b", "c"]), 2),
            add: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "b", "d"]), 2),
            field_indices: vec![2],
        }];

        assert_eq!(diff_rows_actual, diff_rows_expected.as_slice());
        Ok(())
    }

    /// Streaming `CsvByteDiffBuilder` with thread pool and primary key column `0`
    /// set explicitly.
    #[cfg(feature = "rayon-threads")]
    #[test]
    fn streaming_create_instance_with_builder_set_all_and_diff() -> Result<(), Box<dyn Error>> {
        use std::sync::Arc;

        let thread_pool = rayon::ThreadPoolBuilder::new().build()?;
        let csv_diff = csv_diff::csv_diff::CsvByteDiffBuilder::new()
            .rayon_thread_pool(Arc::new(thread_pool))
            .primary_key_columns(std::iter::once(0))
            .build()?;
        let csv_left = "\
                        header1,header2,header3\n\
                        a,b,c";
        let csv_right = "\
                        header1,header2,header3\n\
                        a,b,d";

        let diff_res = csv_diff.diff(
            Csv::with_reader(csv_left.as_bytes()),
            Csv::with_reader(csv_right.as_bytes()),
        );

        let diff_rows_actual = diff_res.collect::<csv::Result<Vec<_>>>()?;

        let diff_rows_expected = vec![DiffByteRecord::Modify {
            delete: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "b", "c"]), 2),
            add: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "b", "d"]), 2),
            field_indices: vec![2],
        }];

        assert_eq!(diff_rows_actual, diff_rows_expected);
        Ok(())
    }

    /// After `sort_by_columns` on column index 0, the modify ("a"), add ("b") and
    /// delete ("c") records are expected in that order.
    #[cfg(feature = "rayon-threads")]
    #[test]
    fn local_diffbyterecords_sort_by_columns() -> Result<(), Box<dyn Error>> {
        use csv_diff::diff_result::ColumnIdx;

        let thread_pool = rayon::ThreadPoolBuilder::new().build()?;
        let csv_diff = csv_diff::csv_diff::CsvByteDiffLocalBuilder::new()
            .rayon_thread_pool(&thread_pool)
            .build()?;
        let csv_left = "\
                        header1,header2,header3\n\
                        a,_,c\n\
                        c,_,_";
        let csv_right = "\
                        header1,header2,header3\n\
                        a,_,d\n\
                        b,_,_";

        let mut diff_res = csv_diff.diff(
            Csv::with_reader_seek(Cursor::new(csv_left.as_bytes())),
            Csv::with_reader_seek(Cursor::new(csv_right.as_bytes())),
        )?;

        diff_res.sort_by_columns(vec![ColumnIdx::IdxForBoth(0)])?;
        let diff_rows_actual = diff_res.as_slice();

        let diff_rows_expected = vec![
            DiffByteRecord::Modify {
                delete: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "_", "c"]), 2),
                add: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "_", "d"]), 2),
                field_indices: vec![2],
            },
            DiffByteRecord::Add(ByteRecordLineInfo::new(
                csv::ByteRecord::from(vec!["b", "_", "_"]),
                3,
            )),
            DiffByteRecord::Delete(ByteRecordLineInfo::new(
                csv::ByteRecord::from(vec!["c", "_", "_"]),
                3,
            )),
        ];

        assert_eq!(diff_rows_actual, diff_rows_expected.as_slice());
        Ok(())
    }

    /// The local diff result, and the iterator obtained from it, must both report
    /// 3 columns and the shared header record for the left and right side.
    #[cfg(feature = "rayon-threads")]
    #[test]
    fn local_diff_result_should_have_max_columns_and_headers() -> Result<(), Box<dyn Error>> {
        let csv_diff = csv_diff::csv_diff::CsvByteDiffLocal::new()?;
        let csv_left = "\
                        header1,header2,header3\n\
                        a,_,c\n\
                        c,_,_";
        let csv_right = "\
                        header1,header2,header3\n\
                        a,_,d\n\
                        b,_,_";

        let diff_res = csv_diff.diff(
            Csv::with_reader_seek(Cursor::new(csv_left.as_bytes())),
            Csv::with_reader_seek(Cursor::new(csv_right.as_bytes())),
        )?;

        // First check the collected result itself ...
        let num_cols_actual = diff_res.num_columns();
        let num_cols_expected = Some(3);
        assert_eq!(num_cols_actual, num_cols_expected);

        let headers_actual = (
            diff_res.headers().headers_left(),
            diff_res.headers().headers_right(),
        );
        let r: csv::ByteRecord = vec!["header1", "header2", "header3"].into();
        let headers_expected = (Some(&r), Some(&r));
        assert_eq!(headers_actual, headers_expected.clone());

        // ... then the iterator created from it.
        let iter = diff_res.into_iter();

        let num_cols_actual = iter.num_columns();
        let num_cols_expected = Some(3);
        assert_eq!(num_cols_actual, num_cols_expected);

        let headers_actual = (
            iter.headers().headers_left(),
            iter.headers().headers_right(),
        );
        assert_eq!(headers_actual, headers_expected);
        Ok(())
    }

    /// The streaming diff result must report 3 columns and the header record at
    /// three stages: on the value returned by `diff`, after `into_iter()`, and on
    /// the iterator of the `DiffByteRecords` produced by
    /// `try_to_diff_byte_records()`.
    #[cfg(feature = "rayon-threads")]
    #[test]
    fn streaming_diff_result_should_have_max_columns_and_headers() -> Result<(), Box<dyn Error>> {
        let csv_diff = csv_diff::csv_diff::CsvByteDiff::new()?;
        let csv_left = "\
                        header1,header2,header3\n\
                        a,_,c\n\
                        c,_,_";
        let csv_right = "\
                        header1,header2,header3\n\
                        a,_,d\n\
                        b,_,_";

        let diff_res = csv_diff.diff(
            Csv::with_reader(csv_left.as_bytes()),
            Csv::with_reader(csv_right.as_bytes()),
        );

        // Stage 1: the value returned by `diff`.
        let num_cols_actual = diff_res.num_columns();
        let num_cols_expected = Some(3);
        assert_eq!(num_cols_actual, num_cols_expected);

        let r: csv::ByteRecord = vec!["header1", "header2", "header3"].into();
        let (headers_left_actual, headers_right_actual) = (
            diff_res.headers().headers_left(),
            diff_res.headers().headers_right(),
        );
        let (headers_left_expected, headers_right_expected) = (r.clone(), r.clone());
        // Here the header accessors yield an `Option` of a `Result`; both layers
        // are unwrapped via `?` before comparing.
        assert_eq!(
            headers_left_actual
                .ok_or("No left headers".to_string())?
                .map_err(|e| e.to_string())?,
            &headers_left_expected
        );
        assert_eq!(
            headers_right_actual
                .ok_or("No right headers".to_string())?
                .map_err(|e| e.to_string())?,
            &headers_right_expected
        );

        // Stage 2: after `into_iter()`.
        let iter = diff_res.into_iter();

        let num_cols_actual = iter.num_columns();
        let num_cols_expected = Some(3);
        assert_eq!(num_cols_actual, num_cols_expected);

        let (headers_left_actual, headers_right_actual) = (
            iter.headers().headers_left(),
            iter.headers().headers_right(),
        );
        let (headers_left_expected, headers_right_expected) = (r.clone(), r.clone());
        assert_eq!(
            headers_left_actual
                .ok_or("No left headers".to_string())?
                .map_err(|e| e.to_string())?,
            &headers_left_expected
        );
        assert_eq!(
            headers_right_actual
                .ok_or("No right headers".to_string())?
                .map_err(|e| e.to_string())?,
            &headers_right_expected
        );

        // Stage 3: the iterator of the collected `DiffByteRecords`; here the
        // headers compare directly against `Option<&ByteRecord>`.
        let into_iter = iter.try_to_diff_byte_records()?.into_iter();

        let num_cols_actual = into_iter.num_columns();
        let num_cols_expected = Some(3);
        assert_eq!(num_cols_actual, num_cols_expected);

        let (headers_left_actual, headers_right_actual) = (
            into_iter.headers().headers_left(),
            into_iter.headers().headers_right(),
        );
        let (headers_left_expected, headers_right_expected) = (Some(&r), Some(&r));
        assert_eq!(headers_left_actual, headers_left_expected);
        assert_eq!(headers_right_actual, headers_right_expected);
        Ok(())
    }

    /// Streams the diff, converts it with `try_to_diff_byte_records()` and then
    /// sorts by column index 0; expects modify, add, delete in that order.
    #[cfg(feature = "rayon-threads")]
    #[test]
    fn streaming_collect_into_diffbyterecords_then_sort_by_columns() -> Result<(), Box<dyn Error>>
    {
        use csv_diff::diff_result::{ColumnIdx, DiffByteRecords};

        let csv_diff = csv_diff::csv_diff::CsvByteDiffBuilder::new().build()?;
        let csv_left = "\
                        header1,header2,header3\n\
                        a,_,c\n\
                        c,_,_";
        let csv_right = "\
                        header1,header2,header3\n\
                        a,_,d\n\
                        b,_,_";

        let diff_res = csv_diff.diff(
            Csv::with_reader_seek(Cursor::new(csv_left.as_bytes())),
            Csv::with_reader_seek(Cursor::new(csv_right.as_bytes())),
        );

        let mut diff_res: DiffByteRecords = diff_res.try_to_diff_byte_records()?;
        diff_res.sort_by_columns(vec![ColumnIdx::IdxForBoth(0)])?;
        let diff_rows_actual = diff_res.as_slice();

        let diff_rows_expected = vec![
            DiffByteRecord::Modify {
                delete: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "_", "c"]), 2),
                add: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "_", "d"]), 2),
                field_indices: vec![2],
            },
            DiffByteRecord::Add(ByteRecordLineInfo::new(
                csv::ByteRecord::from(vec!["b", "_", "_"]),
                3,
            )),
            DiffByteRecord::Delete(ByteRecordLineInfo::new(
                csv::ByteRecord::from(vec!["c", "_", "_"]),
                3,
            )),
        ];

        assert_eq!(diff_rows_actual, diff_rows_expected.as_slice());
        Ok(())
    }

    /// Local diff built with the crossbeam task-spawner builder, inputs wrapped
    /// in `Cursor`.
    #[cfg(feature = "crossbeam-threads")]
    #[test]
    fn local_create_instance_with_builder_crossbeam_and_diff_with_cursor(
    ) -> Result<(), Box<dyn Error>> {
        use csv_diff::csv_hash_task_spawner::CsvHashTaskSpawnerLocalBuilderCrossbeam;

        let csv_byte_diff = csv_diff::csv_diff::CsvByteDiffLocalBuilder::new(
            CsvHashTaskSpawnerLocalBuilderCrossbeam::new(),
        )
        .build()?;
        let csv_left = "\
                        header1,header2,header3\n\
                        a,b,c";
        let csv_right = "\
                        header1,header2,header3\n\
                        a,b,d";

        let mut diff_res = csv_byte_diff.diff(
            Csv::with_reader_seek(Cursor::new(csv_left.as_bytes())),
            Csv::with_reader_seek(Cursor::new(csv_right.as_bytes())),
        )?;

        diff_res.sort_by_line();
        let diff_rows_actual = diff_res.as_slice();

        let diff_rows_expected = vec![DiffByteRecord::Modify {
            delete: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "b", "c"]), 2),
            add: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "b", "d"]), 2),
            field_indices: vec![2],
        }];

        assert_eq!(diff_rows_actual, diff_rows_expected);
        Ok(())
    }

    /// Local diff built with the crossbeam task-spawner builder, inputs passed as
    /// plain byte slices.
    #[cfg(feature = "crossbeam-threads")]
    #[test]
    fn local_create_instance_with_builder_crossbeam_and_diff_without_cursor(
    ) -> Result<(), Box<dyn Error>> {
        use csv_diff::csv_hash_task_spawner::CsvHashTaskSpawnerLocalBuilderCrossbeam;

        let csv_byte_diff = csv_diff::csv_diff::CsvByteDiffLocalBuilder::new(
            CsvHashTaskSpawnerLocalBuilderCrossbeam::new(),
        )
        .build()?;
        let csv_left = "\
                        header1,header2,header3\n\
                        a,b,c";
        let csv_right = "\
                        header1,header2,header3\n\
                        a,b,d";

        let mut diff_res = csv_byte_diff.diff(
            Csv::with_reader_seek(csv_left.as_bytes()),
            Csv::with_reader_seek(csv_right.as_bytes()),
        )?;

        diff_res.sort_by_line();
        let diff_rows_actual = diff_res.as_slice();

        let diff_rows_expected = vec![DiffByteRecord::Modify {
            delete: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "b", "c"]), 2),
            add: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "b", "d"]), 2),
            field_indices: vec![2],
        }];

        assert_eq!(diff_rows_actual, diff_rows_expected);
        Ok(())
    }

    /// Streaming diff built with the std-threads task-spawner builder; the result
    /// is collected into a `Vec`.
    #[cfg(not(feature = "rayon-threads"))]
    #[test]
    fn streaming_create_instance_with_builder_std_threads_and_diff_without_cursor(
    ) -> Result<(), Box<dyn Error>> {
        let csv_byte_diff = csv_diff::csv_diff::CsvByteDiffBuilder::<CsvHashTaskSpawnerStdThreads>::new(
            CsvHashTaskSpawnerBuilderStdThreads::new(),
        )
        .build()?;
        let csv_left = "\
                        header1,header2,header3\n\
                        a,b,c";
        let csv_right = "\
                        header1,header2,header3\n\
                        a,b,d";

        let diff_res = csv_byte_diff.diff(
            Csv::with_reader(csv_left.as_bytes()),
            Csv::with_reader(csv_right.as_bytes()),
        );

        let diff_rows_actual = diff_res.collect::<csv::Result<Vec<_>>>()?;

        let diff_rows_expected = vec![DiffByteRecord::Modify {
            delete: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "b", "c"]), 2),
            add: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "b", "d"]), 2),
            field_indices: vec![2],
        }];

        assert_eq!(diff_rows_actual, diff_rows_expected);
        Ok(())
    }

    /// Streaming diff built with the std-threads task-spawner builder; the result
    /// is converted with `try_to_diff_byte_records()` and compared as a slice.
    #[cfg(not(feature = "rayon-threads"))]
    #[test]
    fn streaming_create_instance_with_builder_std_threads_and_diff_and_try_into(
    ) -> Result<(), Box<dyn Error>> {
        let csv_byte_diff = csv_diff::csv_diff::CsvByteDiffBuilder::<CsvHashTaskSpawnerStdThreads>::new(
            CsvHashTaskSpawnerBuilderStdThreads::new(),
        )
        .build()?;
        let csv_left = "\
                        header1,header2,header3\n\
                        a,b,c";
        let csv_right = "\
                        header1,header2,header3\n\
                        a,b,d";

        let diff_res = csv_byte_diff.diff(
            Csv::with_reader(csv_left.as_bytes()),
            Csv::with_reader(csv_right.as_bytes()),
        );

        let diff_byte_records_actual: DiffByteRecords = diff_res.try_to_diff_byte_records()?;

        let diff_rows_expected = vec![DiffByteRecord::Modify {
            delete: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "b", "c"]), 2),
            add: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "b", "d"]), 2),
            field_indices: vec![2],
        }];

        assert_eq!(
            diff_byte_records_actual.as_slice(),
            diff_rows_expected.as_slice()
        );
        Ok(())
    }

    /// A user-defined *local* task spawner backed by `scoped_pool`, plus a test
    /// that plugs it into `CsvByteDiffLocalBuilder`.
    mod custom_scoped_threads {
        #[cfg(not(feature = "rayon-threads"))]
        use super::*;
        use csv_diff::{
            csv_hash_task_spawner::{
                CsvHashTaskLineSenders, CsvHashTaskSpawnerLocal, CsvHashTaskSpawnerLocalBuilder,
            },
            csv_parse_result::{CsvParseResultLeft, CsvParseResultRight, RecordHashWithPosition},
        };
        #[cfg(not(feature = "rayon-threads"))]
        use pretty_assertions::assert_eq;
        use std::{
            collections::HashSet,
            io::{Read, Seek},
        };

        /// Spawner that runs the left and right hashing tasks on a `scoped_pool::Pool`.
        struct CsvHashTaskSpawnerCustomLocal {
            pool: scoped_pool::Pool,
        }

        impl CsvHashTaskSpawnerCustomLocal {
            /// Creates the spawner with a pool of `pool_size` threads.
            pub fn new(pool_size: usize) -> Self {
                Self {
                    pool: scoped_pool::Pool::new(pool_size),
                }
            }
        }

        impl CsvHashTaskSpawnerLocal for CsvHashTaskSpawnerCustomLocal {
            /// Submits one job per side to the pool; each job calls
            /// `parse_hash_and_send_for_compare` with that side's senders.
            fn spawn_hashing_tasks_and_send_result<R>(
                &self,
                csv_hash_task_senders_left: CsvHashTaskLineSenders<R>,
                csv_hash_task_senders_right: CsvHashTaskLineSenders<R>,
                primary_key_columns: &HashSet<usize>,
            ) where
                R: Read + Seek + Send,
            {
                self.pool.scoped(move |s| {
                    s.recurse(move |s| {
                        s.execute(move || {
                            Self::parse_hash_and_send_for_compare::<
                                R,
                                CsvParseResultLeft<RecordHashWithPosition>,
                            >(
                                csv_hash_task_senders_left, primary_key_columns
                            );
                        });
                        s.execute(move || {
                            Self::parse_hash_and_send_for_compare::<
                                R,
                                CsvParseResultRight<RecordHashWithPosition>,
                            >(
                                csv_hash_task_senders_right, primary_key_columns
                            );
                        });
                    });
                });
            }
        }

        /// Builder handed to `CsvByteDiffLocalBuilder::new`; remembers the pool size.
        struct CsvHashTaskSpawnerBuilderCustom {
            pool_size: usize,
        }

        impl CsvHashTaskSpawnerBuilderCustom {
            #[cfg(not(feature = "rayon-threads"))]
            pub fn new(pool_size: usize) -> Self {
                Self { pool_size }
            }
        }

        impl CsvHashTaskSpawnerLocalBuilder<CsvHashTaskSpawnerCustomLocal>
            for CsvHashTaskSpawnerBuilderCustom
        {
            fn build(self) -> CsvHashTaskSpawnerCustomLocal {
                CsvHashTaskSpawnerCustomLocal::new(self.pool_size)
            }
        }

        /// Local diff driven by the custom scoped-pool spawner (pool size 4,
        /// primary key column `0`).
        #[cfg(not(feature = "rayon-threads"))]
        #[test]
        fn local_create_instance_with_builder_custom_scoped_threads_and_diff(
        ) -> Result<(), Box<dyn Error>> {
            let csv_byte_diff = csv_diff::csv_diff::CsvByteDiffLocalBuilder::new(
                CsvHashTaskSpawnerBuilderCustom::new(4),
            )
            .primary_key_columns(std::iter::once(0))
            .build()?;
            let csv_left = "\
                            header1,header2,header3\n\
                            a,b,c";
            let csv_right = "\
                            header1,header2,header3\n\
                            a,b,d";

            let mut diff_res = csv_byte_diff.diff(
                Csv::with_reader_seek(Cursor::new(csv_left.as_bytes())),
                Csv::with_reader_seek(Cursor::new(csv_right.as_bytes())),
            )?;

            diff_res.sort_by_line();
            let diff_rows_actual = diff_res.as_slice();

            let diff_rows_expected = vec![DiffByteRecord::Modify {
                delete: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "b", "c"]), 2),
                add: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "b", "d"]), 2),
                field_indices: vec![2],
            }];

            assert_eq!(diff_rows_actual, diff_rows_expected);
            Ok(())
        }
    }

    /// A user-defined *streaming* task spawner backed by plain `std::thread`s,
    /// plus a test that plugs it into `CsvByteDiffBuilder`.
    mod custom_threads {
        #[cfg(not(feature = "rayon-threads"))]
        use super::*;
        use crossbeam_channel::bounded;
        use csv_diff::{
            csv_hash_task_spawner::{CsvHashTaskSpawner, CsvHashTaskSpawnerBuilder},
            csv_parse_result::{CsvByteRecordWithHash, CsvParseResultLeft, CsvParseResultRight},
        };
        #[cfg(not(feature = "rayon-threads"))]
        use pretty_assertions::assert_eq;
        use std::{collections::HashSet, io::Read};

        /// Stateless spawner that starts one OS thread per task.
        struct CsvHashTaskSpawnerCustom;

        impl CsvHashTaskSpawnerCustom {
            pub fn new() -> Self {
                Self
            }
        }

        impl CsvHashTaskSpawner for CsvHashTaskSpawnerCustom {
            /// Spawns three detached threads: one that runs the comparer and sends
            /// its result through a bounded(1) channel, and one hashing thread per
            /// side. Returns `self` together with the receiving end of that channel.
            fn spawn_hashing_tasks_and_send_result<R: Read + Send + 'static>(
                self,
                csv_hash_task_sender_left: csv_diff::csv_hash_task_spawner::CsvHashTaskSenderWithRecycleReceiver<R>,
                csv_hash_task_sender_right: csv_diff::csv_hash_task_spawner::CsvHashTaskSenderWithRecycleReceiver<R>,
                csv_hash_receiver_comparer: csv_diff::csv_hash_receiver_comparer::CsvHashReceiverStreamComparer,
                primary_key_columns: HashSet<usize>,
            ) -> (
                Self,
                // NOTE(review): payload type assumed to be what
                // `recv_hashes_and_compare()` returns - confirm against the crate.
                crossbeam_channel::Receiver<csv_diff::diff_result::DiffByteRecordsIterator>,
            )
            where
                Self: Sized,
            {
                let (sender, receiver) = bounded(1);

                // Each hashing thread takes ownership of its own copy of the key set.
                let prim_key_columns_clone = primary_key_columns.clone();

                std::thread::spawn(move || {
                    sender
                        .send(csv_hash_receiver_comparer.recv_hashes_and_compare())
                        .unwrap();
                });

                std::thread::spawn(move || {
                    Self::parse_hash_and_send_for_compare::<
                        R,
                        CsvParseResultLeft<CsvByteRecordWithHash>,
                    >(csv_hash_task_sender_left, primary_key_columns);
                });

                std::thread::spawn(move || {
                    Self::parse_hash_and_send_for_compare::<
                        R,
                        CsvParseResultRight<CsvByteRecordWithHash>,
                    >(csv_hash_task_sender_right, prim_key_columns_clone);
                });

                (self, receiver)
            }
        }

        /// Builder handed to `CsvByteDiffBuilder::new`.
        struct CsvHashTaskSpawnerBuilderCustom;

        impl CsvHashTaskSpawnerBuilderCustom {
            #[cfg(not(feature = "rayon-threads"))]
            pub fn new() -> Self {
                Self
            }
        }

        impl CsvHashTaskSpawnerBuilder<CsvHashTaskSpawnerCustom> for CsvHashTaskSpawnerBuilderCustom {
            fn build(self) -> CsvHashTaskSpawnerCustom {
                CsvHashTaskSpawnerCustom::new()
            }
        }

        /// Streaming diff driven by the custom std-thread spawner (primary key
        /// column `0`).
        #[cfg(not(feature = "rayon-threads"))]
        #[test]
        fn streaming_create_instance_with_builder_custom_threads_and_diff(
        ) -> Result<(), Box<dyn Error>> {
            let csv_byte_diff =
                csv_diff::csv_diff::CsvByteDiffBuilder::new(CsvHashTaskSpawnerBuilderCustom::new())
                    .primary_key_columns(std::iter::once(0))
                    .build()?;
            let csv_left = "\
                            header1,header2,header3\n\
                            a,b,c";
            let csv_right = "\
                            header1,header2,header3\n\
                            a,b,d";

            let diff_res = csv_byte_diff.diff(
                Csv::with_reader(csv_left.as_bytes()),
                Csv::with_reader(csv_right.as_bytes()),
            );

            let diff_rows_actual = diff_res.collect::<csv::Result<Vec<_>>>()?;

            let diff_rows_expected = vec![DiffByteRecord::Modify {
                delete: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "b", "c"]), 2),
                add: ByteRecordLineInfo::new(csv::ByteRecord::from(vec!["a", "b", "d"]), 2),
                field_indices: vec![2],
            }];

            assert_eq!(diff_rows_actual, diff_rows_expected);
            Ok(())
        }
    }
}