#[cfg(all(feature = "async", test))]
mod async_transfer_data_complex_step_test {
    use std::fs::File;
    use std::io;
    use std::path::Path;
    use std::pin::Pin;
    use std::sync::Arc;

    use chrono::NaiveDate;
    use csv_async::StringRecord;
    use deadpool_postgres::Timeouts;
    use diesel::{insert_into, Insertable, Queryable};
    use diesel_async::{AsyncPgConnection, RunQueryDsl, SimpleAsyncConnection};
    use diesel_async::pooled_connection::{AsyncDieselConnectionManager, ManagerConfig, RecyclingMethod};
    use diesel_async::pooled_connection::deadpool::{BuildError, Pool};
    use futures::executor::block_on;
    use futures::Stream;
    use peak_alloc::PeakAlloc;
    use tokio::sync::Mutex;
    use zip::ZipArchive;

    use batch_processing::tokio::step::AsyncStepRunner;
    use batch_processing::tokio::step::complex_step::{AsyncComplexStepBuilder, ComplexStepBuilderTrait};
    use batch_processing::tokio::step::step_builder::AsyncStepBuilderTrait;

    // Track allocations so the test can assert on memory usage.
    #[global_allocator]
    static PEAK_ALLOC: PeakAlloc = PeakAlloc;

    macro_rules! resource_file {
        ($fname:expr) => (
            concat!(env!("CARGO_MANIFEST_DIR"), "/tests/resources/", $fname) // assumes Unix path separator ('/')!
        )
    }

    diesel::table! {
        car_prices (id) {
            id -> Int4, // SERIAL in the database
            year -> Nullable<Int4>,
            make -> Nullable<Varchar>,
            model -> Nullable<Varchar>,
            trim -> Nullable<Varchar>,
            body -> Nullable<Varchar>,
            transmission -> Nullable<Varchar>,
            vin -> Nullable<Varchar>,
            state -> Nullable<Varchar>,
            condition -> Nullable<Float8>,
            odometer -> Nullable<Float8>,
            color -> Nullable<Varchar>,
            interior -> Nullable<Varchar>,
            seller -> Nullable<Varchar>,
            nmr -> Nullable<Float8>,
            sellingprice -> Nullable<Float8>,
            saledate -> Nullable<Date>,
        }
    }

    #[derive(Insertable, Queryable)]
    #[diesel(table_name = car_prices, primary_key(id))]
    pub struct CarPrice {
        pub id: Option<i32>,
        pub year: Option<i32>,
        pub make: Option<String>,
        pub model: Option<String>,
        pub trim: Option<String>,
        pub body: Option<String>,
        pub transmission: Option<String>,
        pub vin: Option<String>,
        pub state: Option<String>,
        pub condition: Option<f64>,
        pub odometer: Option<f64>,
        pub color: Option<String>,
        pub interior: Option<String>,
        pub seller: Option<String>,
        pub nmr: Option<f64>,
        pub sellingprice: Option<f64>,
        pub saledate: Option<NaiveDate>,
    }

    async fn get_pool() -> Result<Pool<AsyncPgConnection>, BuildError> {
        let pg_url = "postgres://postgres:postgres@localhost:5432/postgres";

        let mut config: ManagerConfig<AsyncPgConnection> = ManagerConfig::default();
        config.recycling_method = RecyclingMethod::Fast;
        let config = AsyncDieselConnectionManager::<AsyncPgConnection>::new_with_config(pg_url, config);

        // Note: these timeouts are built but never applied to the pool.
        let mut timeout = Timeouts::new();
        timeout.recycle = Some(std::time::Duration::from_secs(10));

        let pool = Pool::builder(config)
            .max_size(2)
            .build()?;
        Ok(pool)
    }

    fn unzip_csv_file() -> Option<String> {
        let output_path = "/tmp/car_prices.csv".to_string();
        if Path::new(&output_path).exists() {
            return Some(output_path);
        }

        let zip_path = resource_file!("car_prices.zip");
        let zip_file = File::open(zip_path).unwrap();
        let mut archive = ZipArchive::new(zip_file).unwrap();

        // Extract the first CSV entry found in the archive.
        for i in 0..archive.len() {
            let mut file = archive.by_index(i).unwrap();
            if file.name().ends_with(".csv") {
                let mut output_file = File::create(&output_path).unwrap();
                io::copy(&mut file, &mut output_file).unwrap();
                return Some(output_path);
            }
        }

        None
    }

