use futures::{future::pending, StreamExt}; use std::{collections::HashSet, time::Duration}; use zestors::{ actor_reference::{ActorRefExt, Transformable}, actor_type::Inbox, messaging::RecvError, protocol, spawning::{spawn, spawn_many, spawn_many_with, spawn_with, BackPressure, Capacity, Link}, }; #[tokio::test] async fn spawn_and_abort() { let (mut child, _address) = spawn(|_: Inbox<()>| async move { let () = pending().await; }); child.abort(); assert!(child.await.unwrap_err().is_abort()); } #[tokio::test] async fn spawn_await_address() { let (mut child, address) = spawn(|_: Inbox<()>| async move { let () = pending().await; }); child.abort(); address.await; } #[tokio::test] async fn spawn_and_panic() { let (child, _address) = spawn(|_: Inbox<()>| async move { panic!() }); assert!(child.await.unwrap_err().is_panic()); } #[tokio::test] async fn spawn_and_normal_exit() { let (child, _address) = spawn(|_: Inbox<()>| async move {}); assert!(child.await.is_ok()); } #[tokio::test] async fn spawn_and_halt() { let (child, _address) = spawn(|mut inbox: Inbox<()>| async move { assert_eq!(inbox.recv().await.unwrap_err(), RecvError::Halted); }); child.halt(); assert!(child.await.is_ok()); } #[tokio::test] async fn spawn_and_drop() { let (child, address) = spawn_with( Link::Attached(Duration::from_millis(10).into()), Capacity::Bounded(10), |mut inbox: Inbox<()>| async move { assert_eq!(inbox.recv().await.unwrap_err(), RecvError::Halted); let () = pending().await; }, ); drop(child); address.await; } #[tokio::test] async fn spawn_and_drop_detached() { let (child, address) = spawn_with( Link::Detached, Capacity::BackPressure(BackPressure::default()), |mut inbox: Inbox<()>| async move { assert_eq!(inbox.recv().await.unwrap(), ()); }, ); drop(child); tokio::time::sleep(Duration::from_millis(10)).await; address.send(()).await.unwrap(); address.await; } #[tokio::test] async fn base_counts() { let (mut child, address) = spawn(|inbox: Inbox<()>| async move { pending::<()>().await; drop(inbox); }); assert_eq!(child.address_count(), 1); assert_eq!(child.process_count(), 1); assert_eq!(address.msg_count(), 0); child.abort(); } #[tokio::test] async fn address_counts() { let (mut child, address) = spawn(|inbox: Inbox<()>| async move { pending::<()>().await; drop(inbox); }); assert_eq!(child.address_count(), 1); let address2 = address.clone(); assert_eq!(child.address_count(), 2); drop(address2); assert_eq!(child.address_count(), 1); child.abort(); } #[tokio::test] async fn inbox_counts() { let (pool, _address) = spawn_many(0..4, |_, mut inbox: Inbox<()>| async move { inbox.recv().await.unwrap_err(); }); let mut pool = pool.into_dyn(); assert_eq!(pool.process_count(), 4); pool.halt_some(1); tokio::time::sleep(Duration::from_millis(10)).await; assert_eq!(pool.process_count(), 3); pool.try_spawn_onto(|mut inbox: Inbox<()>| async move { inbox.recv().await.unwrap_err(); }) .unwrap(); assert_eq!(pool.process_count(), 4); pool.halt_some(2); tokio::time::sleep(Duration::from_millis(10)).await; assert_eq!(pool.process_count(), 2); pool.halt(); tokio::time::sleep(Duration::from_millis(10)).await; assert_eq!(pool.process_count(), 0); } #[protocol] enum U32Protocol { U32(u32), } #[tokio::test] async fn pooled_messaging_split() { let (pool, address) = spawn_many_with( Link::default(), Capacity::Bounded(5), 0..3, |_, mut inbox: Inbox| async move { let mut numbers = Vec::new(); loop { match inbox.recv().await { Ok(U32Protocol::U32(msg)) => { tokio::time::sleep(Duration::from_millis(1)).await; numbers.push(msg); } Err(signal) => match signal { RecvError::Halted => break Ok(numbers), RecvError::ClosedAndEmpty => break Err(()), }, } } }, ); for i in 0..30 { address.send(i).await.unwrap(); } tokio::time::sleep(Duration::from_millis(20)).await; address.halt(); let res = pool .map(|e| e.unwrap().unwrap()) .fold(HashSet::new(), |mut acc, vals| async move { assert!(vals.len() >= 9); assert!(vals.len() <= 11); for val in vals { assert!(acc.insert(val)); } acc }) .await; assert_eq!(res.len(), 30) }