use futures::stream::iter; use tokio_stream::{self as stream, pending, Stream, StreamExt, StreamMap}; use tokio_test::{assert_ok, assert_pending, assert_ready, task}; use std::future::{poll_fn, Future}; use std::pin::{pin, Pin}; use std::task::Poll; mod support { pub(crate) mod mpsc; } use support::mpsc; macro_rules! assert_ready_some { ($($t:tt)*) => { match assert_ready!($($t)*) { Some(v) => v, None => panic!("expected `Some`, got `None`"), } }; } macro_rules! assert_ready_none { ($($t:tt)*) => { match assert_ready!($($t)*) { None => {} Some(v) => panic!("expected `None`, got `Some({:?})`", v), } }; } #[tokio::test] async fn empty() { let mut map = StreamMap::<&str, stream::Pending<()>>::new(); assert_eq!(map.len(), 0); assert!(map.is_empty()); assert!(map.next().await.is_none()); assert!(map.next().await.is_none()); assert!(map.remove("foo").is_none()); } #[tokio::test] async fn single_entry() { let mut map = task::spawn(StreamMap::new()); let (tx, rx) = mpsc::unbounded_channel_stream(); let rx = Box::pin(rx); assert_ready_none!(map.poll_next()); assert!(map.insert("foo", rx).is_none()); assert!(map.contains_key("foo")); assert!(!map.contains_key("bar")); assert_eq!(map.len(), 1); assert!(!map.is_empty()); assert_pending!(map.poll_next()); assert_ok!(tx.send(1)); assert!(map.is_woken()); let (k, v) = assert_ready_some!(map.poll_next()); assert_eq!(k, "foo"); assert_eq!(v, 1); assert_pending!(map.poll_next()); assert_ok!(tx.send(2)); assert!(map.is_woken()); let (k, v) = assert_ready_some!(map.poll_next()); assert_eq!(k, "foo"); assert_eq!(v, 2); assert_pending!(map.poll_next()); drop(tx); assert!(map.is_woken()); assert_ready_none!(map.poll_next()); } #[tokio::test] async fn multiple_entries() { let mut map = task::spawn(StreamMap::new()); let (tx1, rx1) = mpsc::unbounded_channel_stream(); let (tx2, rx2) = mpsc::unbounded_channel_stream(); let rx1 = Box::pin(rx1); let rx2 = Box::pin(rx2); map.insert("foo", rx1); map.insert("bar", rx2); assert_pending!(map.poll_next()); assert_ok!(tx1.send(1)); assert!(map.is_woken()); let (k, v) = assert_ready_some!(map.poll_next()); assert_eq!(k, "foo"); assert_eq!(v, 1); assert_pending!(map.poll_next()); assert_ok!(tx2.send(2)); assert!(map.is_woken()); let (k, v) = assert_ready_some!(map.poll_next()); assert_eq!(k, "bar"); assert_eq!(v, 2); assert_pending!(map.poll_next()); assert_ok!(tx1.send(3)); assert_ok!(tx2.send(4)); assert!(map.is_woken()); // Given the randomization, there is no guarantee what order the values will // be received in. let mut v = (0..2) .map(|_| assert_ready_some!(map.poll_next())) .collect::>(); assert_pending!(map.poll_next()); v.sort_unstable(); assert_eq!(v[0].0, "bar"); assert_eq!(v[0].1, 4); assert_eq!(v[1].0, "foo"); assert_eq!(v[1].1, 3); drop(tx1); assert!(map.is_woken()); assert_pending!(map.poll_next()); drop(tx2); assert_ready_none!(map.poll_next()); } #[tokio::test] async fn insert_remove() { let mut map = task::spawn(StreamMap::new()); let (tx, rx) = mpsc::unbounded_channel_stream(); let rx = Box::pin(rx); assert_ready_none!(map.poll_next()); assert!(map.insert("foo", rx).is_none()); let rx = map.remove("foo").unwrap(); assert_ok!(tx.send(1)); assert!(!map.is_woken()); assert_ready_none!(map.poll_next()); assert!(map.insert("bar", rx).is_none()); let v = assert_ready_some!(map.poll_next()); assert_eq!(v.0, "bar"); assert_eq!(v.1, 1); assert!(map.remove("bar").is_some()); assert_ready_none!(map.poll_next()); assert!(map.is_empty()); assert_eq!(0, map.len()); } #[tokio::test] async fn replace() { let mut map = task::spawn(StreamMap::new()); let (tx1, rx1) = mpsc::unbounded_channel_stream(); let (tx2, rx2) = mpsc::unbounded_channel_stream(); let rx1 = Box::pin(rx1); let rx2 = Box::pin(rx2); assert!(map.insert("foo", rx1).is_none()); assert_pending!(map.poll_next()); let _rx1 = map.insert("foo", rx2).unwrap(); assert_pending!(map.poll_next()); tx1.send(1).unwrap(); assert_pending!(map.poll_next()); tx2.send(2).unwrap(); assert!(map.is_woken()); let v = assert_ready_some!(map.poll_next()); assert_eq!(v.0, "foo"); assert_eq!(v.1, 2); } #[test] fn size_hint_with_upper() { let mut map = StreamMap::new(); map.insert("a", stream::iter(vec![1])); map.insert("b", stream::iter(vec![1, 2])); map.insert("c", stream::iter(vec![1, 2, 3])); assert_eq!(3, map.len()); assert!(!map.is_empty()); let size_hint = map.size_hint(); assert_eq!(size_hint, (6, Some(6))); } #[test] fn size_hint_without_upper() { let mut map = StreamMap::new(); map.insert("a", pin_box(stream::iter(vec![1]))); map.insert("b", pin_box(stream::iter(vec![1, 2]))); map.insert("c", pin_box(pending())); let size_hint = map.size_hint(); assert_eq!(size_hint, (3, None)); } #[test] fn new_capacity_zero() { let map = StreamMap::<&str, stream::Pending<()>>::new(); assert_eq!(0, map.capacity()); assert!(map.keys().next().is_none()); } #[test] fn with_capacity() { let map = StreamMap::<&str, stream::Pending<()>>::with_capacity(10); assert!(10 <= map.capacity()); assert!(map.keys().next().is_none()); } #[test] fn iter_keys() { let mut map = StreamMap::new(); map.insert("a", pending::()); map.insert("b", pending()); map.insert("c", pending()); let mut keys = map.keys().collect::>(); keys.sort_unstable(); assert_eq!(&keys[..], &[&"a", &"b", &"c"]); } #[test] fn iter_values() { let mut map = StreamMap::new(); map.insert("a", stream::iter(vec![1])); map.insert("b", stream::iter(vec![1, 2])); map.insert("c", stream::iter(vec![1, 2, 3])); let mut size_hints = map.values().map(|s| s.size_hint().0).collect::>(); size_hints.sort_unstable(); assert_eq!(&size_hints[..], &[1, 2, 3]); } #[test] fn iter_values_mut() { let mut map = StreamMap::new(); map.insert("a", stream::iter(vec![1])); map.insert("b", stream::iter(vec![1, 2])); map.insert("c", stream::iter(vec![1, 2, 3])); let mut size_hints = map .values_mut() .map(|s: &mut _| s.size_hint().0) .collect::>(); size_hints.sort_unstable(); assert_eq!(&size_hints[..], &[1, 2, 3]); } #[test] fn clear() { let mut map = task::spawn(StreamMap::new()); map.insert("a", stream::iter(vec![1])); map.insert("b", stream::iter(vec![1, 2])); map.insert("c", stream::iter(vec![1, 2, 3])); assert_ready_some!(map.poll_next()); map.clear(); assert_ready_none!(map.poll_next()); assert!(map.is_empty()); } #[test] fn contains_key_borrow() { let mut map = StreamMap::new(); map.insert("foo".to_string(), pending::<()>()); assert!(map.contains_key("foo")); } #[test] fn one_ready_many_none() { // Run a few times because of randomness for _ in 0..100 { let mut map = task::spawn(StreamMap::new()); map.insert(0, pin_box(stream::empty())); map.insert(1, pin_box(stream::empty())); map.insert(2, pin_box(stream::once("hello"))); map.insert(3, pin_box(stream::pending())); let v = assert_ready_some!(map.poll_next()); assert_eq!(v, (2, "hello")); } } fn pin_box + 'static, U>(s: T) -> Pin>> { Box::pin(s) } type UsizeStream = Pin + Send>>; #[tokio::test] async fn poll_next_many_zero() { let mut stream_map: StreamMap = StreamMap::new(); stream_map.insert(0, Box::pin(pending()) as UsizeStream); let n = poll_fn(|cx| stream_map.poll_next_many(cx, &mut vec![], 0)).await; assert_eq!(n, 0); } #[tokio::test] async fn poll_next_many_empty() { let mut stream_map: StreamMap = StreamMap::new(); let n = poll_fn(|cx| stream_map.poll_next_many(cx, &mut vec![], 1)).await; assert_eq!(n, 0); } #[tokio::test] async fn poll_next_many_pending() { let mut stream_map: StreamMap = StreamMap::new(); stream_map.insert(0, Box::pin(pending()) as UsizeStream); let mut is_pending = false; poll_fn(|cx| { let poll = stream_map.poll_next_many(cx, &mut vec![], 1); is_pending = poll.is_pending(); Poll::Ready(()) }) .await; assert!(is_pending); } #[tokio::test] async fn poll_next_many_not_enough() { let mut stream_map: StreamMap = StreamMap::new(); stream_map.insert(0, Box::pin(iter([0usize].into_iter())) as UsizeStream); stream_map.insert(1, Box::pin(iter([1usize].into_iter())) as UsizeStream); let mut buffer = vec![]; let n = poll_fn(|cx| stream_map.poll_next_many(cx, &mut buffer, 3)).await; assert_eq!(n, 2); assert_eq!(buffer.len(), 2); assert!(buffer.contains(&(0, 0))); assert!(buffer.contains(&(1, 1))); } #[tokio::test] async fn poll_next_many_enough() { let mut stream_map: StreamMap = StreamMap::new(); stream_map.insert(0, Box::pin(iter([0usize].into_iter())) as UsizeStream); stream_map.insert(1, Box::pin(iter([1usize].into_iter())) as UsizeStream); let mut buffer = vec![]; let n = poll_fn(|cx| stream_map.poll_next_many(cx, &mut buffer, 2)).await; assert_eq!(n, 2); assert_eq!(buffer.len(), 2); assert!(buffer.contains(&(0, 0))); assert!(buffer.contains(&(1, 1))); } #[tokio::test] async fn poll_next_many_correctly_loops_around() { for _ in 0..10 { let mut stream_map: StreamMap = StreamMap::new(); stream_map.insert(0, Box::pin(iter([0usize].into_iter())) as UsizeStream); stream_map.insert(1, Box::pin(iter([0usize, 1].into_iter())) as UsizeStream); stream_map.insert(2, Box::pin(iter([0usize, 1, 2].into_iter())) as UsizeStream); let mut buffer = vec![]; let n = poll_fn(|cx| stream_map.poll_next_many(cx, &mut buffer, 3)).await; assert_eq!(n, 3); assert_eq!( std::mem::take(&mut buffer) .into_iter() .map(|(_, v)| v) .collect::>(), vec![0, 0, 0] ); let n = poll_fn(|cx| stream_map.poll_next_many(cx, &mut buffer, 2)).await; assert_eq!(n, 2); assert_eq!( std::mem::take(&mut buffer) .into_iter() .map(|(_, v)| v) .collect::>(), vec![1, 1] ); let n = poll_fn(|cx| stream_map.poll_next_many(cx, &mut buffer, 1)).await; assert_eq!(n, 1); assert_eq!( std::mem::take(&mut buffer) .into_iter() .map(|(_, v)| v) .collect::>(), vec![2] ); } } #[tokio::test] async fn next_many_zero() { let mut stream_map: StreamMap = StreamMap::new(); stream_map.insert(0, Box::pin(pending()) as UsizeStream); let n = poll_fn(|cx| pin!(stream_map.next_many(&mut vec![], 0)).poll(cx)).await; assert_eq!(n, 0); } #[tokio::test] async fn next_many_empty() { let mut stream_map: StreamMap = StreamMap::new(); let n = stream_map.next_many(&mut vec![], 1).await; assert_eq!(n, 0); } #[tokio::test] async fn next_many_pending() { let mut stream_map: StreamMap = StreamMap::new(); stream_map.insert(0, Box::pin(pending()) as UsizeStream); let mut is_pending = false; poll_fn(|cx| { let poll = pin!(stream_map.next_many(&mut vec![], 1)).poll(cx); is_pending = poll.is_pending(); Poll::Ready(()) }) .await; assert!(is_pending); } #[tokio::test] async fn next_many_not_enough() { let mut stream_map: StreamMap = StreamMap::new(); stream_map.insert(0, Box::pin(iter([0usize].into_iter())) as UsizeStream); stream_map.insert(1, Box::pin(iter([1usize].into_iter())) as UsizeStream); let mut buffer = vec![]; let n = poll_fn(|cx| pin!(stream_map.next_many(&mut buffer, 3)).poll(cx)).await; assert_eq!(n, 2); assert_eq!(buffer.len(), 2); assert!(buffer.contains(&(0, 0))); assert!(buffer.contains(&(1, 1))); } #[tokio::test] async fn next_many_enough() { let mut stream_map: StreamMap = StreamMap::new(); stream_map.insert(0, Box::pin(iter([0usize].into_iter())) as UsizeStream); stream_map.insert(1, Box::pin(iter([1usize].into_iter())) as UsizeStream); let mut buffer = vec![]; let n = poll_fn(|cx| pin!(stream_map.next_many(&mut buffer, 2)).poll(cx)).await; assert_eq!(n, 2); assert_eq!(buffer.len(), 2); assert!(buffer.contains(&(0, 0))); assert!(buffer.contains(&(1, 1))); } #[tokio::test] async fn next_many_correctly_loops_around() { for _ in 0..10 { let mut stream_map: StreamMap = StreamMap::new(); stream_map.insert(0, Box::pin(iter([0usize].into_iter())) as UsizeStream); stream_map.insert(1, Box::pin(iter([0usize, 1].into_iter())) as UsizeStream); stream_map.insert(2, Box::pin(iter([0usize, 1, 2].into_iter())) as UsizeStream); let mut buffer = vec![]; let n = poll_fn(|cx| pin!(stream_map.next_many(&mut buffer, 3)).poll(cx)).await; assert_eq!(n, 3); assert_eq!( std::mem::take(&mut buffer) .into_iter() .map(|(_, v)| v) .collect::>(), vec![0, 0, 0] ); let n = poll_fn(|cx| pin!(stream_map.next_many(&mut buffer, 2)).poll(cx)).await; assert_eq!(n, 2); assert_eq!( std::mem::take(&mut buffer) .into_iter() .map(|(_, v)| v) .collect::>(), vec![1, 1] ); let n = poll_fn(|cx| pin!(stream_map.next_many(&mut buffer, 1)).poll(cx)).await; assert_eq!(n, 1); assert_eq!( std::mem::take(&mut buffer) .into_iter() .map(|(_, v)| v) .collect::>(), vec![2] ); } }