#![cfg(feature = "balance")] #[path = "../support.rs"] mod support; use std::future::Future; use std::task::{Context, Poll}; use tokio_test::{assert_pending, assert_ready, task}; use tower::balance::p2c::Balance; use tower::discover::Change; use tower_service::Service; use tower_test::mock; type Req = &'static str; struct Mock(mock::Mock); impl Service for Mock { type Response = as Service>::Response; type Error = as Service>::Error; type Future = as Service>::Future; fn poll_ready(&mut self, cx: &mut Context) -> Poll> { self.0.poll_ready(cx) } fn call(&mut self, req: Req) -> Self::Future { self.0.call(req) } } impl tower::load::Load for Mock { type Metric = usize; fn load(&self) -> Self::Metric { rand::random() } } #[test] fn stress() { let _t = support::trace_init(); let mut task = task::spawn(()); let (tx, rx) = tokio::sync::mpsc::unbounded_channel::>(); let mut cache = Balance::<_, Req>::new(support::IntoStream::new(rx)); let mut nready = 0; let mut services = slab::Slab::<(mock::Handle, bool)>::new(); let mut retired = Vec::>::new(); for _ in 0..100_000 { for _ in 0..(rand::random::() % 8) { if !services.is_empty() && rand::random() { if nready == 0 || rand::random::() > u8::max_value() / 4 { // ready a service // TODO: sometimes ready a removed service? for (_, (handle, ready)) in &mut services { if !*ready { handle.allow(1); *ready = true; nready += 1; break; } } } else { // use a service use std::task::Poll; match task.enter(|cx, _| cache.poll_ready(cx)) { Poll::Ready(Ok(())) => { assert_ne!(nready, 0, "got ready when no service is ready"); let mut fut = cache.call("hello"); let mut fut = std::pin::Pin::new(&mut fut); assert_pending!(task.enter(|cx, _| fut.as_mut().poll(cx))); let mut found = false; for (_, (handle, ready)) in &mut services { if *ready { if let Poll::Ready(Some((req, res))) = handle.poll_request() { assert_eq!(req, "hello"); res.send_response("world"); *ready = false; nready -= 1; found = true; break; } } } if !found { // we must have been given a retired service let mut at = None; for (i, handle) in retired.iter_mut().enumerate() { if let Poll::Ready(Some((req, res))) = handle.poll_request() { assert_eq!(req, "hello"); res.send_response("world"); at = Some(i); break; } } let _ = retired.swap_remove( at.expect("request was not sent to a ready service"), ); nready -= 1; } assert_ready!(task.enter(|cx, _| fut.as_mut().poll(cx))).unwrap(); } Poll::Ready(_) => unreachable!("discover stream has not failed"), Poll::Pending => { // assert_eq!(nready, 0, "got pending when a service is ready"); } } } } else if services.is_empty() || rand::random() { if services.is_empty() || nready == 0 || rand::random() { // add let (svc, mut handle) = mock::pair::(); let svc = Mock(svc); handle.allow(0); let k = services.insert((handle, false)); let ok = tx.send(Ok(Change::Insert(k, svc))); assert!(ok.is_ok()); } else { // remove while !services.is_empty() { let k = rand::random::() % (services.iter().last().unwrap().0 + 1); if services.contains(k) { let (handle, ready) = services.remove(k); if ready { retired.push(handle); } let ok = tx.send(Ok(Change::Remove(k))); assert!(ok.is_ok()); break; } } } } else { // fail a service while !services.is_empty() { let k = rand::random::() % (services.iter().last().unwrap().0 + 1); if services.contains(k) { let (mut handle, ready) = services.remove(k); if ready { nready -= 1; } handle.send_error("doom"); break; } } } } let r = task.enter(|cx, _| cache.poll_ready(cx)); // drop any retired services that the p2c has gotten rid of let mut removed = Vec::new(); for (i, handle) in retired.iter_mut().enumerate() { if let Poll::Ready(None) = handle.poll_request() { removed.push(i); } } for i in removed.into_iter().rev() { retired.swap_remove(i); nready -= 1; } use std::task::Poll; match r { Poll::Ready(Ok(())) => { assert_ne!(nready, 0, "got ready when no service is ready"); } Poll::Ready(_) => unreachable!("discover stream has not failed"), Poll::Pending => { assert_eq!(nready, 0, "got pending when a service is ready"); } } } }