    #[tokio::test]
    async fn test_transfer_data_complex_step() {
        // enable_test_log();
        let pool = Arc::new(get_pool().await.expect("Error creating pool"));

        let create_table_car_prices = include_str!("../../../migrations/car_prices.sql");
        let mut conn = pool.get().await.expect("Error getting connection");
        conn.batch_execute(create_table_car_prices).await.expect("Error creating table");
        conn.batch_execute("TRUNCATE TABLE car_prices").await.expect("Error truncating table");
        let csv_path = unzip_csv_file().expect("CSV file not found");

        let all_memory_usage = Arc::new(Mutex::new(Vec::new()));
        let final_memory_usage = Arc::clone(&all_memory_usage);

        let step_builder: AsyncComplexStepBuilder<Result<StringRecord, csv_async::Error>, CarPrice> =
            AsyncComplexStepBuilder::get("csv_transfer".to_string())
                .reader(Box::new(move || {
                    let csv_file = csv_path.clone();
                    let csv_file = tokio::fs::File::open(csv_file);
                    // Blocks the current thread until the file is open.
                    let csv_file = block_on(csv_file).expect("Error opening file");
                    let reader = csv_async::AsyncReader::from_reader(csv_file);
                    let stream: Pin<Box<dyn Stream<Item = Result<StringRecord, csv_async::Error>> + Send>> =
                        Box::pin(reader.into_records());
                    Box::pin(async move { stream })
                }))
                .processor(Box::new(|csv_line: Result<StringRecord, csv_async::Error>| {
                    let record = csv_line.unwrap();
                    Box::pin(async move {
                        CarPrice {
                            id: None,
                            year: record.get(1).unwrap().parse::<i32>().ok(),
                            make: record.get(2).map(|s| s.to_string()),
                            model: record.get(3).map(|s| s.to_string()),
                            trim: record.get(4).map(|s| s.to_string()),
                            body: record.get(5).map(|s| s.to_string()),
                            transmission: record.get(6).map(|s| s.to_string()),
                            vin: record.get(7).map(|s| s.to_string()),
                            state: record.get(8).map(|s| s.to_string()),
                            condition: record.get(9).and_then(|s| s.parse::<f64>().ok()),
                            odometer: record.get(10).and_then(|s| s.parse::<f64>().ok()),
                            color: record.get(11).map(|s| s.to_string()),
                            interior: record.get(12).map(|s| s.to_string()),
                            seller: record.get(13).map(|s| s.to_string()),
                            nmr: record.get(14).and_then(|s| s.parse::<f64>().ok()),
                            sellingprice: record.get(15).and_then(|s| s.parse::<f64>().ok()),
                            saledate: None,
                        }
                    })
                }))
                .writer(Box::new(move |vec_car_price: Vec<CarPrice>| {
                    let pool = Arc::clone(&pool);
                    let all_memory_usage = Arc::clone(&all_memory_usage);
                    Box::pin(async move {
                        // let mut chunks = vec_car_price.chunks(2000);
                        let mut conn = pool.get().await.expect("Error getting connection");
                        // while let Some(chunk) = chunks.next() {
                        insert_into(car_prices::table)
                            .values(vec_car_price)
                            // .values(chunk)
                            .execute(&mut conn)
                            .await
                            .expect("Error inserting data");
                        // }

                        // Record the allocator's current usage after each chunk is written.
                        let current_mem = PEAK_ALLOC.current_usage_as_kb() as i32;
                        all_memory_usage.lock().await.push(current_mem);
                    })
                }))
                .chunk_size(1000);

        let step = step_builder.build();
        let step_status = step.run().await;
        assert!(step_status.status.is_ok(), "The step should be successful");

        let all_memory_usage = final_memory_usage.lock().await;
        let max_memory_for_usage = 4000; // KB
        let max_memory = all_memory_usage.iter().max().unwrap();
        println!("Max memory: {} KB", max_memory);
        assert!(*max_memory <= max_memory_for_usage, "Memory usage is greater than expected");
    }
}