#[cfg(all(feature = "async", test))] mod async_complex_step_test { use batch_processing::tokio::step::complex_step::{AsyncComplexStepBuilder, ComplexStepBuilderTrait}; use batch_processing::tokio::step::step_builder::AsyncStepBuilderTrait; use batch_processing::tokio::step::AsyncStepRunner; use futures::{stream, Stream}; use std::pin::Pin; use std::sync::Arc; use tokio::sync::Mutex; #[tokio::test] async fn test_build() { let step_builder: AsyncComplexStepBuilder = AsyncComplexStepBuilder::get("test".to_string()) .reader(Box::new(move || { return Box::pin(async move { let stream: Pin + Send>> = Box::pin(stream::iter(vec![String::new()])); stream } ); })) .processor( Box::new( move |item: String| Box::pin( async move { item.to_uppercase() } ) ) ) .writer( Box::new( move |items: Vec| Box::pin( async move { println!("{:?}", items); } ) ) ); let step = step_builder.build(); step.run().await; } #[tokio::test] async fn test_fault_tolerant_is_false() { let vec = Arc::new(Mutex::new(Vec::new())); let vec_write = vec.clone(); let step_builder: AsyncComplexStepBuilder = AsyncComplexStepBuilder::get("test".to_string()) .chunk_size(1) .reader(Box::new(move || { return Box::pin(async move { let stream: Pin + Send>> = Box::pin(stream::iter(vec!["test-failed".to_string(), "test".to_string()])); stream } ); })) .processor( Box::new( move |item: String| Box::pin( async move { if item == "test-failed".to_string() { panic!("test failed"); } return item.to_uppercase(); } ) ) ) .writer( Box::new( move |items: Vec| { let vec_write = vec_write.clone(); Box::pin( async move { let mut vec = vec_write.lock().await; for item in items { vec.push(item); } } ) } ) ); let step = step_builder.build(); let step_result = step.run().await; let vec = vec.lock().await; assert_eq!(step_result.status.is_err(), true); assert_eq!(vec.len(), 0); } #[tokio::test] async fn test_fault_tolerant_is_true() { let vec = Arc::new(Mutex::new(Vec::new())); let vec_write = vec.clone(); let step_builder: AsyncComplexStepBuilder = AsyncComplexStepBuilder::get("test".to_string()) .chunk_size(1) .throw_tolerant() .reader(Box::new(move || { return Box::pin(async move { let stream: Pin + Send>> = Box::pin(stream::iter(vec!["test-failed".to_string(), "test".to_string()])); stream } ); })) .processor( Box::new( move |item: String| Box::pin( async move { if item == "test-failed".to_string() { panic!("test failed"); } return item.to_uppercase(); } ) ) ) .writer( Box::new( move |items: Vec| { let vec_write = vec_write.clone(); Box::pin( async move { let mut vec = vec_write.lock().await; for item in items { vec.push(item); } } ) } ) ); let step = step_builder.build(); let step_result = step.run().await; let vec = vec.lock().await; assert_eq!(step_result.status.is_err(), true); assert_eq!(vec.len(), 1); } }