use std::sync::atomic::Ordering::{Acquire, Release, SeqCst}; use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::Arc; use async_pipes::{branch, Pipeline, WorkerOptions}; #[tokio::test] async fn test_stage_producer() { let written = Arc::new(AtomicBool::new(false)); let task_written = written.clone(); Pipeline::builder() .with_producer("pipe", move || { let written = task_written.clone(); async move { if !written.load(Acquire) { written.store(true, Release); Some("hello!".to_string()) } else { None } } }) .with_consumer( "pipe", WorkerOptions::default_single_task(), |value: String| async move { assert_eq!(value, "hello!".to_string()); }, ) .build() .unwrap() .wait() .await; assert!(written.load(Acquire), "value was not handled by worker!"); } #[tokio::test] async fn test_stage_branching_producer() { let value_written = Arc::new(AtomicBool::new(false)); let task_value_written = value_written.clone(); let double_written = Arc::new(AtomicBool::new(false)); let task_double_written = double_written.clone(); let value_sum = Arc::new(AtomicUsize::new(0)); let task_value_sum = value_sum.clone(); let double_sum = Arc::new(AtomicUsize::new(0)); let task_double_sum = double_sum.clone(); let count = Arc::new(AtomicUsize::new(0)); Pipeline::builder() .with_branching_producer(vec!["Value", "Doubled"], move || { let count = count.clone(); async move { let c = count.load(Acquire); if c < 10 { count.fetch_add(1, SeqCst); Some(branch![c, c * 2]) } else { None } } }) .with_consumer( "Value", WorkerOptions::default_single_task(), move |value: usize| { let written = task_value_written.clone(); let sum = task_value_sum.clone(); async move { written.store(true, Release); sum.fetch_add(value, SeqCst); } }, ) .with_consumer( "Doubled", WorkerOptions::default_single_task(), move |value: usize| { let written = task_double_written.clone(); let sum = task_double_sum.clone(); async move { written.store(true, Release); sum.fetch_add(value, SeqCst); } }, ) .build() .unwrap() .wait() .await; assert!( value_written.load(Acquire), "value was not handled by worker!" ); assert!( double_written.load(Acquire), "value was not handled by worker!" ); assert_eq!(value_sum.load(Acquire), 45); assert_eq!(double_sum.load(Acquire), 90); } #[tokio::test] async fn test_stage_single_output() { let written = Arc::new(AtomicBool::new(false)); let task_written = written.clone(); Pipeline::builder() .with_inputs("first", vec![1usize]) .with_stage( "first", "second", WorkerOptions::default_single_task(), |value: usize| async move { Some(value + 1) }, ) .with_consumer( "second", WorkerOptions::default_single_task(), move |value: usize| { let written = task_written.clone(); async move { assert_eq!(value, 2); written.store(true, Release); } }, ) .build() .unwrap() .wait() .await; assert!(written.load(Acquire), "value was not handled by worker!"); } #[tokio::test] async fn test_stage_flattener() { let sum = Arc::new(AtomicUsize::new(0)); let task_sum = sum.clone(); Pipeline::builder() .with_inputs("first", vec![vec![1usize, 2usize, 3usize]]) .with_flattener::>("first", "second") .with_consumer( "second", WorkerOptions::default_single_task(), move |value: usize| { let sum = task_sum.clone(); async move { sum.fetch_add(value, SeqCst); } }, ) .build() .unwrap() .wait() .await; assert_eq!(sum.load(Acquire), 6); } #[tokio::test] #[should_panic] async fn test_stage_propagates_task_panic() { Pipeline::builder() .with_inputs("pipe", vec![()]) .with_consumer( "pipe", WorkerOptions::default_single_task(), |_: ()| async move { panic!("AHH!") }, ) .build() .unwrap() .wait() .await; }