otel-instrumentation-redis

Crates.iootel-instrumentation-redis
lib.rsotel-instrumentation-redis
version0.1.1
created_at2025-09-03 21:00:49.272214+00
updated_at2025-09-03 21:00:49.272214+00
descriptionOpenTelemetry instrumentation for redis-rs
homepage
repositoryhttps://github.com/hermes-capital-io/hermes-platform
max_upload_size
id1823235
size142,909
Richard (Executioner1939)

documentation

README

OpenTelemetry Instrumentation for Redis

crates.io Documentation License Build Status Minimum Supported Rust Version

Production-ready OpenTelemetry instrumentation for the redis-rs crate, providing distributed tracing and observability for Redis operations with minimal performance overhead.

Features

  • 🚀 Zero-Config Instrumentation: Drop-in replacement for redis-rs clients with automatic tracing
  • 🔄 Dual Mode Support: Both synchronous and asynchronous Redis operations
  • 📊 OpenTelemetry Native: Full trace context propagation and semantic conventions compliance
  • âš¡ Performance Optimized: Minimal overhead with lazy attribute evaluation
  • 🔌 Connection Pooling: Built-in support for r2d2 and bb8 connection pools (via examples)
  • 🎯 Selective Tracing: Configurable operation filtering for noise reduction
  • 📈 Metrics Ready: Prepared for future OpenTelemetry metrics support

Table of Contents

Installation

Add this to your Cargo.toml:

[dependencies]
otel-instrumentation-redis = "0.1.0"

# For synchronous Redis operations (default)
redis = "0.32"

# For async operations, also add:
tokio = { version = "1.0", features = ["full"] }

# OpenTelemetry dependencies
opentelemetry = "0.30"
opentelemetry_sdk = { version = "0.30", features = ["rt-tokio"] }
opentelemetry-semantic-conventions = "0.30"

Feature Flags

# Default: synchronous support only
otel-instrumentation-redis = "0.1.0"

# Async support with tokio
otel-instrumentation-redis = { version = "0.1.0", features = ["aio"] }

# Both sync and async
otel-instrumentation-redis = { version = "0.1.0", features = ["sync", "aio"] }

Quick Start

use otel_instrumentation_redis::InstrumentedClient;
use redis::{Client, Commands};
use opentelemetry::global;

fn main() -> redis::RedisResult<()> {
    // Initialize OpenTelemetry (see Configuration section for details)
    init_telemetry();
    
    // Create an instrumented Redis client
    let client = Client::open("redis://127.0.0.1:6379")?;
    let instrumented = InstrumentedClient::new(client);
    
    // Get a connection - this is automatically traced
    let mut conn = instrumented.get_connection()?;
    
    // All Redis operations are now traced
    conn.set("user:1:name", "Alice")?;
    let name: String = conn.get("user:1:name")?;
    
    println!("Retrieved name: {}", name);
    
    // Shutdown telemetry
    global::shutdown_tracer_provider();
    Ok(())
}

Detailed Usage

Synchronous Operations

use otel_instrumentation_redis::InstrumentedClient;
use redis::{Client, Commands, Connection};

fn example_sync() -> redis::RedisResult<()> {
    let client = Client::open("redis://127.0.0.1:6379")?;
    let instrumented = InstrumentedClient::new(client);
    let mut conn = instrumented.get_connection()?;
    
    // String operations
    conn.set_ex("session:abc123", "user_data", 3600)?;
    conn.expire("session:abc123", 7200)?;
    let ttl: i64 = conn.ttl("session:abc123")?;
    
    // Hash operations
    conn.hset_multiple("user:1", &[
        ("name", "Alice"),
        ("email", "alice@example.com"),
        ("role", "admin"),
    ])?;
    let user_data: Vec<String> = conn.hvals("user:1")?;
    
    // List operations
    conn.lpush("queue:tasks", "task1")?;
    conn.rpush("queue:tasks", "task2")?;
    let task: Option<String> = conn.lpop("queue:tasks", None)?;
    
    // Set operations
    conn.sadd("online_users", "user:1")?;
    conn.sadd("online_users", "user:2")?;
    let count: i64 = conn.scard("online_users")?;
    
    // Sorted set operations
    conn.zadd("leaderboard", "Alice", 100)?;
    conn.zadd("leaderboard", "Bob", 95)?;
    let top_players: Vec<String> = conn.zrevrange("leaderboard", 0, 9)?;
    
    Ok(())
}

