#[cfg(all(feature = "async", test))] mod job_test { use std::pin::Pin; use futures::{Stream, stream}; use batch_processing::tokio::job::job_builder::{AsyncJobBuilder, AsyncJobBuilderTrait}; use batch_processing::tokio::step::{AsyncStepRunner, AsyncStep}; use batch_processing::tokio::step::complex_step::{AsyncComplexStepBuilder, ComplexStepBuilderTrait}; use batch_processing::tokio::step::simple_step::{AsyncSimpleStepBuilder, AsyncSimpleStepBuilderTrait}; use batch_processing::tokio::step::step_builder::AsyncStepBuilderTrait; #[tokio::test] async fn job_simple_step() { fn generate_step(step_count: i8) -> AsyncStep { return AsyncSimpleStepBuilder::get(String::from(format!("step{}", step_count))) .tasklet(Box::new(move || { return Box::pin(async move { println!("{}", format!("Step {}", step_count)); }); })) .build(); } let mut job_builder = AsyncJobBuilder::get(String::from("sync-data")) .step(generate_step(1)) .step(generate_step(2)); for i in 3..=10 { job_builder = job_builder.step(generate_step(i)); } let job = job_builder.build(); let job_status = job.run().await; println!("{:?}", job_status); } #[tokio::test] async fn job_complex_step() { fn generate_step( step_count: i32 ) -> AsyncStep { let step_builder: AsyncComplexStepBuilder = AsyncComplexStepBuilder::get("test".to_string()) .reader(Box::new(move || { let step_count = step_count; let mut vec: Vec = Vec::new(); for n in 0..=step_count { vec.push(n); } let stream: Pin + Send>> = Box::pin(Box::new(stream::iter(vec))); Box::pin(async move { stream } ) })) .processor( Box::new( move |item: i32| Box::pin( async move { item * 2 } ) ) ) .writer( Box::new( move |items: Vec| Box::pin( async move { println!("{:?}", items); } ) ) ); return step_builder.build(); } let mut job_builder = AsyncJobBuilder::get(String::from("job_complex_step")) .multi_tasks(4); for i in 1..=4 { job_builder = job_builder.step(generate_step(i)); } let job = job_builder.build(); job.run().await; } }