use hyper::http::{Request, Response}; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Server}; use opentelemetry::trace::Tracer; use opentelemetry::{ global, trace::{Span, Status}, }; use opentelemetry_sdk::runtime::Tokio; use opentelemetry_sdk::trace::TracerProvider; use opentelemetry_zpages::{tracez, TracezError, TracezQuerier, TracezResponse}; use rand::Rng; use std::str::FromStr; use std::sync::Arc; use std::{convert::Infallible, net::SocketAddr}; use tokio::time::Duration; async fn handler( req: Request, querier: Arc, ) -> Result, Infallible> { Ok::<_, Infallible>(match req.uri().path() { uri if uri.starts_with("/tracez/api") => { // if it is api call let parts = uri .split('/') .filter(|x| !x.is_empty()) .collect::>(); if parts.len() < 3 { Response::builder().status(404).body(Body::empty()).unwrap() } else { let operation_name = *(parts.get(2).unwrap_or(&"")); match operation_name { "aggregations" => tracez_response_or_server_error(querier.aggregation().await), "running" => { if let Some(&span_name) = parts.get(3) { tracez_response_or_server_error(querier.running(span_name.into()).await) } else { Response::builder().status(404).body(Body::empty()).unwrap() } } "error" => { if let Some(&span_name) = parts.get(3) { tracez_response_or_server_error(querier.error(span_name.into()).await) } else { Response::builder().status(404).body(Body::empty()).unwrap() } } "latency" => { let bucket_index = parts.get(3); let span_name = parts.get(4); match (bucket_index, span_name) { (Some(&bucket_index), Some(&span_name)) => { if let Ok(bucket_index) = u32::from_str(bucket_index) { tracez_response_or_server_error( querier .latency(bucket_index as usize, span_name.into()) .await, ) } else { Response::builder().status(404).body(Body::empty()).unwrap() } } (_, _) => Response::builder().status(404).body(Body::empty()).unwrap(), } } _ => Response::builder().status(404).body(Body::empty()).unwrap(), } } } "/running" => { let span_duration = Duration::from_millis(rand::thread_rng().gen_range(1..6000)); let mut spans = global::tracer("zpages-test").start("running-spans"); spans.set_status(Status::Ok); tokio::time::sleep(span_duration).await; println!("The span slept for {} ms", span_duration.as_millis()); Response::new(Body::empty()) } _ => Response::builder().status(404).body(Body::empty()).unwrap(), }) } fn tracez_response_or_server_error(resp: Result) -> Response { match resp { Ok(resp) => Response::new(Body::from(serde_json::to_string(&resp).unwrap())), Err(_) => Response::builder().status(500).body(Body::empty()).unwrap(), } } #[tokio::main] async fn main() { let (processor, querier) = tracez(5, Tokio); let provider = TracerProvider::builder() .with_span_processor(processor) .build(); global::set_tracer_provider(provider); let querier = Arc::new(querier); let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); let server = Server::bind(&addr).serve(make_service_fn(move |_conn| { let inner = Arc::clone(&querier); async move { Ok::<_, Infallible>(service_fn(move |req| handler(req, Arc::clone(&inner)))) } })); println!("Listening on {addr}"); if let Err(e) = server.await { eprintln!("server error: {e}"); } }