| Crates.io | cal-redis |
| lib.rs | cal-redis |
| version | 0.1.80 |
| created_at | 2025-03-21 22:03:48.807554+00 |
| updated_at | 2025-07-22 15:30:23.51548+00 |
| description | Callable Redis Implementation |
| homepage | |
| repository | |
| max_upload_size | |
| id | 1601298 |
| size | 223,830 |
Cal-Redis is a dual-layer caching system that combines local in-memory caching (using Moka) with Redis for distributed caching. It provides a unified, type-safe API for accessing Callable platform data with automatic fallback mechanisms and real-time cache synchronization via Redis pub/sub.
┌─────────────────┐ ┌─────────────────┐
│ Application │ │ Application │
│ (rust-softphone)│ │ (rust-softphone)│
└────────┬────────┘ └────────┬────────┘
│ │
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ CallableCache │ │ CallableCache │
├─────────────────┤ ├─────────────────┤
│ Local Cache │ │ Local Cache │
│ (Moka) │ │ (Moka) │
├─────────────────┤ ├─────────────────┤
│ Remote Cache │◄────────┤ Remote Cache │
│ (Redis) │ Pub/Sub│ (Redis) │
└─────────────────┘ └─────────────────┘
│ │
└──────────┬────────────────┘
▼
┌─────────────┐
│ Redis │
│ (Valkey) │
└─────────────┘
cal-redis/
├── src/
│ ├── lib.rs # Public API exports
│ ├── cache.rs # Main CallableCache implementation (~900 lines)
│ ├── constants.rs # All Redis key patterns and builders
│ ├── common.rs # Shared utilities for serialization
│ ├── local_cache.rs # Moka-based local cache implementation
│ ├── redis_cache.rs # Redis connection wrapper
│ ├── helpers.rs # RedisHelpers trait for common operations
│ ├── account.rs # Account-related Redis operations
│ ├── region.rs # Region-related Redis operations
│ ├── proxy.rs # Proxy-related Redis operations
│ └── user.rs # User-related Redis operations
├── Cargo.toml
└── README.md
All keys follow a consistent pattern with the cal: prefix and account isolation where applicable:
cal:regions # All regions
cal:region:idents # Region identifier mappings
cal:proxies # All proxies
cal:accounts # All account summaries
cal:account:idents # Account identifier mappings
cal:users # All users
cal:user:idents # User identifier mappings
cal:trunk:{trunk_ip} # Trunk to account mappings
cal:ws:connections # WebSocket connection mappings
cal:events # Global event channel
cal:account:{account_id}:devices # Account devices
cal:account:{account_id}:device:idents # Device identifier mappings
cal:account:{account_id}:ddis # Account DDIs
cal:account:{account_id}:trunks # Account trunks
cal:account:{account_id}:hooks # Account hooks
cal:account:{account_id}:assets # Account assets
cal:account:{account_id}:addresses # Account addresses
cal:account:{account_id}:agent:status:{user_id} # Agent status
cal:account:{account_id}:agents:by_user # Agent name mappings
cal:account:{account_id}:agents:registered # Registered agents set
cal:account:{account_id}:agents:available # Available agents set
cal:account:{account_id}:agents:connected # Connected agents set
cal:account:{account_id}:session:{session_id} # Session data
cal:account:{account_id}:sessions:active # Active sessions set
cal:account:{account_id}:queue:{queue_name} # Queue (sorted set)
cal:account:{account_id}:queues:meta # Queue metadata
cal:account:{account_id}:conversation:{conversation_id} # Conversation data
cal:account:{account_id}:events # Account event channel
The main entry point providing a unified caching interface:
pub struct CallableCache {
pub local_cache: LocalCache, // Moka-based in-memory cache
pub remote_cache: RedisCache, // Redis connection wrapper
}
Cache-first (default): Checks local cache, then Redis
let user = cache.get_user_by_id("user123").await?;
Direct Redis: Bypasses local cache for transient data
let session = cache.session_get(&account_id, &session_id).await?;
Raw Redis connection: For custom operations
let mut con = cache.redis_connection();
con.xadd("stream:events", "*", &[("event", "login")]).await?;
Uses Moka for efficient in-memory caching with TTL:
Redis pub/sub events automatically update local caches:
All operations are account-isolated:
// Sessions include account_id in the key
cache.session_set(&account_id, &session_data).await?;
// Agent status is account-scoped
cache.agent_status_set(&account_id, &agent_status).await?;
// Queries are account-filtered
let agents = cache.agents_get_available(&account_id).await?;
Uses SessionData struct with automatic expiry:
pub struct SessionData {
pub session_id: String,
pub user_id: String,
pub agent_id: String,
pub account_id: String,
pub created_at: DateTime<Utc>,
pub expires_at: DateTime<Utc>,
}
Complete agent state management:
pub struct AgentStatus {
pub user_id: String,
pub account_id: String,
pub agent_name: String,
pub agent_status: String,
pub registration: Option<UserRegistration>,
pub user_call_status: Option<UserCallStatus>,
}
Maps WebSocket connections to accounts for targeted messaging:
// Track connection
cache.add_connected_agent(&account_id, &user_id, &agent_id, &ws_conn_id).await?;
// Get all connections for an account
let connections = cache.get_account_connections(&account_id).await?;
use cal_redis::{CallableCache, RedisHelpers};
// Initialize cache
let cache = CallableCache::new().await;
// Get user (cache-first)
let user = cache.get_user_by_id("user123").await?;
// Create session
let session = cache.create_session(&account_id, &user_id, &agent_id, 3600).await?;
// Update agent status
let status = AgentStatus { /* ... */ };
cache.set_agent_status(&account_id, &status).await?;
// Queue operations
cache.queue_add(&account_id, "support", "ticket123", 1.0).await?;
let next = cache.queue_pop(&account_id, "support").await?;
// Batch operations
let users = cache.get_users_batch(&["id1", "id2", "id3"]).await?;
// Direct Redis for custom operations
let mut con = cache.redis_connection();
con.xadd("custom:stream", "*", &[("data", "value")]).await?;
// Account-specific pub/sub
cache.publish_account_event(&account_id, &event).await?;
# Required
CAL_VALKEY_HOST=redis://localhost:6379
# Optional
RUST_LOG=cal_redis=debug
All operations return Result<T, RedisError> with proper error conversion:
When migrating from direct Redis usage:
When adding new features: