//! Exercises load balancers with mocked services.

use futures_core::{Stream, TryStream};
use futures_util::{stream, stream::StreamExt, stream::TryStreamExt};
use hdrhistogram::Histogram;
use pin_project_lite::pin_project;
use rand::{self, Rng};
use std::hash::Hash;
use std::time::Duration;
use std::{
    pin::Pin,
    task::{Context, Poll},
};
use tokio::time::{self, Instant};
use tower::balance as lb;
use tower::discover::{Change, Discover};
use tower::limit::concurrency::ConcurrencyLimit;
use tower::load;
use tower::util::ServiceExt;
use tower_service::Service;

const REQUESTS: usize = 100_000;
const CONCURRENCY: usize = 500;
const DEFAULT_RTT: Duration = Duration::from_millis(30);
static ENDPOINT_CAPACITY: usize = CONCURRENCY;
static MAX_ENDPOINT_LATENCIES: [Duration; 10] = [
    Duration::from_millis(1),
    Duration::from_millis(5),
    Duration::from_millis(10),
    Duration::from_millis(10),
    Duration::from_millis(10),
    Duration::from_millis(100),
    Duration::from_millis(100),
    Duration::from_millis(100),
    Duration::from_millis(500),
    Duration::from_millis(1000),
];

struct Summary {
    latencies: Histogram<u64>,
    start: Instant,
    count_by_instance: [usize; 10],
}

#[tokio::main]
async fn main() {
    tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::default()).unwrap();

    println!("REQUESTS={}", REQUESTS);
    println!("CONCURRENCY={}", CONCURRENCY);
    println!("ENDPOINT_CAPACITY={}", ENDPOINT_CAPACITY);
    print!("MAX_ENDPOINT_LATENCIES=[");
    for max in &MAX_ENDPOINT_LATENCIES {
        let l = max.as_secs() * 1_000 + u64::from(max.subsec_millis());
        print!("{}ms, ", l);
    }
    println!("]");

    let decay = Duration::from_secs(10);
    let d = gen_disco();
    let pe = lb::p2c::Balance::new(load::PeakEwmaDiscover::new(
        d,
        DEFAULT_RTT,
        decay,
        load::CompleteOnResponse::default(),
    ));
    run("P2C+PeakEWMA...", pe).await;

    let d = gen_disco();
    let ll = lb::p2c::Balance::new(load::PendingRequestsDiscover::new(
        d,
        load::CompleteOnResponse::default(),
    ));
    run("P2C+LeastLoaded...", ll).await;
}

type Error = Box<dyn std::error::Error + Send + Sync>;

type Key = usize;

pin_project! {
    /// A static discovery stream: yields each mocked endpoint once.
    struct Disco<S> {
        services: Vec<(Key, S)>,
    }
}

impl<S> Disco<S> {
    fn new(services: Vec<(Key, S)>) -> Self {
        Self { services }
    }
}

impl<S> Stream for Disco<S>
where
    S: Service<Req, Response = Rsp, Error = Error>,
{
    type Item = Result<Change<Key, S>, Error>;

    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.project().services.pop() {
            Some((k, service)) => Poll::Ready(Some(Ok(Change::Insert(k, service)))),
            None => {
                // There may be more endpoints later, so return Pending rather
                // than ending the stream.
                Poll::Pending
            }
        }
    }
}

/// Builds a discovery stream with one mocked endpoint per entry in
/// `MAX_ENDPOINT_LATENCIES`. Each endpoint responds after a random delay drawn
/// from `0..max` milliseconds and is capped at `ENDPOINT_CAPACITY` concurrent
/// requests.
fn gen_disco() -> impl Discover<
    Key = Key,
    Error = Error,
    Service = ConcurrencyLimit<
        impl Service<Req, Response = Rsp, Error = Error, Future = impl Send> + Send,
    >,
> + Send {
    Disco::new(
        MAX_ENDPOINT_LATENCIES
            .iter()
            .enumerate()
            .map(|(instance, latency)| {
                let svc = tower::service_fn(move |_| {
                    let start = Instant::now();

                    let maxms = u64::from(latency.subsec_millis())
                        .saturating_add(latency.as_secs().saturating_mul(1_000));
                    let latency = Duration::from_millis(rand::thread_rng().gen_range(0..maxms));

                    async move {
                        time::sleep_until(start + latency).await;
                        let latency = start.elapsed();
                        Ok(Rsp { latency, instance })
                    }
                });

                (instance, ConcurrencyLimit::new(svc, ENDPOINT_CAPACITY))
            })
            .collect(),
    )
}

/// Drives `REQUESTS` requests through the balancer at a concurrency of
/// `CONCURRENCY`, then reports per-endpoint load and latency percentiles.
async fn run<D>(name: &'static str, lb: lb::p2c::Balance<D, Req>)
where
    D: Discover + Unpin + Send + 'static,
    D::Error: Into<Error>,
    D::Key: Clone + Send + Hash,
    D::Service: Service<Req, Response = Rsp> + load::Load + Send,
    <D::Service as Service<Req>>::Error: Into<Error>,
    <D::Service as Service<Req>>::Future: Send,
    <D::Service as load::Load>::Metric: std::fmt::Debug,
{
    println!("{}", name);

    let requests = stream::repeat(Req).take(REQUESTS);
    let service = ConcurrencyLimit::new(lb, CONCURRENCY);
    let responses = service.call_all(requests).unordered();

    compute_histo(responses).await.unwrap().report();
}

async fn compute_histo<S>(mut times: S) -> Result<Summary, Error>
where
    S: TryStream<Ok = Rsp, Error = Error> + 'static + Unpin,
{
    let mut summary = Summary::new();
    while let Some(rsp) = times.try_next().await? {
        summary.count(rsp);
    }
    Ok(summary)
}

impl Summary {
    fn new() -> Self {
        Self {
            // The max expected delay is 2000ms; size the histogram to 3000ms
            // at 3 significant figures.
            latencies: Histogram::<u64>::new_with_max(3_000, 3).unwrap(),
            start: Instant::now(),
            count_by_instance: [0; 10],
        }
    }

    fn count(&mut self, rsp: Rsp) {
        let ms = rsp.latency.as_secs() * 1_000;
        let ms = ms + u64::from(rsp.latency.subsec_nanos()) / 1_000 / 1_000;
        self.latencies += ms;
        self.count_by_instance[rsp.instance] += 1;
    }

    fn report(&self) {
        let mut total = 0;
        for c in &self.count_by_instance {
            total += c;
        }
        for (i, c) in self.count_by_instance.iter().enumerate() {
            let p = *c as f64 / total as f64 * 100.0;
            println!("  [{:02}] {:>5.01}%", i, p);
        }

        println!("  wall {:4}s", self.start.elapsed().as_secs());

        if self.latencies.len() < 2 {
            return;
        }
        println!("  p50  {:4}ms", self.latencies.value_at_quantile(0.5));

        if self.latencies.len() < 10 {
            return;
        }
        println!("  p90  {:4}ms", self.latencies.value_at_quantile(0.9));

        if self.latencies.len() < 50 {
            return;
        }
        println!("  p95  {:4}ms", self.latencies.value_at_quantile(0.95));

        if self.latencies.len() < 100 {
            return;
        }
        println!("  p99  {:4}ms", self.latencies.value_at_quantile(0.99));

        if self.latencies.len() < 1000 {
            return;
        }
        println!("  p999 {:4}ms", self.latencies.value_at_quantile(0.999));
    }
}

#[derive(Debug, Clone)]
struct Req;

#[derive(Debug)]
struct Rsp {
    latency: Duration,
    instance: usize,
}
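
// How to run: a sketch, assuming this file is saved as `examples/demo.rs` in a
// crate that depends on `tower` (with its `full` feature), `tower-service`,
// `tokio` (with its `full` feature), `futures-core`, `futures-util`,
// `hdrhistogram`, `pin-project-lite`, `rand`, `tracing`, and
// `tracing-subscriber`:
//
//     cargo run --release --example demo
//
// For each strategy, the report shows the share of traffic each endpoint
// received (indexed as in `MAX_ENDPOINT_LATENCIES`), the wall-clock time, and
// latency percentiles; a load-aware balancer should skew traffic toward the
// faster endpoints.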