#![cfg(feature = "spawn-ready")]
#[path = "../support.rs"]
mod support;

use tokio::time;
use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok};
use tower::spawn_ready::{SpawnReady, SpawnReadyLayer};
use tower::util::ServiceExt;
use tower_test::mock;

/// A `SpawnReady`-wrapped mock reports pending while the mock allows no
/// requests, and reports ready once a request is allowed.
#[tokio::test(flavor = "current_thread")]
async fn when_inner_is_not_ready() {
    time::pause();

    let _t = support::trace_init();

    let layer = SpawnReadyLayer::new();
    let (mut service, mut handle) = mock::spawn_layer::<(), (), _>(layer);

    // Make the service NotReady.
    handle.allow(0);

    assert_pending!(service.poll_ready());

    // Make the service Ready.
    handle.allow(1);
    // NOTE(review): the sleep presumably yields to the runtime so the spawned
    // readiness task can observe the new allowance — confirm.
    time::sleep(time::Duration::from_millis(100)).await;
    assert_ready_ok!(service.poll_ready());
}

/// An error sent through the mock handle is surfaced by `poll_ready` with its
/// message intact.
#[tokio::test(flavor = "current_thread")]
async fn when_inner_fails() {
    let _t = support::trace_init();

    let layer = SpawnReadyLayer::new();
    let (mut service, mut handle) = mock::spawn_layer::<(), (), _>(layer);

    // Allow no requests, then inject an error through the mock handle.
    handle.allow(0);
    handle.send_error("foobar");

    assert_eq!(
        assert_ready_err!(service.poll_ready()).to_string(),
        "foobar"
    );
}

/// Runs a `SpawnReady`-wrapped `AssertSpanSvc` inside an instrumented task and
/// expects the call to succeed.
#[tokio::test(flavor = "current_thread")]
async fn propagates_trace_spans() {
    use tracing::Instrument;

    let _t = support::trace_init();

    let span = tracing::info_span!("my_span");

    // The helper service is built with a clone of the span that the oneshot
    // call is later instrumented with.
    let service = support::AssertSpanSvc::new(span.clone());
    let service = SpawnReady::new(service);
    let result = tokio::spawn(service.oneshot(()).instrument(span));

    result.await.expect("service panicked").expect("failed");
}

/// Dropping a `SpawnReady` that is still waiting for readiness must also drop
/// the inner service, which the mock handle observes as a closed request
/// stream.
#[tokio::test(flavor = "current_thread")]
async fn abort_on_drop() {
    let (mock, mut handle) = mock::pair::<(), ()>();
    let mut svc = SpawnReady::new(mock);
    handle.allow(0);

    // Drive the service to readiness until we signal a drop.
    let (drop_tx, drop_rx) = tokio::sync::oneshot::channel();
    let mut task = tokio_test::task::spawn(async move {
        tokio::select! {
            _ = drop_rx => {}
            _ = svc.ready() => unreachable!("Service must not become ready"),
        }
    });
    assert_pending!(task.poll());
    assert_pending!(handle.poll_request());

    // End the task and ensure that the inner service has been dropped.
    assert!(drop_tx.send(()).is_ok());
    assert_ready!(task.poll());
    // Yield once before checking the handle.
    // NOTE(review): presumably this lets the runtime process the aborted
    // background task — confirm.
    tokio::task::yield_now().await;
    assert!(assert_ready!(handle.poll_request()).is_none());
}