use async_stream::stream; use futures_core::stream::{FusedStream, Stream}; use futures_util::pin_mut; use futures_util::stream::StreamExt; use tokio::sync::mpsc; use tokio_test::assert_ok; #[tokio::test] async fn noop_stream() { let s = stream! {}; pin_mut!(s); while s.next().await.is_some() { unreachable!(); } } #[tokio::test] async fn empty_stream() { let mut ran = false; { let r = &mut ran; let s = stream! { *r = true; println!("hello world!"); }; pin_mut!(s); while s.next().await.is_some() { unreachable!(); } } assert!(ran); } #[tokio::test] async fn yield_single_value() { let s = stream! { yield "hello"; }; let values: Vec<_> = s.collect().await; assert_eq!(1, values.len()); assert_eq!("hello", values[0]); } #[tokio::test] async fn fused() { let s = stream! { yield "hello"; }; pin_mut!(s); assert!(!s.is_terminated()); assert_eq!(s.next().await, Some("hello")); assert_eq!(s.next().await, None); assert!(s.is_terminated()); // This should return None from now on assert_eq!(s.next().await, None); } #[tokio::test] async fn yield_multi_value() { let s = stream! { yield "hello"; yield "world"; yield "dizzy"; }; let values: Vec<_> = s.collect().await; assert_eq!(3, values.len()); assert_eq!("hello", values[0]); assert_eq!("world", values[1]); assert_eq!("dizzy", values[2]); } #[tokio::test] async fn unit_yield_in_select() { use tokio::select; async fn do_stuff_async() {} let s = stream! { select! { _ = do_stuff_async() => yield, else => yield, } }; let values: Vec<_> = s.collect().await; assert_eq!(values.len(), 1); } #[tokio::test] async fn yield_with_select() { use tokio::select; async fn do_stuff_async() {} async fn more_async_work() {} let s = stream! { select! { _ = do_stuff_async() => yield "hey", _ = more_async_work() => yield "hey", else => yield "hey", } }; let values: Vec<_> = s.collect().await; assert_eq!(values, vec!["hey"]); } #[tokio::test] async fn return_stream() { fn build_stream() -> impl Stream { stream! { yield 1; yield 2; yield 3; } } let s = build_stream(); let values: Vec<_> = s.collect().await; assert_eq!(3, values.len()); assert_eq!(1, values[0]); assert_eq!(2, values[1]); assert_eq!(3, values[2]); } #[tokio::test] async fn consume_channel() { let (tx, mut rx) = mpsc::channel(10); let s = stream! { while let Some(v) = rx.recv().await { yield v; } }; pin_mut!(s); for i in 0..3 { assert_ok!(tx.send(i).await); assert_eq!(Some(i), s.next().await); } drop(tx); assert_eq!(None, s.next().await); } #[tokio::test] async fn borrow_self() { struct Data(String); impl Data { fn stream<'a>(&'a self) -> impl Stream + 'a { stream! { yield &self.0[..]; } } } let data = Data("hello".to_string()); let s = data.stream(); pin_mut!(s); assert_eq!(Some("hello"), s.next().await); } #[tokio::test] async fn stream_in_stream() { let s = stream! { let s = stream! { for i in 0..3 { yield i; } }; pin_mut!(s); while let Some(v) = s.next().await { yield v; } }; let values: Vec<_> = s.collect().await; assert_eq!(3, values.len()); } #[tokio::test] async fn yield_non_unpin_value() { let s: Vec<_> = stream! { for i in 0..3 { yield async move { i }; } } .buffered(1) .collect() .await; assert_eq!(s, vec![0, 1, 2]); } #[test] fn inner_try_stream() { use async_stream::try_stream; use tokio::select; async fn do_stuff_async() {} let _ = stream! { select! { _ = do_stuff_async() => { let another_s = try_stream! { yield; }; let _: Result<(), ()> = Box::pin(another_s).next().await.unwrap(); }, else => {}, } yield }; } #[rustversion::attr(not(stable), ignore)] #[test] fn test() { let t = trybuild::TestCases::new(); t.compile_fail("tests/ui/*.rs"); }