| Crates.io | otel-instrumentation-redis |
| lib.rs | otel-instrumentation-redis |
| version | 0.1.1 |
| created_at | 2025-09-03 21:00:49.272214+00 |
| updated_at | 2025-09-03 21:00:49.272214+00 |
| description | OpenTelemetry instrumentation for redis-rs |
| homepage | |
| repository | https://github.com/hermes-capital-io/hermes-platform |
| max_upload_size | |
| id | 1823235 |
| size | 142,909 |
Production-ready OpenTelemetry instrumentation for the redis-rs crate, providing distributed tracing and observability for Redis operations with minimal performance overhead.
Add this to your Cargo.toml:
[dependencies]
otel-instrumentation-redis = "0.1.0"
# For synchronous Redis operations (default)
redis = "0.32"
# For async operations, also add:
tokio = { version = "1.0", features = ["full"] }
# OpenTelemetry dependencies
opentelemetry = "0.30"
opentelemetry_sdk = { version = "0.30", features = ["rt-tokio"] }
opentelemetry-semantic-conventions = "0.30"
# Default: synchronous support only
otel-instrumentation-redis = "0.1.0"
# Async support with tokio
otel-instrumentation-redis = { version = "0.1.0", features = ["aio"] }
# Both sync and async
otel-instrumentation-redis = { version = "0.1.0", features = ["sync", "aio"] }
use otel_instrumentation_redis::InstrumentedClient;
use redis::{Client, Commands};
use opentelemetry::global;
/// Minimal synchronous example: set up telemetry, open an instrumented
/// connection, write and read one key, then shut telemetry down.
///
/// # Errors
/// Returns the first `redis::RedisError` raised while opening the client,
/// connecting, or running a command.
fn main() -> redis::RedisResult<()> {
    // Initialize OpenTelemetry (see Configuration section for details)
    init_telemetry();

    // Create an instrumented Redis client
    let client = Client::open("redis://127.0.0.1:6379")?;
    let instrumented = InstrumentedClient::new(client);

    // Get a connection - this is automatically traced
    let mut conn = instrumented.get_connection()?;

    // All Redis operations are now traced.
    // The reply type is spelled out as `()`: leaving it to inference relies on
    // never-type fallback, which is rejected by the Rust 2024 edition.
    let _: () = conn.set("user:1:name", "Alice")?;
    let name: String = conn.get("user:1:name")?;
    println!("Retrieved name: {}", name);

    // Shutdown telemetry
    // NOTE(review): confirm `global::shutdown_tracer_provider` exists in the
    // `opentelemetry` version pinned in Cargo.toml; newer releases expect the
    // provider itself to be shut down.
    global::shutdown_tracer_provider();
    Ok(())
}
use otel_instrumentation_redis::InstrumentedClient;
use redis::{Client, Commands, Connection};
/// Walks through string, hash, list, set and sorted-set commands on an
/// instrumented synchronous connection.
///
/// Commands whose reply is not used are bound to `()` explicitly so that the
/// reply type does not depend on never-type fallback (an error in Rust 2024).
///
/// # Errors
/// Returns the first `redis::RedisError` raised by the client or a command.
fn example_sync() -> redis::RedisResult<()> {
    let client = Client::open("redis://127.0.0.1:6379")?;
    let instrumented = InstrumentedClient::new(client);
    let mut conn = instrumented.get_connection()?;

    // String operations
    let _: () = conn.set_ex("session:abc123", "user_data", 3600)?;
    let _: () = conn.expire("session:abc123", 7200)?;
    let ttl: i64 = conn.ttl("session:abc123")?;

    // Hash operations
    let _: () = conn.hset_multiple(
        "user:1",
        &[
            ("name", "Alice"),
            ("email", "alice@example.com"),
            ("role", "admin"),
        ],
    )?;
    let user_data: Vec<String> = conn.hvals("user:1")?;

    // List operations
    let _: () = conn.lpush("queue:tasks", "task1")?;
    let _: () = conn.rpush("queue:tasks", "task2")?;
    let task: Option<String> = conn.lpop("queue:tasks", None)?;

    // Set operations
    let _: () = conn.sadd("online_users", "user:1")?;
    let _: () = conn.sadd("online_users", "user:2")?;
    let count: i64 = conn.scard("online_users")?;

    // Sorted set operations
    let _: () = conn.zadd("leaderboard", "Alice", 100)?;
    let _: () = conn.zadd("leaderboard", "Bob", 95)?;
    let top_players: Vec<String> = conn.zrevrange("leaderboard", 0, 9)?;

    Ok(())
}
#[cfg(feature = "aio")]
use otel_instrumentation_redis::InstrumentedClient;
use redis::{Client, AsyncCommands};
/// Asynchronous example: a standard connection, three concurrent writes over a
/// multiplexed connection, and a Pub/Sub subscription.
///
/// # Errors
/// Returns the first `redis::RedisError` raised by the client, a command, or
/// the subscription. Failures of the concurrent writes are propagated rather
/// than discarded.
#[tokio::main]
async fn example_async() -> redis::RedisResult<()> {
    let client = Client::open("redis://127.0.0.1:6379")?;
    let instrumented = InstrumentedClient::new(client);

    // Standard async connection
    let mut conn = instrumented.get_async_connection().await?;

    // Async operations with automatic tracing
    let _: () = conn.set("async_key", "async_value").await?;
    let value: String = conn.get("async_key").await?;

    // Multiplexed connection for better performance
    let multiplexed = instrumented.get_multiplexed_async_connection().await?;

    // Concurrent operations.
    // Each command future borrows its connection handle, so every handle is a
    // named local that outlives the `join!`; calling `.set(..)` directly on a
    // `multiplexed.clone()` temporary would leave the future borrowing a value
    // that is dropped at the end of the statement.
    let mut first = multiplexed.clone();
    let mut second = multiplexed.clone();
    let mut third = multiplexed.clone();
    let (r1, r2, r3): (
        redis::RedisResult<()>,
        redis::RedisResult<()>,
        redis::RedisResult<()>,
    ) = tokio::join!(
        first.set("key1", "value1"),
        second.set("key2", "value2"),
        third.set("key3", "value3"),
    );
    r1?;
    r2?;
    r3?;

    // Pub/Sub with tracing
    let mut pubsub = conn.as_pubsub();
    pubsub.subscribe("channel1").await?;
    pubsub.subscribe("channel2").await?;

    // Messages are traced as they're received
    let msg = pubsub.get_message().await?;
    println!("Received: {:?}", msg);

    Ok(())
}
use otel_instrumentation_redis::InstrumentedClient;
use r2d2::Pool;
use redis::Client;
/// Builds an `r2d2` pool of instrumented connections to the local Redis server.
///
/// The pool holds at most 15 connections, keeps at least 5 idle, waits up to
/// 2 seconds for a connection and closes connections idle for 60 seconds.
///
/// # Errors
/// Returns a `redis::RedisError` if the client URL is rejected or the pool
/// cannot be built.
fn setup_connection_pool() -> redis::RedisResult<Pool<InstrumentedClient>> {
    let client = Client::open("redis://127.0.0.1:6379")?;
    let instrumented = InstrumentedClient::new(client);

    r2d2::Pool::builder()
        .max_size(15)
        .min_idle(Some(5))
        .connection_timeout(std::time::Duration::from_secs(2))
        .idle_timeout(Some(std::time::Duration::from_secs(60)))
        .build(instrumented)
        // `r2d2::Error` has no `From` conversion into `RedisError`, so `?`
        // cannot be used here; convert it explicitly.
        .map_err(|e| {
            redis::RedisError::from((
                redis::ErrorKind::IoError,
                "failed to build Redis connection pool",
                e.to_string(),
            ))
        })
}
/// Checks a connection out of `pool`, writes one key and reads it back.
///
/// # Errors
/// Returns a `redis::RedisError` if no connection can be checked out or a
/// command fails.
fn use_pool(pool: &Pool<InstrumentedClient>) -> redis::RedisResult<()> {
    // `r2d2::Error` does not convert into `RedisError` through `?`, so map it.
    let mut conn = pool.get().map_err(|e| {
        redis::RedisError::from((
            redis::ErrorKind::IoError,
            "failed to check out a pooled Redis connection",
            e.to_string(),
        ))
    })?;

    // Connection is automatically returned to pool when dropped
    let _: () = conn.set("pooled_key", "pooled_value")?;
    let value: String = conn.get("pooled_key")?;
    Ok(())
}
#[cfg(feature = "aio")]
use bb8_redis::RedisConnectionManager;
use otel_instrumentation_redis::InstrumentedClient;
/// Creates a `bb8` pool backed by a `RedisConnectionManager` for the local
/// Redis server (at most 20 connections, at least 5 idle, 2 second checkout
/// timeout).
///
/// # Errors
/// Returns the boxed error from creating the manager or building the pool.
async fn setup_async_pool() -> Result<bb8::Pool<RedisConnectionManager>, Box<dyn std::error::Error>> {
    let connection_manager = RedisConnectionManager::new("redis://127.0.0.1:6379")?;

    let pool_builder = bb8::Pool::builder()
        .max_size(20)
        .min_idle(Some(5))
        .connection_timeout(std::time::Duration::from_secs(2));

    let async_pool = pool_builder.build(connection_manager).await?;
    Ok(async_pool)
}
use redis::pipe;
/// Sends two writes and two reads as one atomic pipeline and prints the two
/// values read back.
///
/// # Errors
/// Returns the `redis::RedisError` raised when the pipeline is executed.
fn example_pipeline(conn: &mut redis::Connection) -> redis::RedisResult<()> {
    // Pipelines are traced as a single span with all commands
    let mut batch = pipe();
    batch
        .atomic()
        .set("key1", 42)
        .ignore()
        .set("key2", 43)
        .ignore()
        .get("key1")
        .get("key2");

    let (first, second): (i32, i32) = batch.query(conn)?;
    println!("Retrieved values: {} and {}", first, second);
    Ok(())
}
use redis::{Commands, pipe};
/// Moves 100 units from `balance:user1` to `balance:user2` inside an
/// optimistic (WATCH / MULTI / EXEC) transaction and reports the outcome.
///
/// # Errors
/// Returns the first `redis::RedisError` raised by a command.
fn example_transaction(conn: &mut redis::Connection) -> redis::RedisResult<()> {
    // Watch keys for changes.
    // Both keys go into a single WATCH command, and the reply type is stated
    // explicitly instead of being left to inference.
    redis::cmd("WATCH")
        .arg("balance:user1")
        .arg("balance:user2")
        .query::<()>(conn)?;

    let balance1: i64 = conn.get("balance:user1")?;
    let balance2: i64 = conn.get("balance:user2")?;

    // Atomic transaction
    let result: Option<(i64, i64)> = pipe()
        .atomic()
        .set("balance:user1", balance1 - 100)
        .ignore()
        .set("balance:user2", balance2 + 100)
        .ignore()
        .get("balance:user1")
        .get("balance:user2")
        .query(conn)?;

    match result {
        Some((new1, new2)) => println!("Transfer complete: {} -> {}", new1, new2),
        None => println!("Transaction aborted due to concurrent modification"),
    }
    Ok(())
}
use opentelemetry::{global, KeyValue};
use opentelemetry_sdk::{
runtime::Tokio,
trace::{self, RandomIdGenerator, Sampler},
Resource,
};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_semantic_conventions::resource::{
SERVICE_NAME, SERVICE_VERSION, DEPLOYMENT_ENVIRONMENT,
};
fn init_telemetry() {
// Configure resource attributes
let resource = Resource::new(vec![
KeyValue::new(SERVICE_NAME, "my-cache-service"),
KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
KeyValue::new(DEPLOYMENT_ENVIRONMENT, "production"),
]);
// Configure OTLP exporter
let exporter = opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint("http://localhost:4317")
.with_timeout(std::time::Duration::from_secs(3));
// Build tracer provider
let provider = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(exporter)
.with_trace_config(
trace::config()
.with_sampler(Sampler::AlwaysOn)
.with_id_generator(RandomIdGenerator::default())
.with_max_events_per_span(64)
.with_max_attributes_per_span(32)
.with_resource(resource),
)
.install_batch(Tokio)
.expect("Failed to initialize tracer");
// Set global tracer provider
global::set_tracer_provider(provider);
}
While the instrumentation automatically generates span names based on Redis commands, you can wrap operations in custom spans for better organization:
use tracing::{instrument, span, Level};
/// Looks up `user:{user_id}` inside a dedicated `cache.lookup` span.
///
/// Returns `None` when the command fails or the reply cannot be read as a
/// `String`.
#[instrument(name = "cache.user.fetch", skip(conn))]
fn get_user_from_cache(conn: &mut redis::Connection, user_id: u64) -> Option<String> {
    let lookup_span = span!(Level::INFO, "cache.lookup", user_id = %user_id);
    // Run the lookup while the span is entered; it is exited when the closure returns.
    lookup_span.in_scope(|| conn.get(format!("user:{}", user_id)).ok())
}
The instrumentation automatically captures and records errors as span events:
use redis::Commands;
use tracing::error;
fn handle_redis_errors(conn: &mut redis::Connection) {
match conn.get::<_, String>("nonexistent_key") {
Ok(value) => println!("Value: {}", value),
Err(e) => {
// Error is automatically recorded in the span
error!("Redis operation failed: {}", e);
// You can add additional context
tracing::Span::current().record("error.details", &format!("{:?}", e));
}
}
}
The instrumentation is designed for minimal overhead in production environments:
| Operation | Without Instrumentation | With Instrumentation | Overhead |
|---|---|---|---|
| GET | 45 µs | 47 µs | ~4% |
| SET | 48 µs | 50 µs | ~4% |
| HGETALL | 62 µs | 65 µs | ~5% |
| Pipeline (10 ops) | 125 µs | 132 µs | ~6% |
| Async GET | 51 µs | 53 µs | ~4% |
use opentelemetry_sdk::trace::{Sampler, ShouldSample};
// Custom sampler to reduce trace volume
/// Sampler that drops spans whose name contains `PING` or `TIME` and records
/// and samples everything else.
///
/// `Clone` and `Debug` are derived because `ShouldSample` requires them.
#[derive(Clone, Debug)]
struct RedisCommandSampler;