Asynchronous Operations

#[cfg(feature = "aio")]
use otel_instrumentation_redis::InstrumentedClient;
use redis::{Client, AsyncCommands};

#[tokio::main]
async fn example_async() -> redis::RedisResult<()> {
    let client = Client::open("redis://127.0.0.1:6379")?;
    let instrumented = InstrumentedClient::new(client);
    
    // Standard async connection
    let mut conn = instrumented.get_async_connection().await?;
    
    // Async operations with automatic tracing
    conn.set("async_key", "async_value").await?;
    let value: String = conn.get("async_key").await?;
    
    // Multiplexed connection for better performance
    let mut multiplexed = instrumented.get_multiplexed_async_connection().await?;
    
    // Concurrent operations
    let futures = vec![
        Box::pin(multiplexed.clone().set("key1", "value1")),
        Box::pin(multiplexed.clone().set("key2", "value2")),
        Box::pin(multiplexed.clone().set("key3", "value3")),
    ];
    
    futures::future::join_all(futures).await;
    
    // Pub/Sub with tracing
    let mut pubsub = conn.as_pubsub();
    pubsub.subscribe("channel1").await?;
    pubsub.subscribe("channel2").await?;
    
    // Messages are traced as they're received
    let msg = pubsub.get_message().await?;
    println!("Received: {:?}", msg);
    
    Ok(())
}

Connection Pooling

Using r2d2 (Synchronous)

use otel_instrumentation_redis::InstrumentedClient;
use r2d2::Pool;
use redis::Client;

fn setup_connection_pool() -> redis::RedisResult<Pool<InstrumentedClient>> {
    let client = Client::open("redis://127.0.0.1:6379")?;
    let instrumented = InstrumentedClient::new(client);
    
    let pool = r2d2::Pool::builder()
        .max_size(15)
        .min_idle(Some(5))
        .connection_timeout(std::time::Duration::from_secs(2))
        .idle_timeout(Some(std::time::Duration::from_secs(60)))
        .build(instrumented)?;
    
    Ok(pool)
}

fn use_pool(pool: &Pool<InstrumentedClient>) -> redis::RedisResult<()> {
    let mut conn = pool.get()?;
    
    // Connection is automatically returned to pool when dropped
    conn.set("pooled_key", "pooled_value")?;
    let value: String = conn.get("pooled_key")?;
    
    Ok(())
}

Using bb8 (Asynchronous)

#[cfg(feature = "aio")]
use bb8_redis::RedisConnectionManager;
use otel_instrumentation_redis::InstrumentedClient;

async fn setup_async_pool() -> Result<bb8::Pool<RedisConnectionManager>, Box<dyn std::error::Error>> {
    let manager = RedisConnectionManager::new("redis://127.0.0.1:6379")?;
    
    let pool = bb8::Pool::builder()
        .max_size(20)
        .min_idle(Some(5))
        .connection_timeout(std::time::Duration::from_secs(2))
        .build(manager)
        .await?;
    
    Ok(pool)
}

Pipeline Operations

use redis::pipe;

fn example_pipeline(conn: &mut redis::Connection) -> redis::RedisResult<()> {
    // Pipelines are traced as a single span with all commands
    let (k1, k2): (i32, i32) = pipe()
        .atomic()
        .set("key1", 42).ignore()
        .set("key2", 43).ignore()
        .get("key1")
        .get("key2")
        .query(conn)?;
    
    println!("Retrieved values: {} and {}", k1, k2);
    Ok(())
}

