#![cfg(feature = "hedge")] #[path = "../support.rs"] mod support; use std::time::Duration; use tokio::time; use tokio_test::{assert_pending, assert_ready, assert_ready_ok, task}; use tower::hedge::{Hedge, Policy}; use tower_test::{assert_request_eq, mock}; #[tokio::test(flavor = "current_thread")] async fn hedge_orig_completes_first() { let _t = support::trace_init(); time::pause(); let (mut service, mut handle) = new_service(TestPolicy); assert_ready_ok!(service.poll_ready()); let mut fut = task::spawn(service.call("orig")); // Check that orig request has been issued. let req = assert_request_eq!(handle, "orig"); // Check fut is not ready. assert_pending!(fut.poll()); // Check hedge has not been issued. assert_pending!(handle.poll_request()); time::advance(Duration::from_millis(11)).await; // Check fut is not ready. assert_pending!(fut.poll()); // Check that the hedge has been issued. let _hedge_req = assert_request_eq!(handle, "orig"); req.send_response("orig-done"); // Check that fut gets orig response. assert_eq!(assert_ready_ok!(fut.poll()), "orig-done"); } #[tokio::test(flavor = "current_thread")] async fn hedge_hedge_completes_first() { let _t = support::trace_init(); time::pause(); let (mut service, mut handle) = new_service(TestPolicy); assert_ready_ok!(service.poll_ready()); let mut fut = task::spawn(service.call("orig")); // Check that orig request has been issued. let _req = assert_request_eq!(handle, "orig"); // Check fut is not ready. assert_pending!(fut.poll()); // Check hedge has not been issued. assert_pending!(handle.poll_request()); time::advance(Duration::from_millis(11)).await; // Check fut is not ready. assert_pending!(fut.poll()); // Check that the hedge has been issued. let hedge_req = assert_request_eq!(handle, "orig"); hedge_req.send_response("hedge-done"); // Check that fut gets hedge response. assert_eq!(assert_ready_ok!(fut.poll()), "hedge-done"); } #[tokio::test(flavor = "current_thread")] async fn completes_before_hedge() { let _t = support::trace_init(); let (mut service, mut handle) = new_service(TestPolicy); assert_ready_ok!(service.poll_ready()); let mut fut = task::spawn(service.call("orig")); // Check that orig request has been issued. let req = assert_request_eq!(handle, "orig"); // Check fut is not ready. assert_pending!(fut.poll()); req.send_response("orig-done"); // Check hedge has not been issued. assert_pending!(handle.poll_request()); // Check that fut gets orig response. assert_eq!(assert_ready_ok!(fut.poll()), "orig-done"); } #[tokio::test(flavor = "current_thread")] async fn request_not_retyable() { let _t = support::trace_init(); time::pause(); let (mut service, mut handle) = new_service(TestPolicy); assert_ready_ok!(service.poll_ready()); let mut fut = task::spawn(service.call(NOT_RETRYABLE)); // Check that orig request has been issued. let req = assert_request_eq!(handle, NOT_RETRYABLE); // Check fut is not ready. assert_pending!(fut.poll()); // Check hedge has not been issued. assert_pending!(handle.poll_request()); time::advance(Duration::from_millis(10)).await; // Check fut is not ready. assert_pending!(fut.poll()); // Check hedge has not been issued. assert_pending!(handle.poll_request()); req.send_response("orig-done"); // Check that fut gets orig response. assert_eq!(assert_ready_ok!(fut.poll()), "orig-done"); } #[tokio::test(flavor = "current_thread")] async fn request_not_clonable() { let _t = support::trace_init(); time::pause(); let (mut service, mut handle) = new_service(TestPolicy); assert_ready_ok!(service.poll_ready()); let mut fut = task::spawn(service.call(NOT_CLONABLE)); // Check that orig request has been issued. let req = assert_request_eq!(handle, NOT_CLONABLE); // Check fut is not ready. assert_pending!(fut.poll()); // Check hedge has not been issued. assert_pending!(handle.poll_request()); time::advance(Duration::from_millis(10)).await; // Check fut is not ready. assert_pending!(fut.poll()); // Check hedge has not been issued. assert_pending!(handle.poll_request()); req.send_response("orig-done"); // Check that fut gets orig response. assert_eq!(assert_ready_ok!(fut.poll()), "orig-done"); } type Req = &'static str; type Res = &'static str; type Mock = tower_test::mock::Mock; type Handle = tower_test::mock::Handle; static NOT_RETRYABLE: &str = "NOT_RETRYABLE"; static NOT_CLONABLE: &str = "NOT_CLONABLE"; #[derive(Clone)] struct TestPolicy; impl tower::hedge::Policy for TestPolicy { fn can_retry(&self, req: &Req) -> bool { *req != NOT_RETRYABLE } fn clone_request(&self, req: &Req) -> Option { if *req == NOT_CLONABLE { None } else { Some(req) } } } fn new_service + Clone>(policy: P) -> (mock::Spawn>, Handle) { let (service, handle) = tower_test::mock::pair(); let mock_latencies: [u64; 10] = [1, 1, 1, 1, 1, 1, 1, 1, 10, 10]; let service = Hedge::new_with_mock_latencies( service, policy, 10, 0.9, Duration::from_secs(60), &mock_latencies, ); (mock::Spawn::new(service), handle) }