impl ShouldSample for RedisCommandSampler {
    fn should_sample(
        &self,
        parent_context: Option<&opentelemetry::Context>,
        _trace_id: opentelemetry::trace::TraceId,
        name: &str,
        _span_kind: &opentelemetry::trace::SpanKind,
        _attributes: &[opentelemetry::KeyValue],
        _links: &[opentelemetry::trace::Link],
    ) -> opentelemetry_sdk::trace::SamplingResult {
        use opentelemetry::trace::TraceContextExt;

        // Sample all operations except high-frequency ones
        let decision = if name.contains("PING") || name.contains("TIME") {
            opentelemetry_sdk::trace::SamplingDecision::Drop
        } else {
            opentelemetry_sdk::trace::SamplingDecision::RecordAndSample
        };

        // `SamplingResult` is a struct, not an enum: the decision is one of its
        // fields. The parent's trace state is carried over unchanged.
        opentelemetry_sdk::trace::SamplingResult {
            decision,
            attributes: Vec::new(),
            trace_state: match parent_context {
                Some(cx) => cx.span().span_context().trace_state().clone(),
                None => opentelemetry::trace::TraceState::default(),
            },
        }
    }
}
The instrumentation automatically adds the following attributes to spans according to OpenTelemetry Semantic Conventions:
| Attribute | Description | Example |
|---|---|---|
| db.system | Database system identifier | "redis" |
| db.operation | Redis command name | "GET", "HSET", "ZADD" |
| db.statement | Full Redis command | "GET user:123" |
| db.redis.database_index | Database index for SELECT | 2 |
| net.peer.name | Redis server hostname | "localhost" |
| net.peer.port | Redis server port | 6379 |
| error | Error flag | true (on failure) |
| exception.type | Error type | "RedisError" |
| exception.message | Error message | "Connection refused" |
use opentelemetry::global;
use opentelemetry_jaeger::{new_agent_pipeline, Result};
/// Builds a Jaeger agent pipeline with a batch exporter and registers the
/// resulting provider globally.
///
/// # Errors
/// Returns the error raised while building the pipeline.
fn init_jaeger() -> Result<()> {
    // `build_batch` yields the tracer provider that `set_tracer_provider`
    // expects; `install_batch` hands back a tracer instead.
    // NOTE(review): `opentelemetry_jaeger` targets older OpenTelemetry
    // releases — confirm it is compatible with the versions pinned above.
    let provider = new_agent_pipeline()
        .with_service_name("redis-cache-service")
        .with_endpoint("localhost:6831")
        .with_auto_split_batch(true)
        .build_batch(opentelemetry_sdk::runtime::Tokio)?;
    global::set_tracer_provider(provider);
    Ok(())
}
use opentelemetry_otlp::{ExportConfig, Protocol, WithExportConfig};
/// Builds an OTLP tracing pipeline over gRPC to `http://localhost:4317` with a
/// 3 second export timeout and returns the value produced by `install_batch`.
///
/// Panics with "Failed to initialize OTLP exporter" if installation fails.
///
/// NOTE(review): the `opentelemetry::sdk` path in the return type and the
/// `new_pipeline()` / `new_exporter()` builders look like they belong to an
/// older OpenTelemetry release than the 0.30 pinned in the dependency list —
/// confirm against the versions actually in use.
fn init_otlp() -> opentelemetry::sdk::trace::TracerProvider {
// Exporter settings; fields not listed keep their defaults.
let export_config = ExportConfig {
endpoint: "http://localhost:4317".to_string(),
protocol: Protocol::Grpc,
timeout: std::time::Duration::from_secs(3),
..Default::default()
};
opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_export_config(export_config)
)
// Batch export driven by the Tokio runtime.
.install_batch(opentelemetry_sdk::runtime::Tokio)
.expect("Failed to initialize OTLP exporter")
}
Future releases will include OpenTelemetry metrics support:
// Coming soon in v0.2.0
// Preview only: this API is announced for a future release, and `duration`
// is a value the caller is expected to supply.
use otel_instrumentation_redis::metrics::RedisMetrics;
let metrics = RedisMetrics::new();
metrics.record_operation_duration("GET", duration);
metrics.increment_cache_hits();
metrics.record_connection_pool_size(10);
// DO: Reuse connections when possible
let client = InstrumentedClient::new(Client::open("redis://localhost")?);
let conn = client.get_connection()?; // Reuse this connection
// DON'T: Create new connections for each operation
// The loop below requests a fresh connection on each of its 100 iterations.
for _ in 0..100 {
let conn = client.get_connection()?; // Inefficient!
}
use redis::RedisError;
use tracing::warn;
/// Reads `key` as a string, treating a missing key as an expected outcome.
///
/// Returns `Some(value)` when the key exists and `None` when it does not.
/// Any other failure is logged as a warning and also yields `None`.
fn safe_redis_operation(conn: &mut redis::Connection, key: &str) -> Option<String> {
    // `RedisError` is not an enum with a `Nil` variant, so a missing key cannot
    // be matched as `Err(RedisError::Nil)`. Asking for `Option<String>` maps a
    // nil reply to `Ok(None)` instead.
    match conn.get::<_, Option<String>>(key) {
        // Key doesn't exist (`None`) - this is often expected
        Ok(value) => value,
        Err(e) => {
            warn!("Unexpected Redis error: {}", e);
            None
        }
    }
}
use tracing::{info_span, instrument};
#[instrument(name = "business_logic.process_order", skip_all)]
async fn process_order(order_id: u64, redis: &InstrumentedClient) {
let span = info_span!("cache_operations", order_id = %order_id);
let _guard = span.enter();
let mut conn = redis.get_async_connection().await.unwrap();
// All Redis operations within this span are properly nested
let order: String = conn.get(format!("order:{}", order_id)).await.unwrap();
conn.set_ex(format!("processing:{}", order_id), "in_progress", 300).await.unwrap();
}
// Configure connection pools appropriately
// `max_size` takes a `u32` while `num_cpus::get()` returns a `usize`, so the
// value is converted explicitly and capped at `u32::MAX`.
let max_connections = u32::try_from(num_cpus::get() * 2).unwrap_or(u32::MAX);
let pool = r2d2::Pool::builder()
    .max_size(max_connections) // Scale with CPU cores
    .min_idle(Some(2)) // Maintain minimum connections
    .connection_timeout(std::time::Duration::from_secs(2))
    .idle_timeout(Some(std::time::Duration::from_secs(300)))
    .build(instrumented_client)?;