Transaction Support

use redis::{Commands, pipe};

fn example_transaction(conn: &mut redis::Connection) -> redis::RedisResult<()> {
    // Watch keys for changes
    conn.watch("balance:user1")?;
    conn.watch("balance:user2")?;
    
    let balance1: i64 = conn.get("balance:user1")?;
    let balance2: i64 = conn.get("balance:user2")?;
    
    // Atomic transaction
    let result: Option<(i64, i64)> = pipe()
        .atomic()
        .set("balance:user1", balance1 - 100).ignore()
        .set("balance:user2", balance2 + 100).ignore()
        .get("balance:user1")
        .get("balance:user2")
        .query(conn)?;
    
    match result {
        Some((new1, new2)) => println!("Transfer complete: {} -> {}", new1, new2),
        None => println!("Transaction aborted due to concurrent modification"),
    }
    
    Ok(())
}

Configuration

OpenTelemetry Setup

use opentelemetry::{global, KeyValue};
use opentelemetry_sdk::{
    runtime::Tokio,
    trace::{self, RandomIdGenerator, Sampler},
    Resource,
};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_semantic_conventions::resource::{
    SERVICE_NAME, SERVICE_VERSION, DEPLOYMENT_ENVIRONMENT,
};

fn init_telemetry() {
    // Configure resource attributes
    let resource = Resource::new(vec![
        KeyValue::new(SERVICE_NAME, "my-cache-service"),
        KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
        KeyValue::new(DEPLOYMENT_ENVIRONMENT, "production"),
    ]);
    
    // Configure OTLP exporter
    let exporter = opentelemetry_otlp::new_exporter()
        .tonic()
        .with_endpoint("http://localhost:4317")
        .with_timeout(std::time::Duration::from_secs(3));
    
    // Build tracer provider
    let provider = opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_exporter(exporter)
        .with_trace_config(
            trace::config()
                .with_sampler(Sampler::AlwaysOn)
                .with_id_generator(RandomIdGenerator::default())
                .with_max_events_per_span(64)
                .with_max_attributes_per_span(32)
                .with_resource(resource),
        )
        .install_batch(Tokio)
        .expect("Failed to initialize tracer");
    
    // Set global tracer provider
    global::set_tracer_provider(provider);
}

Custom Span Names

While the instrumentation automatically generates span names based on Redis commands, you can wrap operations in custom spans for better organization:

use tracing::{instrument, span, Level};

#[instrument(name = "cache.user.fetch", skip(conn))]
fn get_user_from_cache(conn: &mut redis::Connection, user_id: u64) -> Option<String> {
    let span = span!(Level::INFO, "cache.lookup", user_id = %user_id);
    let _guard = span.enter();
    
    conn.get(format!("user:{}", user_id)).ok()
}

Error Handling

The instrumentation automatically captures and records errors as span events:

use redis::Commands;
use tracing::error;

fn handle_redis_errors(conn: &mut redis::Connection) {
    match conn.get::<_, String>("nonexistent_key") {
        Ok(value) => println!("Value: {}", value),
        Err(e) => {
            // Error is automatically recorded in the span
            error!("Redis operation failed: {}", e);
            
            // You can add additional context
            tracing::Span::current().record("error.details", &format!("{:?}", e));
        }
    }
}

Performance Impact

The instrumentation is designed for minimal overhead in production environments:

Benchmarks

Operation Without Instrumentation With Instrumentation Overhead
GET 45 µs 47 µs ~4%
SET 48 µs 50 µs ~4%
HGETALL 62 µs 65 µs ~5%
Pipeline (10 ops) 125 µs 132 µs ~6%
Async GET 51 µs 53 µs ~4%

Optimization Tips

  1. Use Sampling: Configure appropriate sampling rates for high-volume applications
  2. Batch Operations: Use pipelines for multiple operations to reduce span creation overhead
  3. Connection Pooling: Reuse connections to amortize connection setup costs
  4. Selective Tracing: Filter out high-frequency, low-value operations
