use std::future::Future; use std::sync::Arc; use std::task::Poll; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio_util::sync::PollSemaphore; type SemRet = Option; fn semaphore_poll( sem: &mut PollSemaphore, ) -> tokio_test::task::Spawn + '_> { let fut = futures::future::poll_fn(move |cx| sem.poll_acquire(cx)); tokio_test::task::spawn(fut) } fn semaphore_poll_many( sem: &mut PollSemaphore, permits: u32, ) -> tokio_test::task::Spawn + '_> { let fut = futures::future::poll_fn(move |cx| sem.poll_acquire_many(cx, permits)); tokio_test::task::spawn(fut) } #[tokio::test] async fn it_works() { let sem = Arc::new(Semaphore::new(1)); let mut poll_sem = PollSemaphore::new(sem.clone()); let permit = sem.acquire().await.unwrap(); let mut poll = semaphore_poll(&mut poll_sem); assert!(poll.poll().is_pending()); drop(permit); assert!(matches!(poll.poll(), Poll::Ready(Some(_)))); drop(poll); sem.close(); assert!(semaphore_poll(&mut poll_sem).await.is_none()); // Check that it is fused. assert!(semaphore_poll(&mut poll_sem).await.is_none()); assert!(semaphore_poll(&mut poll_sem).await.is_none()); } #[tokio::test] async fn can_acquire_many_permits() { let sem = Arc::new(Semaphore::new(4)); let mut poll_sem = PollSemaphore::new(sem.clone()); let permit1 = semaphore_poll(&mut poll_sem).poll(); assert!(matches!(permit1, Poll::Ready(Some(_)))); let permit2 = semaphore_poll_many(&mut poll_sem, 2).poll(); assert!(matches!(permit2, Poll::Ready(Some(_)))); assert_eq!(sem.available_permits(), 1); drop(permit2); let mut permit4 = semaphore_poll_many(&mut poll_sem, 4); assert!(permit4.poll().is_pending()); drop(permit1); let permit4 = permit4.poll(); assert!(matches!(permit4, Poll::Ready(Some(_)))); assert_eq!(sem.available_permits(), 0); } #[tokio::test] async fn can_poll_different_amounts_of_permits() { let sem = Arc::new(Semaphore::new(4)); let mut poll_sem = PollSemaphore::new(sem.clone()); assert!(semaphore_poll_many(&mut poll_sem, 5).poll().is_pending()); assert!(semaphore_poll_many(&mut poll_sem, 4).poll().is_ready()); let permit = sem.acquire_many(4).await.unwrap(); assert!(semaphore_poll_many(&mut poll_sem, 5).poll().is_pending()); assert!(semaphore_poll_many(&mut poll_sem, 4).poll().is_pending()); drop(permit); assert!(semaphore_poll_many(&mut poll_sem, 5).poll().is_pending()); assert!(semaphore_poll_many(&mut poll_sem, 4).poll().is_ready()); }