| Crates.io | rigatoni-stores |
| lib.rs | rigatoni-stores |
| version | 0.2.0 |
| created_at | 2025-11-22 13:23:21.582017+00 |
| updated_at | 2025-12-12 12:50:45.404557+00 |
| description | State store implementations for Rigatoni CDC/Data Replication: Memory, File, Redis for distributed state management |
| homepage | https://github.com/valeriouberti/rigatoni |
| repository | https://github.com/valeriouberti/rigatoni |
| max_upload_size | |
| id | 1945327 |
| size | 179,479 |
State store implementations for Rigatoni ETL framework - persist resume tokens for fault tolerance.
State store implementations for persisting MongoDB change stream resume tokens, enabling fault-tolerant ETL pipelines with exactly-once or at-least-once semantics.
Arc<RwLock<HashMap>>[dependencies]
rigatoni-stores = { version = "0.2.0", features = ["memory", "file", "redis-store"] }
memory - In-memory store (enabled by default)file - File-based store (enabled by default)redis-store - Redis store with connection pooling and retry logicall-stores - All store implementationsuse rigatoni_stores::memory::MemoryStore;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let store = MemoryStore::new();
// Use with Rigatoni pipeline
// let pipeline = Pipeline::with_store(config, destination, store).await?;
Ok(())
}
use rigatoni_stores::file::FileStore;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Stores resume tokens in ./state/ directory
let store = FileStore::new("./state").await?;
// Use with Rigatoni pipeline
// let pipeline = Pipeline::with_store(config, destination, store).await?;
Ok(())
}
use rigatoni_stores::redis::{RedisStore, RedisConfig};
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Configure Redis with connection pooling and TTL
let config = RedisConfig::builder()
.url("redis://localhost:6379")
.pool_size(10)
.ttl(Duration::from_secs(7 * 24 * 60 * 60)) // 7 days
.max_retries(3)
.build()?;
let store = RedisStore::new(config).await?;
// Use with Rigatoni pipeline for distributed state
// let pipeline = Pipeline::with_store(config, destination, store).await?;
Ok(())
}
url - Redis connection URL (supports redis:// and rediss:// schemes)pool_size - Connection pool size (default: 10)ttl - Optional expiration time for resume tokens (recommended: 7-30 days)max_retries - Maximum retry attempts for transient errors (default: 3)connection_timeout - Connection timeout duration (default: 5 seconds)Note: Redis Cluster mode is not currently implemented. Use Redis Sentinel for high availability.
All stores implement the StateStore trait from rigatoni-core:
use rigatoni_core::store::StateStore;
#[async_trait]
pub trait StateStore: Send + Sync {
/// Save a resume token for a collection
async fn save_resume_token(&self, collection: &str, token: Document)
-> Result<(), StateError>;
/// Load a resume token for a collection
async fn load_resume_token(&self, collection: &str)
-> Result<Option<Document>, StateError>;
/// Clear a resume token for a collection
async fn clear_resume_token(&self, collection: &str)
-> Result<(), StateError>;
}
Implement your own store for custom backends:
use rigatoni_core::store::{StateStore, StateError};
use async_trait::async_trait;
use bson::Document;
pub struct CustomStore {
// Your storage backend
}
#[async_trait]
impl StateStore for CustomStore {
async fn save_resume_token(&self, collection: &str, token: Document)
-> Result<(), StateError>
{
// Your implementation
Ok(())
}
async fn load_resume_token(&self, collection: &str)
-> Result<Option<Document>, StateError>
{
// Your implementation
Ok(None)
}
async fn clear_resume_token(&self, collection: &str)
-> Result<(), StateError>
{
// Your implementation
Ok(())
}
}
Use Memory Store for fast iteration without persistence
Use File Store for simple, reliable persistence
Use Redis Store for distributed state across pipeline instances with:
See the Multi-Instance Deployment Guide for Kubernetes examples and configuration.
Licensed under the Apache License, Version 2.0 (LICENSE or http://www.apache.org/licenses/LICENSE-2.0).