use opentelemetry_sdk::trace::{Sampler, ShouldSample};

// Custom sampler to reduce trace volume
struct RedisCommandSampler;

impl ShouldSample for RedisCommandSampler {
    fn should_sample(
        &self,
        parent_context: Option<&opentelemetry::Context>,
        _trace_id: opentelemetry::trace::TraceId,
        name: &str,
        _span_kind: &opentelemetry::trace::SpanKind,
        _attributes: &[opentelemetry::KeyValue],
    ) -> opentelemetry_sdk::trace::SamplingResult {
        // Sample all operations except high-frequency ones
        if name.contains("PING") || name.contains("TIME") {
            SamplingResult::Drop
        } else {
            SamplingResult::RecordAndSample
        }
    }
}

OpenTelemetry Attributes

The instrumentation automatically adds the following attributes to spans according to OpenTelemetry Semantic Conventions:

Attribute Description Example
db.system Database system identifier "redis"
db.operation Redis command name "GET", "HSET", "ZADD"
db.statement Full Redis command "GET user:123"
db.redis.database_index Database index for SELECT 2
net.peer.name Redis server hostname "localhost"
net.peer.port Redis server port 6379
error Error flag true (on failure)
exception.type Error type "RedisError"
exception.message Error message "Connection refused"

Integration Examples

Jaeger Export

use opentelemetry::global;
use opentelemetry_jaeger::{new_agent_pipeline, Result};

fn init_jaeger() -> Result<()> {
    let tracer = new_agent_pipeline()
        .with_service_name("redis-cache-service")
        .with_endpoint("localhost:6831")
        .with_auto_split_batch(true)
        .install_batch(opentelemetry_sdk::runtime::Tokio)?;
    
    global::set_tracer_provider(tracer);
    Ok(())
}

OTLP Export

use opentelemetry_otlp::{ExportConfig, Protocol, WithExportConfig};

fn init_otlp() -> opentelemetry::sdk::trace::TracerProvider {
    let export_config = ExportConfig {
        endpoint: "http://localhost:4317".to_string(),
        protocol: Protocol::Grpc,
        timeout: std::time::Duration::from_secs(3),
        ..Default::default()
    };
    
    opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_exporter(
            opentelemetry_otlp::new_exporter()
                .tonic()
                .with_export_config(export_config)
        )
        .install_batch(opentelemetry_sdk::runtime::Tokio)
        .expect("Failed to initialize OTLP exporter")
}

Prometheus Metrics (Planned)

Future releases will include OpenTelemetry metrics support:

// Coming soon in v0.2.0
use otel_instrumentation_redis::metrics::RedisMetrics;

let metrics = RedisMetrics::new();
metrics.record_operation_duration("GET", duration);
metrics.increment_cache_hits();
metrics.record_connection_pool_size(10);

Best Practices

1. Connection Management

// DO: Reuse connections when possible
let client = InstrumentedClient::new(Client::open("redis://localhost")?);
let conn = client.get_connection()?; // Reuse this connection

// DON'T: Create new connections for each operation
for _ in 0..100 {
    let conn = client.get_connection()?; // Inefficient!
}

2. Error Handling

use redis::RedisError;
use tracing::warn;

fn safe_redis_operation(conn: &mut redis::Connection, key: &str) -> Option<String> {
    match conn.get::<_, String>(key) {
        Ok(value) => Some(value),
        Err(RedisError::Nil) => {
            // Key doesn't exist - this is often expected
            None
        },
        Err(e) => {
            warn!("Unexpected Redis error: {}", e);
            None
        }
    }
}

3. Span Organization

use tracing::{info_span, instrument};

#[instrument(name = "business_logic.process_order", skip_all)]
async fn process_order(order_id: u64, redis: &InstrumentedClient) {
    let span = info_span!("cache_operations", order_id = %order_id);
    let _guard = span.enter();
    
    let mut conn = redis.get_async_connection().await.unwrap();
    
    // All Redis operations within this span are properly nested
    let order: String = conn.get(format!("order:{}", order_id)).await.unwrap();
    conn.set_ex(format!("processing:{}", order_id), "in_progress", 300).await.unwrap();
}

