| Crates.io | rialo-telemetry |
| lib.rs | rialo-telemetry |
| version | 0.1.10 |
| created_at | 2025-11-14 17:07:53.532431+00 |
| updated_at | 2025-12-09 18:32:55.229051+00 |
| description | OpenTelemetry distributed tracing support for Rialo |
| homepage | |
| repository | https://github.com/subzerolabs/rialo |
| max_upload_size | |
| id | 1933168 |
| size | 184,251 |
A comprehensive telemetry library for distributed tracing and metrics in Rialo applications. This crate provides a unified interface for setting up OpenTelemetry distributed tracing, Prometheus metrics, and console logging with minimal configuration.
Feature flags:
- distributed-tracing - Enables OpenTelemetry distributed tracing support
- prometheus - Enables Prometheus metrics collection
- reqwest-headers - Enables HTTP client trace context injection utilities for reqwest
- axum-headers - Enables HTTP server trace context extraction utilities for axum
- env-context - Enables environment variable-based trace context propagation for subprocess communication
A Cargo.toml sketch showing how to enable these follows the basic example below.
Basic usage with console logging only:
use rialo_telemetry::{TelemetryConfig, init_telemetry};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize with console logging only
let config = TelemetryConfig::new();
let handle = init_telemetry(config).await?;
// Your application code here
tracing::info!("Application started");
// Shutdown telemetry when done
handle.shutdown()?;
Ok(())
}
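The optional features listed above are selected through Cargo feature flags. A sketch of a dependency entry, assuming you want distributed tracing plus the HTTP helpers (the version number is illustrative):
[dependencies]
rialo-telemetry = { version = "0.1", features = [
    "distributed-tracing",
    "reqwest-headers",
    "axum-headers",
] }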
Enable the distributed-tracing feature and set environment variables:
export OTEL_SERVICE_NAME="my-service"
export OTEL_SERVICE_VERSION="1.0.0"
export OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:4318"
export OTEL_EXPORTER_OTLP_HEADERS="x-api-key=your-key"
use rialo_telemetry::{TelemetryConfig, init_telemetry};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize with OTLP support (reads from environment)
let config = TelemetryConfig::new().with_otlp();
let handle = init_telemetry(config).await?;
// Your traced application code here
tracing::info!("Application started with distributed tracing");
handle.shutdown()?;
Ok(())
}
use rialo_telemetry::{TelemetryConfig, OtlpConfig, init_telemetry};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let otlp_config = OtlpConfig::new()
.with_service_name("my-service")
.with_service_version("1.0.0")
.with_exporter_endpoint("https://api.honeycomb.io/v1/traces")
.with_console_enabled(true);
let config = TelemetryConfig::new()
.with_otlp_config(otlp_config)
.with_log_level("debug");
let handle = init_telemetry(config).await?;
// Your application code
handle.shutdown()?;
Ok(())
}
The crate includes full configuration support for OpenTelemetry metrics export, though the actual metrics implementation is not yet active. All environment variables and configuration options are parsed and stored for future use:
use rialo_telemetry::{TelemetryConfig, OtlpConfig, init_telemetry};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let otlp_config = OtlpConfig::new()
.with_service_name("my-service")
// Traces endpoint
.with_traces_endpoint("http://jaeger:14268/api/traces")
// Metrics configuration (ready for future implementation)
.with_exporter_endpoint("http://otel-collector:4318"); // Base endpoint for metrics
let config = TelemetryConfig::new()
.with_otlp_config(otlp_config);
let handle = init_telemetry(config).await?;
// When metrics are implemented, they will automatically use the configured endpoints
handle.shutdown()?;
Ok(())
}
Note: While metrics configuration is fully supported, the actual metrics export implementation is planned for a future release. Currently, only tracing is actively exported via OTLP.
Enable the prometheus feature:
use rialo_telemetry::{TelemetryConfig, PrometheusConfig, init_telemetry};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let registry = prometheus::Registry::new();
let prometheus_config = PrometheusConfig::new(registry.clone())
.with_span_latency_buckets(20)
.with_span_latency_enabled(true);
let config = TelemetryConfig::new()
.with_prometheus_config(prometheus_config);
let handle = init_telemetry(config).await?;
// Your application code - span latencies will be recorded
handle.shutdown()?;
Ok(())
}
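The registry handed to PrometheusConfig is a plain prometheus::Registry, so you can expose it for scraping however you like. A minimal sketch of rendering it in the Prometheus text format with the prometheus crate's TextEncoder; the function name and any /metrics endpoint wiring around it are illustrative, not part of this crate:
use prometheus::{Encoder, Registry, TextEncoder};

// Render the registry's current contents in the Prometheus text exposition
// format; serve the resulting string from a /metrics endpoint in your
// HTTP framework of choice.
fn render_metrics(registry: &Registry) -> String {
    let encoder = TextEncoder::new();
    let mut buffer = Vec::new();
    encoder
        .encode(&registry.gather(), &mut buffer)
        .expect("encoding Prometheus metrics should not fail");
    String::from_utf8(buffer).expect("Prometheus text format is valid UTF-8")
}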
The crate provides utilities for propagating trace context across HTTP requests, enabling distributed tracing across microservices.
Enable the reqwest-headers feature to inject trace context into outgoing HTTP requests:
use rialo_telemetry::{TelemetryConfig, init_telemetry, inject_trace_headers, apply_trace_headers_to_reqwest};
use reqwest::Client;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize telemetry with OpenTelemetry
let config = TelemetryConfig::new().with_otlp();
let handle = init_telemetry(config).await?;
let client = Client::new();
// In a traced context
let span = tracing::info_span!("http_request", service = "api-call");
let _guard = span.enter();
// Inject trace headers into request
let trace_headers = inject_trace_headers();
let request = client.post("https://api.example.com/data");
let request = apply_trace_headers_to_reqwest(request, trace_headers);
let response = request.send().await?;
handle.shutdown()?;
Ok(())
}
Enable the axum-headers feature to extract trace context from incoming HTTP requests:
use rialo_telemetry::extract_and_set_trace_context_axum;
use axum::{http::HeaderMap, Json, response::Json as ResponseJson};
#[tracing::instrument]
async fn handler(
headers: HeaderMap,
Json(payload): Json<serde_json::Value>
) -> ResponseJson<serde_json::Value> {
// Extract and set trace context from incoming headers
extract_and_set_trace_context_axum(&headers);
// Your handler logic - this span will now be part of the distributed trace
tracing::info!("Processing request with distributed trace context");
ResponseJson(serde_json::json!({"status": "ok"}))
}
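For completeness, a sketch of mounting a handler like the one above in an axum Router; this assumes axum 0.7's serve API and the handler function defined above, and is not functionality provided by this crate:
use axum::{routing::post, Router};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Telemetry initialization (init_telemetry) omitted for brevity.
    // Route incoming requests to the traced handler defined above.
    let app = Router::new().route("/api/process", post(handler));
    let listener = tokio::net::TcpListener::bind("0.0.0.0:8080").await?;
    axum::serve(listener, app).await?;
    Ok(())
}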
Combining both client and server utilities for full distributed tracing:
// Service A (HTTP client)
use rialo_telemetry::{TelemetryConfig, init_telemetry, inject_trace_headers, apply_trace_headers_to_reqwest};
async fn call_service_b() -> Result<(), Box<dyn std::error::Error>> {
let span = tracing::info_span!("call_service_b");
let _guard = span.enter();
let client = reqwest::Client::new();
let trace_headers = inject_trace_headers();
let request = client.post("http://service-b:8080/api/process");
let request = apply_trace_headers_to_reqwest(request, trace_headers);
let response = request.send().await?;
tracing::info!("Received response from service B");
Ok(())
}
// Service B (HTTP server)
use rialo_telemetry::extract_and_set_trace_context_axum;
use axum::{http::HeaderMap, Json};
#[tracing::instrument]
async fn process_request(
headers: HeaderMap,
Json(data): Json<serde_json::Value>
) -> Json<serde_json::Value> {
// Extract trace context - this creates a child span of service A's span
extract_and_set_trace_context_axum(&headers);
tracing::info!("Processing request in service B");
// This span is now part of the same distributed trace as service A
let result = process_business_logic(data).await;
Json(result)
}
Note: Both utilities require the distributed-tracing feature to be enabled along with their respective feature flags (reqwest-headers or axum-headers).
Enable the env-context feature to propagate trace context across process boundaries using environment variables.
When the env-context feature is enabled, you can manually extract trace context from environment variables after initializing telemetry. This allows subprocesses to connect to their parent's trace:
use rialo_telemetry::{TelemetryConfig, init_telemetry, extract_and_set_trace_context_env};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize telemetry first
let config = TelemetryConfig::new().with_otlp();
let handle = init_telemetry(config).await?;
// Then extract trace context from environment variables (if any)
extract_and_set_trace_context_env();
// This span is now part of parent's distributed trace!
tracing::info!("Child process started");
handle.shutdown()?;
Ok(())
}
Use inject_trace_env_to_cmd() for convenient one-liner subprocess trace propagation:
use rialo_telemetry::{TelemetryConfig, init_telemetry, inject_trace_env_to_cmd};
use std::process::Command;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize telemetry first
let config = TelemetryConfig::new().with_otlp();
let handle = init_telemetry(config).await?;
// Parent process: ergonomic one-liner to inject trace context
let span = tracing::info_span!("subprocess_execution", command = "worker");
let _guard = span.enter();
// One line replaces the manual env injection loop!
let mut cmd = inject_trace_env_to_cmd(Command::new("./worker"));
let output = cmd.arg("--task=process").output()?;
tracing::info!("Subprocess completed with status: {}", output.status);
handle.shutdown()?;
Ok(())
}
For fine-grained control, you can still use the manual functions:
use rialo_telemetry::{TelemetryConfig, init_telemetry, inject_trace_env, extract_and_set_trace_context_env};
use std::process::Command;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize telemetry first
let config = TelemetryConfig::new().with_otlp();
let handle = init_telemetry(config).await?;
// Manually extract trace context at a specific point
extract_and_set_trace_context_env();
// Manual injection (equivalent to inject_trace_env_to_cmd)
let trace_env = inject_trace_env();
let mut cmd = Command::new("./child-process");
for (key, value) in trace_env {
cmd.env(key, value);
}
let output = cmd.output()?;
handle.shutdown()?;
Ok(())
}
In the child process, initialize telemetry and extract trace context:
use rialo_telemetry::{TelemetryConfig, init_telemetry, extract_and_set_trace_context_env};
#[tracing::instrument]
fn main() -> Result<(), Box<dyn std::error::Error>> {
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async {
// Initialize telemetry first
let config = TelemetryConfig::new().with_otlp();
let handle = init_telemetry(config).await?;
// Then extract trace context from environment variables
extract_and_set_trace_context_env();
// This span is now part of the parent process's distributed trace
tracing::info!("Child process started with inherited trace context");
do_work().await;
tracing::info!("Child process completed");
handle.shutdown()?;
Ok(())
})
}
async fn do_work() {
let span = tracing::info_span!("child_work");
let _guard = span.enter();
tracing::info!("Performing work in child process");
// This work is automatically traced as part of the parent's trace
}
You can also extract from a custom environment map instead of the current process environment:
use rialo_telemetry::extract_and_set_trace_context_from_env_map;
use std::collections::HashMap;
fn handle_custom_environment(custom_env: &HashMap<String, String>) {
// Extract trace context from a specific environment map
extract_and_set_trace_context_from_env_map(custom_env);
// Current span now has the extracted trace context as parent
tracing::info!("Working with custom trace context");
}
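As an illustrative round trip, such a map could be captured earlier with inject_trace_env() (for example, to hand a trace context to code that restores it later without touching the process environment); the function name here is purely illustrative:
use rialo_telemetry::{extract_and_set_trace_context_from_env_map, inject_trace_env};
use std::collections::HashMap;

fn capture_and_restore() {
    // Capture the current trace context as key/value pairs -- the same data
    // inject_trace_env_to_cmd would place into a subprocess environment.
    let captured: HashMap<String, String> = inject_trace_env().into_iter().collect();

    // Later (or elsewhere), replay it from the map instead of std::env.
    extract_and_set_trace_context_from_env_map(&captured);
}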
Complete example showing trace propagation from parent to child process:
// parent.rs - The main process
use rialo_telemetry::{TelemetryConfig, init_telemetry, inject_trace_env_to_cmd};
use std::process::Command;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = TelemetryConfig::new().with_otlp();
let handle = init_telemetry(config).await?;
// Start a distributed trace
let span = tracing::info_span!("batch_job", job_id = "12345");
let _guard = span.enter();
tracing::info!("Starting batch job with multiple workers");
// Launch multiple child processes with ergonomic one-liners
for worker_id in 1..=3 {
let worker_span = tracing::info_span!("launch_worker", worker_id = worker_id);
let _worker_guard = worker_span.enter();
// Ergonomic one-liner - inject trace context and spawn
let mut cmd = inject_trace_env_to_cmd(Command::new("./worker"));
cmd.arg(worker_id.to_string());
tracing::info!("Launching worker {}", worker_id);
cmd.spawn()?;
}
tracing::info!("All workers launched");
handle.shutdown()?;
Ok(())
}
// worker.rs - Child process
use rialo_telemetry::{TelemetryConfig, init_telemetry, extract_and_set_trace_context_env};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let worker_id = std::env::args().nth(1).unwrap_or("unknown".to_string());
// Initialize telemetry first
let config = TelemetryConfig::new().with_otlp();
let handle = init_telemetry(config).await?;
// Then extract trace context from environment variables
extract_and_set_trace_context_env();
// Create main span for this worker - now inherits from parent
let worker_span = tracing::info_span!("worker_process", worker_id = %worker_id);
let _guard = worker_span.enter();
tracing::info!("Worker {} started with inherited trace", worker_id);
// Do work - all automatically part of the original batch job trace
process_batch_items().await;
tracing::info!("Worker {} completed", worker_id);
handle.shutdown()?;
Ok(())
}
async fn process_batch_items() {
for item in 1..=10 {
let item_span = tracing::info_span!("process_item", item_id = item);
let _guard = item_span.enter();
tracing::info!("Processing item {}", item);
// Simulate work
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
}
Note: Environment variable context propagation requires the distributed-tracing feature to be enabled along with the env-context feature flag.
Baggage provides a way to propagate key-value metadata across distributed systems alongside trace context. It's useful for passing cross-cutting concerns like user IDs, feature flags, request priorities, or any other data that should be available throughout a distributed trace.
The crate provides comprehensive baggage manipulation utilities when the distributed-tracing feature is enabled:
use rialo_telemetry::{set_baggage, get_baggage, get_all_baggage, remove_baggage, clear_baggage};
use rialo_telemetry::{TelemetryConfig, init_telemetry};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize telemetry with OpenTelemetry (baggage requires this)
let config = TelemetryConfig::new().with_otlp();
let handle = init_telemetry(config).await?;
// Set baggage items (will propagate to child spans and across distributed calls)
set_baggage("user_id", "12345");
set_baggage("session_id", "session-abc-def");
set_baggage("feature_flag", "new_ui_enabled");
// Get specific baggage items
let user_id = get_baggage("user_id"); // Some("12345")
let request_id = get_baggage("request_id"); // None
// Get all baggage as HashMap
let all_baggage = get_all_baggage();
println!("Current baggage: {:?}", all_baggage);
// Remove specific items
remove_baggage("session_id");
// Clear all baggage
clear_baggage();
handle.shutdown()?;
Ok(())
}
Baggage automatically propagates across distributed systems through the same mechanisms as trace context:
use rialo_telemetry::{TelemetryConfig, init_telemetry, set_baggage, get_baggage, inject_trace_headers, apply_trace_headers_to_reqwest};
use reqwest::Client;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = TelemetryConfig::new().with_otlp();
let handle = init_telemetry(config).await?;
// Service A: Set baggage that should propagate
set_baggage("user_id", "user-12345");
set_baggage("tenant_id", "tenant-abc");
set_baggage("request_priority", "high");
let span = tracing::info_span!("call_service_b");
let _guard = span.enter();
// HTTP call - baggage automatically included in trace context headers
let client = Client::new();
let trace_headers = inject_trace_headers(); // Includes baggage
let request = client.post("http://service-b:8080/process");
let request = apply_trace_headers_to_reqwest(request, trace_headers);
let response = request.send().await?;
handle.shutdown()?;
Ok(())
}
// Service B: Automatically receives baggage
use rialo_telemetry::{extract_and_set_trace_context_axum, get_baggage};
use axum::{http::HeaderMap, Json};
#[tracing::instrument]
async fn process_request(
headers: HeaderMap,
Json(data): Json<serde_json::Value>
) -> Json<serde_json::Value> {
// Extract trace context (includes baggage)
extract_and_set_trace_context_axum(&headers);
// Baggage is now available in service B
let user_id = get_baggage("user_id"); // Some("user-12345")
let tenant_id = get_baggage("tenant_id"); // Some("tenant-abc")
let priority = get_baggage("request_priority"); // Some("high")
tracing::info!("Processing request for user {:?} in tenant {:?} with priority {:?}",
user_id, tenant_id, priority);
// Business logic can use baggage data
if priority == Some("high".to_string()) {
process_with_high_priority(data).await
} else {
process_normally(data).await
}
}
Baggage also propagates across process boundaries when using environment variable context propagation:
use rialo_telemetry::{TelemetryConfig, init_telemetry, set_baggage, inject_trace_env_to_cmd};
use std::process::Command;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = TelemetryConfig::new().with_otlp();
let handle = init_telemetry(config).await?;
// Parent process: Set baggage
set_baggage("batch_id", "batch-2024-001");
set_baggage("processing_mode", "parallel");
let span = tracing::info_span!("launch_worker");
let _guard = span.enter();
// Launch subprocess with baggage propagation
let mut cmd = inject_trace_env_to_cmd(Command::new("./worker"));
let output = cmd.arg("--task=process").output()?;
handle.shutdown()?;
Ok(())
}
// Worker process: Inherits baggage
use rialo_telemetry::{TelemetryConfig, init_telemetry, extract_and_set_trace_context_env, get_baggage};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize telemetry first
let config = TelemetryConfig::new().with_otlp();
let handle = init_telemetry(config).await?;
// Extract trace context and baggage from environment variables
extract_and_set_trace_context_env();
// Baggage from parent is now available
let batch_id = get_baggage("batch_id"); // Some("batch-2024-001")
let mode = get_baggage("processing_mode"); // Some("parallel")
tracing::info!("Worker started for batch {:?} in mode {:?}", batch_id, mode);
// Use baggage to influence processing
if mode == Some("parallel".to_string()) {
process_in_parallel().await;
} else {
process_sequentially().await;
}
handle.shutdown()?;
Ok(())
}
Use Cases: propagating cross-cutting metadata such as user IDs, tenant IDs, feature flags, and request priorities throughout a distributed trace, as in the examples above.
Performance Considerations: keep baggage small; remove items you no longer need with remove_baggage() and call clear_baggage() when starting fresh contexts, since every item travels with each outgoing request and subprocess.
Security Notes: baggage is propagated as plain text in HTTP headers and environment variables, so avoid placing secrets or other sensitive data in it.
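A short sketch of the request-scoped hygiene suggested above; the function and its arguments are illustrative, and only the baggage calls come from this crate:
use rialo_telemetry::{clear_baggage, remove_baggage, set_baggage};

async fn handle_request(user_id: &str, request_id: &str) {
    // Start from a clean slate so stale baggage never leaks between requests.
    clear_baggage();
    set_baggage("user_id", user_id);
    set_baggage("request_id", request_id);

    // ... traced work here; downstream HTTP calls and subprocesses inherit the baggage ...

    // Drop items once they are no longer relevant.
    remove_baggage("request_id");
    remove_baggage("user_id");
}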
Baggage propagation is enabled by default when OpenTelemetry is configured. The propagators include both trace context and baggage:
// Baggage is included in default propagators
// OTEL_PROPAGATORS="tracecontext,baggage" (default)
// To customize propagators (not recommended unless you have specific needs):
std::env::set_var("OTEL_PROPAGATORS", "tracecontext,baggage,b3");
Note: Baggage utilities require the distributed-tracing feature to be enabled.
The crate supports all standard OpenTelemetry environment variables:
- OTEL_SERVICE_NAME - Service name (default: "rialo")
- OTEL_SERVICE_VERSION - Service version (default: "unknown")
- OTEL_RESOURCE_ATTRIBUTES - Additional resource attributes
- OTEL_EXPORTER_OTLP_ENDPOINT - General OTLP endpoint (default: "http://localhost:4318")
- OTEL_EXPORTER_OTLP_TRACES_ENDPOINT - Traces-specific endpoint (overrides general)
- OTEL_EXPORTER_OTLP_METRICS_ENDPOINT - Metrics-specific endpoint (overrides general)
- OTEL_EXPORTER_OTLP_INSECURE - Use insecure connection for general endpoint (default: false)
- OTEL_EXPORTER_OTLP_TRACES_INSECURE - Use insecure connection for traces (default: false)
- OTEL_EXPORTER_OTLP_METRICS_INSECURE - Use insecure connection for metrics (default: false)
- OTEL_EXPORTER_OTLP_HEADERS - General headers for authentication
- OTEL_EXPORTER_OTLP_TRACES_HEADERS - Traces-specific headers (merged with general)
- OTEL_EXPORTER_OTLP_METRICS_HEADERS - Metrics-specific headers (merged with general)
- OTEL_EXPORTER_OTLP_PROTOCOL - General export protocol: "grpc", "http/protobuf", "http/json" (default: "http/protobuf")
- OTEL_EXPORTER_OTLP_TRACES_PROTOCOL - Traces-specific protocol (overrides general)
- OTEL_EXPORTER_OTLP_METRICS_PROTOCOL - Metrics-specific protocol (overrides general)
- OTEL_TRACES_ENABLED - Enable/disable traces (default: true)
- OTEL_METRICS_ENABLED - Enable/disable metrics (default: true)
- OTEL_LOG_LEVEL - Log level (default: "info")
- OTEL_EXPORTER_OTLP_METRICS_PERIOD - Metrics reporting interval (default: "30s")
- OTEL_PROPAGATORS - Trace context propagators, comma-separated (default: "tracecontext,baggage")
For local development and testing, you can easily connect to a local Jaeger instance to visualize your traces.
Jaeger can be run either as an all-in-one Docker container or built locally. A Nix recipe is included for convenience in the rialo-nix-toolchain.
Run the all-in-one Jaeger container:
docker run -d --name jaeger \
-e COLLECTOR_ZIPKIN_HTTP_PORT=9411 \
-p 5775:5775/udp \
-p 6831:6831/udp \
-p 6832:6832/udp \
-p 5778:5778 \
-p 16686:16686 \
-p 14268:14268 \
-p 9411:9411 \
-p 4317:4317 \
-p 4318:4318 \
jaegertracing/all-in-one:latest
This exposes the following ports:
- 16686 - Jaeger UI
- 4317 / 4318 - OTLP ingest (gRPC / HTTP)
- 14268 - collector HTTP endpoint (jaeger.thrift)
- 9411 - Zipkin-compatible endpoint
- 6831/udp, 6832/udp, 5775/udp - legacy agent ports
- 5778 - agent configuration and sampling endpoint
If you're using the rialo-nix-toolchain, you can run Jaeger with:
nix run .#jaeger
Once Jaeger is running, configure your application to send traces to it using environment variables:
export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318
export OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:4318/v1/traces
export OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf
Or create a .env file:
# .env
OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:4318/v1/traces
OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf
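The .env file has to be loaded into the process environment before init_telemetry reads the OTEL_* variables. A minimal sketch, assuming the application loads the file with the dotenvy crate before initializing telemetry:
use rialo_telemetry::{TelemetryConfig, init_telemetry};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Populate the process environment from .env so the OTLP configuration
    // sees OTEL_EXPORTER_OTLP_ENDPOINT and friends when it is built.
    dotenvy::dotenv().ok();

    let config = TelemetryConfig::new().with_otlp();
    let handle = init_telemetry(config).await?;
    tracing::info!("Telemetry configured from .env");
    handle.shutdown()?;
    Ok(())
}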
Configuration values are resolved in this order (highest to lowest precedence):
For endpoints and headers, signal-specific settings override general settings:
- OTEL_EXPORTER_OTLP_TRACES_ENDPOINT overrides OTEL_EXPORTER_OTLP_ENDPOINT for traces
- OTEL_EXPORTER_OTLP_METRICS_ENDPOINT overrides OTEL_EXPORTER_OTLP_ENDPOINT for metrics
Example: sending traces to Honeycomb:
export OTEL_SERVICE_NAME="my-service"
export OTEL_EXPORTER_OTLP_TRACES_ENDPOINT="https://api.honeycomb.io/v1/traces"
export OTEL_EXPORTER_OTLP_HEADERS="x-honeycomb-team=your-api-key,x-honeycomb-dataset=my-dataset"
# If using metrics (when implemented):
# export OTEL_EXPORTER_OTLP_METRICS_ENDPOINT="https://api.honeycomb.io/v1/metrics"
Example: sending traces to a local Jaeger instance:
export OTEL_SERVICE_NAME="my-service"
export OTEL_EXPORTER_OTLP_TRACES_ENDPOINT="http://localhost:14268/api/traces"
export OTEL_EXPORTER_OTLP_PROTOCOL="http/protobuf"
# Jaeger doesn't support metrics via OTLP, so disable them:
# export OTEL_METRICS_ENABLED="false"
Example: splitting traces and metrics across different backends:
export OTEL_SERVICE_NAME="my-service"
# Send traces to Jaeger
export OTEL_EXPORTER_OTLP_TRACES_ENDPOINT="http://jaeger:14268/api/traces"
# Send metrics to Prometheus-compatible endpoint (when implemented)
export OTEL_EXPORTER_OTLP_METRICS_ENDPOINT="http://prometheus:9090/api/v1/otlp/v1/metrics"
# Different authentication for each
export OTEL_EXPORTER_OTLP_TRACES_HEADERS="authorization=Bearer traces-token"
export OTEL_EXPORTER_OTLP_METRICS_HEADERS="authorization=Bearer metrics-token"
# Console logging only for development
export RUST_LOG="debug"
# No OTEL_EXPORTER_OTLP_ENDPOINT set - will use console only
The library handles common error scenarios gracefully.
The crate includes comprehensive tests for various configurations:
# Test with no features
cargo test -p rialo-telemetry
# Test with OpenTelemetry
cargo test -p rialo-telemetry --features distributed-tracing
# Test with all features
cargo test -p rialo-telemetry --features "distributed-tracing,prometheus,reqwest-headers,axum-headers,env-context"
# Test trace context utilities
cargo test -p rialo-telemetry --features "reqwest-headers" test_inject_trace_headers
# Test environment variable context utilities
cargo test -p rialo-telemetry --features "env-context" test_inject_trace_env
# Test the ergonomic Command helper
cargo test -p rialo-telemetry --features "env-context" test_inject_trace_env_to_cmd
# Test baggage utilities
cargo test -p rialo-telemetry --features "distributed-tracing" test_baggage
Licensed under the Apache License, Version 2.0.
This crate is part of the Rialo project. See the main repository for contribution guidelines.