#[cfg(test)]
mod tests {
    use redis::Commands;
    use tracing_test::traced_test;

    /// Runs one `SET` through an instrumented connection and checks that the
    /// captured log output mentions the command.
    #[traced_test]
    #[test]
    fn test_redis_operations() {
        let client = setup_test_client();
        let mut conn = client.get_connection().unwrap();

        // The reply type must be stated: with `.unwrap()` there is nothing
        // else for the compiler to infer it from.
        let _: () = conn.set("test_key", "test_value").unwrap();

        // Verify traces were generated
        assert!(logs_contain("redis.command"));
        assert!(logs_contain("SET"));
    }
}
// Add at application start
tracing::info!("Starting application with tracing");
// Enable debug logging for OpenTelemetry
// NOTE(review): `std::env::set_var` is `unsafe` in the Rust 2024 edition —
// confirm the edition in use before copying this line.
std::env::set_var("OTEL_LOG_LEVEL", "debug");
// Manually verify span creation
let span = tracing::info_span!("test_span");
// The span stays entered until `_guard` is dropped.
let _guard = span.enter();
tracing::info!("This should appear in traces");
Adjust `max_attributes_per_span` and `max_events_per_span` in the trace configuration.
// Increase connection timeout
let client = Client::open("redis://localhost")?;
// The timeout is passed when the connection is opened.
let conn = client.get_connection_with_timeout(std::time::Duration::from_secs(5))?;
// Ensure proper trace context propagation
use opentelemetry::global;
use tracing_opentelemetry::OpenTelemetryLayer;
let provider = init_telemetry();
let layer = OpenTelemetryLayer::new(global::tracer("redis-instrumentation"));
tracing_subscriber::registry().with(layer).init();
Enable detailed debug output:
use tracing_subscriber::EnvFilter;
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env()
.add_directive("otel_instrumentation_redis=debug".parse().unwrap()))
.init();
use tracing::{instrument, Level};
/// Issues 1000 `GET benchmark_key` commands, ignoring each result, and emits a
/// debug event with the elapsed time.
#[instrument(level = Level::DEBUG, skip_all)]
fn profile_redis_operations(conn: &mut redis::Connection) {
    let started_at = std::time::Instant::now();

    (0..1000).for_each(|_| {
        // The outcome is deliberately discarded; only the timing matters here.
        conn.get::<_, String>("benchmark_key").ok();
    });

    tracing::debug!(
        duration = ?started_at.elapsed(),
        operations = 1000,
        "Benchmark complete"
    );
}
We welcome contributions! Please see our Contributing Guide for details.
# Clone the repository
git clone https://github.com/hermes-capital-io/hermes-platform
cd otel-instrumentation-redis
# Run tests
cargo test --all-features
# Run benchmarks
cargo bench
# Check formatting
cargo fmt --check
# Run clippy
cargo clippy --all-features -- -D warnings
Run the test suite with Redis running locally:
# Start Redis
docker run -d -p 6379:6379 redis:latest
# Run all tests
cargo test --all-features
# Run specific test
cargo test test_sync_operations
# Run with coverage
cargo tarpaulin --all-features
Licensed under either of the Apache License, Version 2.0, or the MIT license, at your option.
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.