| Crates.io | multi-tier-cache |
| lib.rs | multi-tier-cache |
| version | 0.5.7 |
| created_at | 2025-11-03 11:45:08.020714+00 |
| updated_at | 2026-01-24 09:23:43.579515+00 |
| description | Customizable multi-tier cache with L1 (Moka in-memory) + L2 (Redis distributed) defaults, expandable to L3/L4+, cross-instance invalidation via Pub/Sub, stampede protection, and flexible TTL scaling |
| homepage | |
| repository | https://github.com/thichuong/multi-tier-cache |
| max_upload_size | |
| id | 1914529 |
| size | 453,129 |
A high-performance, production-ready multi-tier caching library for Rust featuring L1 (in-memory) + L2 (Redis) caches, automatic stampede protection, and built-in Redis Streams support.
Request → L1 Cache (Moka) → L2 Cache (Redis) → Compute/Fetch
            ↓ Hit (90%)       ↓ Hit (75%)        ↓ Miss (5%)
          Return            Promote to L1      Store in L1+L2
Add to your Cargo.toml:
[dependencies]
multi-tier-cache = "0.5"
tokio = { version = "1.43", features = ["full"] }
serde_json = "1.0"
Version Guide: get_or_compute_typed() requires v0.2.0 or later.
use multi_tier_cache::{CacheSystem, CacheStrategy};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Initialize cache system (uses REDIS_URL env var)
let cache = CacheSystem::new().await?;
// Store data with cache strategy
let data = serde_json::json!({"user": "alice", "score": 100});
cache.cache_manager()
.set_with_strategy("user:1", data, CacheStrategy::ShortTerm)
.await?;
// Retrieve data (L1 first, then L2 fallback)
if let Some(cached) = cache.cache_manager().get("user:1").await? {
println!("Cached data: {}", cached);
}
// Get statistics
let stats = cache.cache_manager().get_stats();
println!("Hit rate: {:.2}%", stats.hit_rate);
Ok(())
}
Choose the right TTL for your use case:
use std::time::Duration;
// RealTime (10s) - Fast-changing data
cache.cache_manager()
.set_with_strategy("live_price", data, CacheStrategy::RealTime)
.await?;
// ShortTerm (5min) - Frequently accessed data
cache.cache_manager()
.set_with_strategy("session:123", data, CacheStrategy::ShortTerm)
.await?;
// MediumTerm (1hr) - Moderately stable data
cache.cache_manager()
.set_with_strategy("catalog", data, CacheStrategy::MediumTerm)
.await?;
// LongTerm (3hr) - Stable data
cache.cache_manager()
.set_with_strategy("config", data, CacheStrategy::LongTerm)
.await?;
// Custom - Specific requirements
cache.cache_manager()
.set_with_strategy("metrics", data, CacheStrategy::Custom(Duration::from_secs(30)))
.await?;
Fetch data only when cache misses, with stampede protection:
async fn fetch_from_database(id: u32) -> anyhow::Result<serde_json::Value> {
// Expensive operation...
Ok(serde_json::json!({"id": id, "data": "..."}))
}
// Only ONE request will compute, others wait and read from cache
let product = cache.cache_manager()
.get_or_compute_with(
"product:42",
CacheStrategy::MediumTerm,
|| fetch_from_database(42)
)
.await?;
Publish and consume events:
// Publish to stream
let fields = vec![
("event_id".to_string(), "123".to_string()),
("event_type".to_string(), "user_action".to_string()),
("timestamp".to_string(), "2025-01-01T00:00:00Z".to_string()),
];
let entry_id = cache.cache_manager()
.publish_to_stream("events_stream", fields, Some(1000)) // Auto-trim to 1000 entries
.await?;
// Read latest entries
let entries = cache.cache_manager()
.read_stream_latest("events_stream", 10)
.await?;
// Blocking read for new entries
let new_entries = cache.cache_manager()
.read_stream("events_stream", "$", 10, Some(5000)) // Block for 5s
.await?;
Eliminate boilerplate with automatic serialization/deserialization for database queries:
use serde::{Serialize, Deserialize};
#[derive(Serialize, Deserialize)]
struct User {
id: i64,
name: String,
email: String,
}
// ❌ OLD WAY: Manual cache + serialize + deserialize (40+ lines)
let cached = cache.cache_manager().get("user:123").await?;
let user: User = match cached {
Some(json) => serde_json::from_value(json)?,
None => {
let user = sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
.bind(123)
.fetch_one(&pool)
.await?;
let json = serde_json::to_value(&user)?;
cache.cache_manager().set_with_strategy("user:123", json, CacheStrategy::MediumTerm).await?;
user
}
};
// ✅ NEW WAY: Type-safe automatic caching (5 lines)
let user: User = cache.cache_manager()
.get_or_compute_typed(
"user:123",
CacheStrategy::MediumTerm,
|| async {
sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
.bind(123)
.fetch_one(&pool)
.await
}
)
.await?;
Benefits: works with any type implementing Serialize + DeserializeOwned; no manual serialization or deserialization boilerplate.
More Examples:
// PostgreSQL Reports
#[derive(Serialize, Deserialize)]
struct Report {
id: i64,
title: String,
data: serde_json::Value,
}
let report: Report = cache.cache_manager()
.get_or_compute_typed(
&format!("report:{}", id),
CacheStrategy::LongTerm,
|| async {
sqlx::query_as("SELECT * FROM reports WHERE id = $1")
.bind(id)
.fetch_one(&pool)
.await
}
)
.await?;
// API Responses
#[derive(Serialize, Deserialize)]
struct ApiData {
status: String,
items: Vec<String>,
}
let data: ApiData = cache.cache_manager()
.get_or_compute_typed(
"api:external",
CacheStrategy::RealTime,
|| async {
reqwest::get("https://api.example.com/data")
.await?
.json::<ApiData>()
.await
}
)
.await?;
// Complex Computations
use std::collections::HashMap;
#[derive(Serialize, Deserialize)]
struct AnalyticsResult {
total: i64,
average: f64,
breakdown: HashMap<String, i64>,
}
let analytics: AnalyticsResult = cache.cache_manager()
.get_or_compute_typed(
"analytics:monthly",
CacheStrategy::Custom(Duration::from_secs(6 * 3600)),
|| async {
// Expensive computation...
compute_monthly_analytics(&pool).await
}
)
.await?;
Performance:
Keep caches synchronized across multiple servers/instances using Redis Pub/Sub:
In distributed systems with multiple cache instances, stale data is a common problem: one instance updates the database while the others keep serving outdated cached values.
Solution: Real-time cache invalidation across ALL instances!
1. Remove Strategy (Lazy Reload)
use multi_tier_cache::{CacheManager, L1Cache, L2Cache, InvalidationConfig};
// Initialize with invalidation support
let config = InvalidationConfig::default();
let cache_manager = CacheManager::new_with_invalidation(
Arc::new(L1Cache::new().await?),
Arc::new(L2Cache::new().await?),
"redis://localhost",
config
).await?;
// Update database
database.update_user(123, new_data).await?;
// Invalidate cache across ALL instances
// → Cache removed, next access triggers reload
cache_manager.invalidate("user:123").await?;
2. Update Strategy (Zero Cache Miss)
// Update database
database.update_user(123, new_data).await?;
// Push new data directly to ALL instances' L1 caches
// → No cache miss, instant update!
cache_manager.update_cache(
"user:123",
serde_json::to_value(&new_data)?,
Some(Duration::from_secs(3600))
).await?;
Invalidate multiple related keys at once:
// Update product category in database
database.update_category(42, new_price).await?;
// Invalidate ALL products in category across ALL instances
cache_manager.invalidate_pattern("product:category:42:*").await?;
Cache and broadcast in one operation:
let report = generate_monthly_report().await?;
// Cache locally AND broadcast to all other instances
cache_manager.set_with_broadcast(
"report:monthly",
serde_json::to_value(&report)?,
CacheStrategy::LongTerm
).await?;
Instance A               Redis Pub/Sub              Instance B
    │                         │                         │
    │ 1. Update data          │                         │
    │ 2. Broadcast msg ──────>│                         │
    │                         │ 3. Receive msg ────────>│
    │                         │                         │ 4. Update L1
    │                         │                         │
Performance:
use multi_tier_cache::InvalidationConfig;
let config = InvalidationConfig {
channel: "my_app:cache:invalidate".to_string(),
auto_broadcast_on_write: false, // Manual control
enable_audit_stream: true, // Enable audit trail
audit_stream: "cache:invalidations".to_string(),
audit_stream_maxlen: Some(10000),
};
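With enable_audit_stream turned on, every invalidation is also recorded to the configured Redis Stream. A minimal sketch of inspecting that trail, reusing the read_stream_latest helper from the streams section (the exact entry shape is defined by the library, so treat this as illustrative):
// Read the 20 most recent invalidation events from the audit stream
// named in the config above.
let audit = cache.cache_manager()
    .read_stream_latest("cache:invalidations", 20)
    .await?;
// `audit` now holds the most recent invalidation records.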
When to Use:
Comparison:
| Strategy | Bandwidth | Cache Miss | Use Case |
|---|---|---|---|
| Remove | Low | Yes (on next access) | Large values, infrequent access |
| Update | Higher | No (instant) | Small values, frequent access |
| Pattern | Medium | Yes | Bulk invalidation (categories) |
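To make the table concrete, here is a sketch pairing each row with its call, reusing the cache_manager and methods shown above (the key names and new_profile are placeholders):
// Remove: large report blob, rarely read - drop it everywhere, reload lazily.
cache_manager.invalidate("report:annual:2025").await?;
// Update: small, hot profile - push fresh data to every instance's L1, no miss window.
cache_manager.update_cache(
    "user:123",
    serde_json::to_value(&new_profile)?,
    Some(Duration::from_secs(3600)),
).await?;
// Pattern: bulk-invalidate a whole category after a batch change.
cache_manager.invalidate_pattern("product:category:42:*").await?;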
Starting from v0.5.2, the library includes multiple built-in cache backend implementations beyond the defaults!
| Backend | Feature | Performance | Eviction | Use Case |
|---|---|---|---|---|
| MokaCache (default) | Always available | High | Automatic (LRU + TTL) | Production workloads |
| DashMapCache | Always available | Medium | Manual cleanup | Simple caching, education |
| QuickCacheBackend | backend-quickcache | Very High | Automatic (LRU) | Maximum throughput |
| Backend | Feature | Persistence | TTL Introspection | Use Case |
|---|---|---|---|---|
| RedisCache (default) | Always available | Yes (disk) | ✅ Yes | Production, multi-instance |
| MemcachedCache | backend-memcached | No (memory only) | ❌ No | High-performance distributed |
use multi_tier_cache::{DashMapCache, CacheSystemBuilder, CacheBackend};
use std::sync::Arc;
let dashmap_l1 = Arc::new(DashMapCache::new());
let cache = CacheSystemBuilder::new()
.with_l1(dashmap_l1 as Arc<dyn CacheBackend>)
.build()
.await?;
[dependencies]
multi-tier-cache = { version = "0.5", features = ["backend-quickcache"] }
use multi_tier_cache::{QuickCacheBackend, CacheSystemBuilder, CacheBackend};
use std::sync::Arc;
let quickcache_l1 = Arc::new(QuickCacheBackend::new(5000).await?);
let cache = CacheSystemBuilder::new()
.with_l1(quickcache_l1 as Arc<dyn CacheBackend>)
.build()
.await?;
See Example: examples/builtin_backends.rs for complete demonstrations of all backends.
Starting from v0.3.0, you can replace the default Moka (L1) and Redis (L2) backends with your own custom implementations!
Use Cases:
use multi_tier_cache::{CacheBackend, CacheSystemBuilder, async_trait};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use anyhow::Result;
struct HashMapCache {
    store: Arc<RwLock<HashMap<String, (serde_json::Value, Instant)>>>,
}
impl HashMapCache {
    fn new() -> Self {
        Self { store: Arc::new(RwLock::new(HashMap::new())) }
    }
}
#[async_trait]
impl CacheBackend for HashMapCache {
async fn get(&self, key: &str) -> Option<serde_json::Value> {
let store = self.store.read().unwrap();
store.get(key).and_then(|(value, expiry)| {
if *expiry > Instant::now() {
Some(value.clone())
} else {
None
}
})
}
async fn set_with_ttl(
&self,
key: &str,
value: serde_json::Value,
ttl: Duration,
) -> Result<()> {
let mut store = self.store.write().unwrap();
store.insert(key.to_string(), (value, Instant::now() + ttl));
Ok(())
}
async fn remove(&self, key: &str) -> Result<()> {
self.store.write().unwrap().remove(key);
Ok(())
}
async fn health_check(&self) -> bool {
true
}
fn name(&self) -> &str {
"HashMap"
}
}
// Use custom backend
let custom_l1 = Arc::new(HashMapCache::new());
let cache = CacheSystemBuilder::new()
.with_l1(custom_l1 as Arc<dyn CacheBackend>)
.build()
.await?;
For L2 caches, implement L2CacheBackend which extends CacheBackend with get_with_ttl():
use multi_tier_cache::{L2CacheBackend, async_trait};
#[async_trait]
impl CacheBackend for MyCustomL2 {
// ... implement CacheBackend methods
}
#[async_trait]
impl L2CacheBackend for MyCustomL2 {
async fn get_with_ttl(
&self,
key: &str,
) -> Option<(serde_json::Value, Option<Duration>)> {
        // Return the value together with its remaining TTL
        // (illustrative skeleton: `value` and `remaining_ttl` come from your store)
        Some((value, Some(remaining_ttl)))
}
}
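To make that concrete, here is a minimal sketch of a full trait-based L2 over an in-memory map, assuming the CacheBackend and L2CacheBackend trait shapes shown in this README (a real L2 would talk to a network store):
use multi_tier_cache::{CacheBackend, L2CacheBackend, async_trait};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use anyhow::Result;

struct InMemoryL2 {
    // Each entry stores the value plus its absolute expiry time.
    store: Arc<RwLock<HashMap<String, (serde_json::Value, Instant)>>>,
}

#[async_trait]
impl CacheBackend for InMemoryL2 {
    async fn get(&self, key: &str) -> Option<serde_json::Value> {
        let store = self.store.read().unwrap();
        store.get(key)
            .filter(|(_, expiry)| *expiry > Instant::now())
            .map(|(value, _)| value.clone())
    }
    async fn set_with_ttl(&self, key: &str, value: serde_json::Value, ttl: Duration) -> Result<()> {
        self.store.write().unwrap().insert(key.to_string(), (value, Instant::now() + ttl));
        Ok(())
    }
    async fn remove(&self, key: &str) -> Result<()> {
        self.store.write().unwrap().remove(key);
        Ok(())
    }
    async fn health_check(&self) -> bool { true }
    fn name(&self) -> &str { "InMemoryL2" }
}

#[async_trait]
impl L2CacheBackend for InMemoryL2 {
    async fn get_with_ttl(&self, key: &str) -> Option<(serde_json::Value, Option<Duration>)> {
        let store = self.store.read().unwrap();
        store.get(key).and_then(|(value, expiry)| {
            // Report the remaining TTL so upper tiers can inherit it on promotion.
            expiry.checked_duration_since(Instant::now())
                .map(|remaining| (value.clone(), Some(remaining)))
        })
    }
}
Wiring it in then mirrors the with_l1/with_l2 builder calls shown below.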
use multi_tier_cache::CacheSystemBuilder;
let cache = CacheSystemBuilder::new()
.with_l1(custom_l1) // Custom L1 backend
.with_l2(custom_l2) // Custom L2 backend
.with_streams(kafka) // Optional: Custom streaming backend
.build()
.await?;
Mix and Match:
See: examples/custom_backends.rs for complete working examples.
Starting from v0.5.0, you can configure 3, 4, or more cache tiers beyond the default L1+L2 setup!
Use Cases:
Request → L1 (Hot - RAM) → L2 (Warm - Redis) → L3 (Cold - RocksDB) → L4 (Archive - S3)
           <1ms (95%)       2-5ms (4%)          10-50ms (0.9%)        100-500ms (0.1%)
use multi_tier_cache::{CacheSystemBuilder, TierConfig, L2Cache};
use std::sync::Arc;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Setup backends (this example uses Redis for both hot tiers;
// RocksDBCache stands in for a custom CacheBackend implementation,
// see the custom backends section)
let l1 = Arc::new(L2Cache::new().await?); // Hot: Redis
let l2 = Arc::new(L2Cache::new().await?); // Warm: Redis
let l3 = Arc::new(RocksDBCache::new("/tmp/cache").await?); // Cold: RocksDB
// Build 3-tier cache
let cache = CacheSystemBuilder::new()
.with_tier(l1, TierConfig::as_l1())
.with_tier(l2, TierConfig::as_l2())
.with_l3(l3) // Convenience method: 2x TTL
.build()
.await?;
// Use as normal - transparent multi-tier
cache.cache_manager()
.set_with_strategy("key", data, CacheStrategy::LongTerm)
.await?;
Ok(())
}
Pre-configured Tiers:
// L1 - Hot tier (no promotion, standard TTL)
TierConfig::as_l1()
// L2 - Warm tier (promote to L1, standard TTL)
TierConfig::as_l2()
// L3 - Cold tier (promote to L2+L1, 2x TTL)
TierConfig::as_l3()
// L4 - Archive tier (promote to all, 8x TTL)
TierConfig::as_l4()
Custom Tier:
let custom_l3 = TierConfig::new(3)  // Tier number
    .with_promotion(true)           // Auto-promote on hit
    .with_ttl_scale(5.0)            // 5x TTL multiplier
    .with_level(3);                 // Tier level
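Wiring a custom tier into the builder mirrors the preset case; a sketch, where moka_l1, redis_l2, and rocksdb_l3 are placeholder backends as in the 4-tier example below:
let cache = CacheSystemBuilder::new()
    .with_tier(moka_l1, TierConfig::as_l1())
    .with_tier(redis_l2, TierConfig::as_l2())
    .with_tier(rocksdb_l3, custom_l3) // custom config instead of TierConfig::as_l3()
    .build()
    .await?;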
// Set data with 1-hour TTL
cache.cache_manager()
.set_with_strategy("product:123", data, CacheStrategy::MediumTerm) // 1hr
.await?;
// Actual TTL per tier:
// L1: 1 hour (scale = 1.0x)
// L2: 1 hour (scale = 1.0x)
// L3: 2 hours (scale = 2.0x) ← Keeps data longer!
// L4: 8 hours (scale = 8.0x) ← Much longer retention!
Track hit rates for each tier:
if let Some(tier_stats) = cache.cache_manager().get_tier_stats() {
for stats in tier_stats {
println!("L{}: {} hits ({})",
stats.tier_level,
stats.hit_count(),
stats.backend_name);
}
}
// Output:
// L1: 9500 hits (Redis)
// L2: 450 hits (Redis)
// L3: 45 hits (RocksDB)
// L4: 5 hits (S3)
let cache = CacheSystemBuilder::new()
.with_tier(moka_l1, TierConfig::as_l1())
.with_tier(redis_l2, TierConfig::as_l2())
.with_tier(rocksdb_l3, TierConfig::as_l3())
.with_tier(s3_l4, TierConfig::as_l4())
.build()
.await?;
When data is found in a lower tier (e.g., L3), it's automatically promoted to all upper tiers:
Request for "key"
โโ Check L1 โ Miss
โโ Check L2 โ Miss
โโ Check L3 โ HIT!
โโ Promote to L2 (with original TTL)
โโ Promote to L1 (with original TTL)
โโ Return data
Next request for "key" โ L1 Hit! <1ms
Existing 2-tier users: No changes required! Your code continues to work:
// This still works exactly as before (v0.1.0 - v0.4.x)
let cache = CacheSystemBuilder::new().build().await?;
Multi-tier mode is opt-in via .with_tier() or .with_l3()/.with_l4() methods.
✅ Good fit:
❌ Not needed:
You can customize the Moka in-memory cache settings (capacity, TTL) using MokaCacheConfig via the builder:
use multi_tier_cache::{CacheSystemBuilder, MokaCacheConfig};
use std::time::Duration;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let config = MokaCacheConfig {
max_capacity: 10_000,
time_to_live: Duration::from_secs(30 * 60), // 30 mins
time_to_idle: Duration::from_secs(5 * 60), // 5 mins
};
let cache = CacheSystemBuilder::new()
.with_moka_config(config)
.build()
.await?;
Ok(())
}
✅ Compatible: single-key operations (invalidate, update_cache) work with any backend.
⚠️ Limited Support: pattern invalidation (invalidate_pattern) requires the concrete Redis L2Cache.
Example:
// ✅ Works: Default Redis + Invalidation
let cache = CacheManager::new_with_invalidation(
Arc::new(L1Cache::new().await?),
Arc::new(L2Cache::new().await?), // Concrete Redis L2
"redis://localhost",
InvalidationConfig::default()
).await?;
cache.invalidate("key").await?; // โ
Works
cache.invalidate_pattern("user:*").await?; // โ
Works (has scan_keys)
// ⚠️ Limited: Custom L2 + Invalidation
let cache = CacheManager::new_with_backends(
custom_l1,
custom_l2, // Custom trait-based L2
None
).await?;
// Pattern invalidation not available without concrete L2Cache
// Use single-key invalidation instead
All features work together seamlessly:
use multi_tier_cache::*;
// v0.4.0: Invalidation
let config = InvalidationConfig::default();
// v0.3.0: Custom backends (or use defaults)
let l1 = Arc::new(L1Cache::new().await?);
let l2 = Arc::new(L2Cache::new().await?);
// Initialize with invalidation
let cache_manager = CacheManager::new_with_invalidation(
l1, l2, "redis://localhost", config
).await?;
// v0.2.0: Type-safe caching
let user: User = cache_manager.get_or_compute_typed(
"user:123",
CacheStrategy::MediumTerm,
|| fetch_user(123)
).await?;
// v0.4.0: Invalidate across instances
cache_manager.invalidate("user:123").await?;
// v0.1.0: All core features work
let stats = cache_manager.get_stats();
println!("Hit rate: {:.2}%", stats.hit_rate);
No Conflicts: All features are designed to work together without interference.
Tested in production environment:
| Metric | Value |
|---|---|
| Throughput | 16,829+ requests/second |
| Latency (p50) | 5.2ms |
| Cache Hit Rate | 95% (L1: 90%, L2: 75%) |
| Stampede Protection | 99.6% latency reduction (534ms → 5.2ms) |
| Success Rate | 100% (zero failures under load) |
| Library | Multi-Tier | Stampede Protection | Redis Support | Streams | Invalidation |
|---|---|---|---|---|---|
| multi-tier-cache | ✅ L1+L2 | ✅ Full | ✅ Full | ✅ Built-in | ✅ Pub/Sub |
| cached | ❌ Single | ❌ No | ❌ No | ❌ No | ❌ No |
| moka | ⚠️ L1 only | ⚠️ L1 only | ❌ No | ❌ No | ❌ No |
| redis-rs | ❌ No cache | ⚠️ Manual | ⚠️ Low-level | ⚠️ Manual | ⚠️ Manual |
The library includes comprehensive benchmarks built with Criterion:
# Run all benchmarks
cargo bench
# Run specific benchmark suite
cargo bench --bench cache_operations
cargo bench --bench stampede_protection
cargo bench --bench invalidation
cargo bench --bench serialization
# Generate detailed HTML reports
cargo bench -- --save-baseline my_baseline
Benchmark Suites: cache_operations, stampede_protection, invalidation, and serialization.
Results are saved to target/criterion/ with interactive HTML reports.
The library connects to Redis using the REDIS_URL environment variable. Configuration priority (highest to lowest): an explicit CacheSystem::with_redis_url() call, the REDIS_URL shell environment variable, a REDIS_URL entry in a .env file, then the built-in default.
// Set custom Redis URL before initialization
let cache = CacheSystem::with_redis_url("redis://production:6379").await?;
# Set in shell
export REDIS_URL="redis://your-redis-host:6379"
cargo run
# Create .env file in project root
REDIS_URL="redis://localhost:6379"
If not configured, defaults to: redis://127.0.0.1:6379
Development (Local Redis)
# .env
REDIS_URL="redis://127.0.0.1:6379"
Production (Cloud Redis with Authentication)
# Railway, Render, AWS ElastiCache, etc.
REDIS_URL="redis://:your-password@redis-host.cloud:6379"
Docker Compose
services:
app:
environment:
- REDIS_URL=redis://redis:6379
redis:
image: redis:7-alpine
ports:
- "6379:6379"
Testing (Separate Instance)
#[tokio::test]
async fn test_cache() -> anyhow::Result<()> {
    let cache = CacheSystem::with_redis_url("redis://localhost:6380").await?;
    // Test logic...
    Ok(())
}
redis://[username]:[password]@[host]:[port]/[database]
Examples:
- redis://localhost:6379 - Local Redis, no authentication
- redis://:mypassword@localhost:6379 - Local with password only
- redis://user:pass@redis.example.com:6379/0 - Remote with username, password, and database 0
- rediss://redis.cloud:6380 - SSL/TLS connection (note the rediss://)
Connection Refused
# Check if Redis is running
redis-cli ping # Should return "PONG"
# Check the port
netstat -an | grep 6379
# Verify REDIS_URL
echo $REDIS_URL
Authentication Failed
# Ensure password is in the URL
REDIS_URL="redis://:YOUR_PASSWORD@host:6379"
# Test connection with redis-cli
redis-cli -h host -p 6379 -a YOUR_PASSWORD ping
Timeout Errors
- Check network connectivity: ping your-redis-host
- Check the Redis maxclients setting (the server may be at its connection limit)
- Inspect current connections: redis-cli INFO clients
# Test DNS resolution
nslookup your-redis-host.com
# Use IP address as fallback
REDIS_URL="redis://192.168.1.100:6379"
Default settings (configurable in library source):
The library includes comprehensive integration tests (30 tests) that verify functionality with real Redis:
# Run all integration tests
cargo test --tests
# Run specific test suite
cargo test --test integration_basic
cargo test --test integration_invalidation
cargo test --test integration_stampede
cargo test --test integration_streams
Test Coverage:
Requirements: a running Redis instance at localhost:6379 (or set REDIS_URL).
Test Structure:
tests/
├── common/mod.rs                 # Shared utilities
├── integration_basic.rs          # Core cache operations
├── integration_invalidation.rs   # Cross-instance sync
├── integration_stampede.rs       # Concurrent access
└── integration_streams.rs        # Redis Streams
Run examples with:
# Basic usage
cargo run --example basic_usage
# Stampede protection demonstration
cargo run --example stampede_protection
# Redis Streams
cargo run --example redis_streams
# Cache strategies
cargo run --example cache_strategies
# Advanced patterns
cargo run --example advanced_usage
# Health monitoring
cargo run --example health_monitoring
When multiple requests hit an expired cache key simultaneously, only one of them computes the new value; the rest wait and then read the freshly cached result, so the expensive operation runs exactly once.
Performance Impact: under stampede load, p50 latency drops from 534ms to 5.2ms (a 99.6% reduction; see the performance table above).
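To see the guarantee end to end, here is a minimal sketch, assuming get_or_compute_with accepts a closure returning a future and yields the cached serde_json::Value as in the earlier examples (the key name, sleep, and counter are illustrative):
use multi_tier_cache::{CacheSystem, CacheStrategy};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let cache = Arc::new(CacheSystem::new().await?);
    let computations = Arc::new(AtomicUsize::new(0));

    // Fire 100 concurrent requests for the same cold key.
    let mut handles = Vec::new();
    for _ in 0..100 {
        let cache = Arc::clone(&cache);
        let computations = Arc::clone(&computations);
        handles.push(tokio::spawn(async move {
            cache.cache_manager()
                .get_or_compute_with("expensive:key", CacheStrategy::ShortTerm, move || async move {
                    // Only the winning request should ever reach this block.
                    computations.fetch_add(1, Ordering::SeqCst);
                    tokio::time::sleep(Duration::from_millis(500)).await; // simulate a slow fetch
                    Ok(serde_json::json!({"value": 42}))
                })
                .await
        }));
    }
    for handle in handles {
        handle.await??;
    }
    // With stampede protection this prints 1, not 100.
    println!("computations: {}", computations.load(Ordering::SeqCst));
    Ok(())
}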
When data is found in L2 but not L1, it is automatically promoted to L1 so subsequent reads are served from memory.
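From the caller's side this is invisible; a small sketch of the effect, assuming a second instance B that shares Redis with instance A (key reused from the Quick Start):
// On instance B, "user:1" is not in L1 yet: this get() misses L1,
// hits Redis (L2), and promotes the value into B's local L1.
let first = cache.cache_manager().get("user:1").await?;
// The value now lives in B's L1, so this read never leaves process memory.
let second = cache.cache_manager().get("user:1").await?;
assert_eq!(first, second);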
# Development
cargo build
# Release (optimized)
cargo build --release
# Run tests
cargo test
# Generate and open docs
cargo doc --open
From the cached crate:
// Before (cached)
use cached::proc_macro::cached;
#[cached(time = 60)]
fn expensive_function(arg: String) -> String {
// ...
}
// After (multi-tier-cache)
async fn expensive_function(cache: &CacheManager, arg: String) -> Result<String> {
cache.get_or_compute_with(
&format!("func:{}", arg),
CacheStrategy::ShortTerm,
|| async { /* computation */ }
).await
}
From redis-rs:
// Before (redis-rs)
let mut conn = client.get_connection()?;
let value: String = conn.get("key")?;
conn.set_ex("key", value, 3600)?;
// After (multi-tier-cache)
if let Some(value) = cache.cache_manager().get("key").await? {
// Use cached value
}
cache.cache_manager()
.set_with_strategy("key", value, CacheStrategy::MediumTerm)
.await?;
Contributions are welcome! Please feel free to submit a Pull Request.
Licensed under either of:
- Apache License, Version 2.0
- MIT License
at your option.
Built with:
Made with ❤️ in Rust | Production-proven in crypto trading dashboard serving 16,829+ RPS