use futures_util::future; use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok, task}; use tower_retry::Policy; use tower_test::{assert_request_eq, mock}; #[tokio::test] async fn retry_errors() { let (mut service, mut handle) = new_service(RetryErrors); assert_ready_ok!(service.poll_ready()); let mut fut = task::spawn(service.call("hello")); assert_request_eq!(handle, "hello").send_error("retry me"); assert_pending!(fut.poll()); assert_request_eq!(handle, "hello").send_response("world"); assert_eq!(fut.into_inner().await.unwrap(), "world"); } #[tokio::test] async fn retry_limit() { let (mut service, mut handle) = new_service(Limit(2)); assert_ready_ok!(service.poll_ready()); let mut fut = task::spawn(service.call("hello")); assert_request_eq!(handle, "hello").send_error("retry 1"); assert_pending!(fut.poll()); assert_request_eq!(handle, "hello").send_error("retry 2"); assert_pending!(fut.poll()); assert_request_eq!(handle, "hello").send_error("retry 3"); assert_eq!(assert_ready_err!(fut.poll()).to_string(), "retry 3"); } #[tokio::test] async fn retry_error_inspection() { let (mut service, mut handle) = new_service(UnlessErr("reject")); assert_ready_ok!(service.poll_ready()); let mut fut = task::spawn(service.call("hello")); assert_request_eq!(handle, "hello").send_error("retry 1"); assert_pending!(fut.poll()); assert_request_eq!(handle, "hello").send_error("reject"); assert_eq!(assert_ready_err!(fut.poll()).to_string(), "reject"); } #[tokio::test] async fn retry_cannot_clone_request() { let (mut service, mut handle) = new_service(CannotClone); assert_ready_ok!(service.poll_ready()); let mut fut = task::spawn(service.call("hello")); assert_request_eq!(handle, "hello").send_error("retry 1"); assert_eq!(assert_ready_err!(fut.poll()).to_string(), "retry 1"); } #[tokio::test] async fn success_with_cannot_clone() { // Even though the request couldn't be cloned, if the first request succeeds, // it should succeed overall. let (mut service, mut handle) = new_service(CannotClone); assert_ready_ok!(service.poll_ready()); let mut fut = task::spawn(service.call("hello")); assert_request_eq!(handle, "hello").send_response("world"); assert_ready_ok!(fut.poll(), "world"); } type Req = &'static str; type Res = &'static str; type InnerError = &'static str; type Error = Box; type Mock = mock::Mock; type Handle = mock::Handle; #[derive(Clone)] struct RetryErrors; impl Policy for RetryErrors { type Future = future::Ready; fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option { if result.is_err() { Some(future::ready(RetryErrors)) } else { None } } fn clone_request(&self, req: &Req) -> Option { Some(*req) } } #[derive(Clone)] struct Limit(usize); impl Policy for Limit { type Future = future::Ready; fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option { if result.is_err() && self.0 > 0 { Some(future::ready(Limit(self.0 - 1))) } else { None } } fn clone_request(&self, req: &Req) -> Option { Some(*req) } } #[derive(Clone)] struct UnlessErr(InnerError); impl Policy for UnlessErr { type Future = future::Ready; fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option { result.err().and_then(|err| { if err.to_string() != self.0 { Some(future::ready(self.clone())) } else { None } }) } fn clone_request(&self, req: &Req) -> Option { Some(*req) } } #[derive(Clone)] struct CannotClone; impl Policy for CannotClone { type Future = future::Ready; fn retry(&self, _: &Req, _: Result<&Res, &Error>) -> Option { unreachable!("retry cannot be called since request isn't cloned"); } fn clone_request(&self, _req: &Req) -> Option { None } } fn new_service + Clone>( policy: P, ) -> (mock::Spawn>, Handle) { let retry = tower_retry::RetryLayer::new(policy); mock::spawn_layer(retry) }