#![deny(warnings)] use futures_util::TryStreamExt; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Request, Response, Server, StatusCode}; use tracing::info; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::prelude::*; /// This is our service handler. It receives a Request, routes on its /// path, and returns a Future of a Response. async fn echo(req: Request) -> Result, hyper::Error> { match (req.method(), req.uri().path()) { // Serve some instructions at / (&Method::GET, "/") => Ok(Response::new(Body::from( "Try POSTing data to /echo such as: `curl localhost:3000/echo -XPOST -d 'hello world'`", ))), // Simply echo the body back to the client. (&Method::POST, "/echo") => Ok(Response::new(req.into_body())), // Convert to uppercase before sending back to client using a stream. (&Method::POST, "/echo/uppercase") => { let chunk_stream = req.into_body().map_ok(|chunk| { chunk .iter() .map(|byte| byte.to_ascii_uppercase()) .collect::>() }); Ok(Response::new(Body::wrap_stream(chunk_stream))) } // Reverse the entire body before sending back to the client. // // Since we don't know the end yet, we can't simply stream // the chunks as they arrive as we did with the above uppercase endpoint. // So here we do `.await` on the future, waiting on concatenating the full body, // then afterwards the content can be reversed. Only then can we return a `Response`. (&Method::POST, "/echo/reversed") => { let whole_body = hyper::body::to_bytes(req.into_body()).await?; let reversed_body = whole_body.iter().rev().cloned().collect::>(); Ok(Response::new(Body::from(reversed_body))) } // Return the 404 Not Found for other routes. _ => { let mut not_found = Response::default(); *not_found.status_mut() = StatusCode::NOT_FOUND; Ok(not_found) } } } #[tokio::main] async fn main() -> Result<(), Box> { // Set up `tracing-subscriber` to process tracing data. // Create a jaeger exporter pipeline for a `trace_demo` service. let addr = std::net::SocketAddr::from(([127, 0, 0, 1], 6831)); // Build a Jaeger batch span processor let jaeger_processor = opentelemetry::sdk::trace::BatchSpanProcessor::builder( opentelemetry_jaeger::new_pipeline() .with_service_name("mre-jaeger") .with_agent_endpoint(addr) .with_trace_config(opentelemetry::sdk::trace::config().with_resource( opentelemetry::sdk::Resource::new(vec![ opentelemetry::KeyValue::new("service.name", "echo"), opentelemetry::KeyValue::new("service.namespace", "echo-namespace"), ]), )) .init_async_exporter(opentelemetry::runtime::Tokio) .expect("Jaeger Tokio async exporter"), opentelemetry::runtime::Tokio, ) .build(); // Setup Tracer Provider let provider = opentelemetry::sdk::trace::TracerProvider::builder() .with_span_processor(jaeger_processor) .build(); // Get new Tracer from TracerProvider let tracer = opentelemetry::trace::TracerProvider::tracer(&provider, "my_app"); // Create a layer with the configured tracer let telemetry = tracing_opentelemetry::OpenTelemetryLayer::new(tracer); // Use tracing subscriber `Registry`, or any other subscriber that `impl LookupSpan` tracing_subscriber::registry() .with(telemetry) .try_init() .expect("Default subscriber"); // Create a span and enter it, returning a guard.... let root_span = tracing::span!(tracing::Level::INFO, "root_span_echo").entered(); // Generate a `tracing` "event". info!(status = true, answer = 42, message = "first event"); let addr = ([127, 0, 0, 1], 3000).into(); let service = make_service_fn(|_| async { Ok::<_, hyper::Error>(service_fn(echo)) }); let server = Server::bind(&addr).serve(service); println!("Listening on http://{}", addr); server.await.expect("Server fault"); let _root_span = root_span.exit(); Ok(()) }