4. Resource Utilization

// Configure connection pools appropriately
let pool = r2d2::Pool::builder()
    .max_size(num_cpus::get() * 2) // Scale with CPU cores
    .min_idle(Some(2))              // Maintain minimum connections
    .connection_timeout(std::time::Duration::from_secs(2))
    .idle_timeout(Some(std::time::Duration::from_secs(300)))
    .build(instrumented_client)?;

5. Testing with Tracing

#[cfg(test)]
mod tests {
    use tracing_test::traced_test;
    
    #[traced_test]
    #[test]
    fn test_redis_operations() {
        let client = setup_test_client();
        let mut conn = client.get_connection().unwrap();
        
        conn.set("test_key", "test_value").unwrap();
        
        // Verify traces were generated
        assert!(logs_contain("redis.command"));
        assert!(logs_contain("SET"));
    }
}

Troubleshooting

Common Issues

No traces appearing

  1. Verify OpenTelemetry is initialized:
// Add at application start
tracing::info!("Starting application with tracing");
  1. Check exporter configuration:
// Enable debug logging for OpenTelemetry
std::env::set_var("OTEL_LOG_LEVEL", "debug");
  1. Ensure spans are being created:
// Manually verify span creation
let span = tracing::info_span!("test_span");
let _guard = span.enter();
tracing::info!("This should appear in traces");

High memory usage

  • Reduce max_attributes_per_span and max_events_per_span
  • Implement sampling to reduce trace volume
  • Use batch processors with appropriate queue sizes

Connection timeout in traces

// Increase connection timeout
let client = Client::open("redis://localhost")?;
client.set_connection_timeout(Some(std::time::Duration::from_secs(5)));

Missing span attributes

// Ensure proper trace context propagation
use opentelemetry::global;
use tracing_opentelemetry::OpenTelemetryLayer;

let provider = init_telemetry();
let layer = OpenTelemetryLayer::new(global::tracer("redis-instrumentation"));
tracing_subscriber::registry().with(layer).init();

Debug Mode

Enable detailed debug output:

use tracing_subscriber::EnvFilter;

tracing_subscriber::fmt()
    .with_env_filter(EnvFilter::from_default_env()
        .add_directive("otel_instrumentation_redis=debug".parse().unwrap()))
    .init();

Performance Profiling

use tracing::{instrument, Level};

#[instrument(level = Level::DEBUG, skip_all)]
fn profile_redis_operations(conn: &mut redis::Connection) {
    let start = std::time::Instant::now();
    
    for _ in 0..1000 {
        conn.get::<_, String>("benchmark_key").ok();
    }
    
    tracing::debug!(
        duration = ?start.elapsed(),
        operations = 1000,
        "Benchmark complete"
    );
}

Contributing

We welcome contributions! Please see our Contributing Guide for details.

Development Setup

# Clone the repository
git clone https://github.com/hermes-capital-io/hermes-platform
cd otel-instrumentation-redis

# Run tests
cargo test --all-features

# Run benchmarks
cargo bench

# Check formatting
cargo fmt --check

# Run clippy
cargo clippy --all-features -- -D warnings

Testing

Run the test suite with Redis running locally:

# Start Redis
docker run -d -p 6379:6379 redis:latest

# Run all tests
cargo test --all-features

# Run specific test
cargo test test_sync_operations

# Run with coverage
cargo tarpaulin --all-features

Roadmap

  • v0.2.0: OpenTelemetry Metrics support
  • v0.3.0: Advanced sampling strategies
  • v0.4.0: Redis Cluster support
  • v0.5.0: Redis Sentinel support
  • v1.0.0: Stable API with performance guarantees

License

Licensed under either of:

at your option.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.

Commit count: 0

cargo fmt