| Crates.io | universal-inserter |
| lib.rs | universal-inserter |
| version | 0.1.0 |
| created_at | 2025-12-24 20:04:37.074075+00 |
| updated_at | 2025-12-24 20:04:37.074075+00 |
| description | A runtime-agnostic batch inserter library implementing the ClickHouse Inserter pattern |
| homepage | https://github.com/avbel/universal-inserter |
| repository | https://github.com/avbel/universal-inserter |
| max_upload_size | |
| id | 2003792 |
| size | 22,691 |
A runtime-agnostic Rust library implementing the batch inserter pattern (similar to ClickHouse's Inserter). Buffers items and flushes them via a user-provided async insert function based on configurable limits.
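To make the buffer-and-flush idea concrete, here is a minimal synchronous sketch using only the standard library. It is not the crate's implementation: `MiniInserter`, `Stats`, and `run_example` are hypothetical names, and the insert function is a plain closure rather than an async one. It only illustrates how a row limit and an optional period could decide when a batch is handed to the user-provided function.

```rust
use std::time::{Duration, Instant};

/// Running totals (hypothetical; mirrors the `rows` and `transactions`
/// fields that appear in the usage examples).
#[derive(Debug, Default, Clone, Copy, PartialEq)]
pub struct Stats {
    pub rows: u64,
    pub transactions: u64,
}

/// Hypothetical synchronous stand-in for a batch inserter.
pub struct MiniInserter<T, F: FnMut(Vec<T>)> {
    insert_fn: F,
    buffer: Vec<T>,
    max_rows: usize,
    period: Option<Duration>,
    last_flush: Instant,
    total: Stats,
}

impl<T, F: FnMut(Vec<T>)> MiniInserter<T, F> {
    pub fn new(insert_fn: F, max_rows: usize, period: Option<Duration>) -> Self {
        Self {
            insert_fn,
            buffer: Vec::new(),
            max_rows,
            period,
            last_flush: Instant::now(),
            total: Stats::default(),
        }
    }

    /// Buffer one item; nothing is sent yet.
    pub fn write(&mut self, item: T) {
        self.buffer.push(item);
    }

    /// Flush only if the row limit or the period has been reached.
    pub fn commit(&mut self) {
        let rows_hit = self.buffer.len() >= self.max_rows;
        let period_hit = self
            .period
            .map_or(false, |p| self.last_flush.elapsed() >= p);
        if rows_hit || period_hit {
            self.flush();
        }
    }

    /// Flush whatever is left and return the running totals.
    pub fn end(mut self) -> Stats {
        self.flush();
        self.total
    }

    fn flush(&mut self) {
        self.last_flush = Instant::now();
        if self.buffer.is_empty() {
            return; // nothing to send, so no transaction is counted
        }
        let batch = std::mem::take(&mut self.buffer);
        self.total.rows += batch.len() as u64;
        self.total.transactions += 1;
        (self.insert_fn)(batch);
    }
}

/// Write 2500 rows with a 1000-row limit and no period.
pub fn run_example() -> Stats {
    let mut inserter = MiniInserter::new(|batch: Vec<u64>| drop(batch), 1000, None);
    for i in 0..2500u64 {
        inserter.write(i);
        inserter.commit();
    }
    inserter.end()
}
```

With these assumptions, `run_example()` flushes twice at the 1000-row limit and once more in `end()` for the remaining 500 rows.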
The `rand` crate is needed only for the `period_bias` feature.
[dependencies]
universal-inserter = "0.1"
# Optional: to enable period bias randomization, use this line instead
universal-inserter = { version = "0.1", features = ["period_bias"] }
use universal_inserter::Inserter;
use std::time::Duration;

#[derive(Clone)]
struct MyRow {
    id: u64,
    name: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut inserter = Inserter::new(|batch: Vec<MyRow>| async move {
        println!("Inserting {} rows", batch.len());
        // Your insert logic here (database, API, file, etc.)
        Ok::<_, std::io::Error>(())
    })
    .with_max_rows(1000)
    .with_period(Duration::from_secs(5));

    // Write rows
    for i in 0..2500 {
        inserter.write(&MyRow { id: i, name: format!("row_{}", i) });
        inserter.commit().await?; // Flushes if limits reached
    }

    // Flush remaining and close
    let stats = inserter.end().await?;
    println!("Total: {} rows, {} transactions", stats.rows, stats.transactions);
    Ok(())
}

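use universal_inserter::Inserter;
use std::time::Duration;

// `insert_fn` is an async insert function like the closure passed to `Inserter::new` above
let mut inserter = Inserter::new(insert_fn)
    .with_max_rows(500)
    .with_period(Duration::from_secs(10))
    .with_period_bias(0.2); // ±20% randomization (requires the `period_bias` feature)
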
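One way to read "±20% randomization" is that each period is scaled by a random factor between `1 - bias` and `1 + bias`, so a 10-second period lands somewhere between roughly 8 and 12 seconds. The sketch below assumes that interpretation; `biased_period` is a hypothetical helper, not part of the crate, and it takes the random sample as a parameter instead of calling `rand` so that it stays deterministic.

```rust
use std::time::Duration;

/// Scale `period` by a factor in [1 - bias, 1 + bias].
///
/// `unit` is a sample in [-1.0, 1.0]; a real implementation would draw it
/// from a random number generator. Both inputs are clamped so the factor
/// can never become negative.
pub fn biased_period(period: Duration, bias: f64, unit: f64) -> Duration {
    let bias = bias.clamp(0.0, 1.0);
    let unit = unit.clamp(-1.0, 1.0);
    period.mul_f64(1.0 + bias * unit)
}
```

Spreading flush times this way is commonly done so that many writers started at the same moment do not all flush at once.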
use universal_inserter::Inserter;

// Run a callback after every successful commit
let mut inserter = Inserter::new(insert_fn)
    .with_max_rows(100)
    .with_commit_callback(|stats| {
        println!("Committed {} rows", stats.rows);
    });

// Flush unconditionally, regardless of limits
let stats = inserter.force_commit().await?;

// Inspect what is currently buffered
let pending = inserter.pending();
println!("Buffered: {} rows", pending.rows);

// Time remaining until the next period tick
if let Some(time_left) = inserter.time_left() {
    println!("Next flush in: {:?}", time_left);
}

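The remaining time until a period tick can be derived from the moment of the last flush and the configured period. The following is only a sketch of that calculation: the free function `time_left` and its parameters are hypothetical, and it assumes that `None` means no period was configured, which the crate may or may not mirror.

```rust
use std::time::{Duration, Instant};

/// Time remaining until the next period tick.
///
/// Returns `None` when no period is configured, and a zero duration once
/// the tick is already due.
pub fn time_left(last_flush: Instant, period: Option<Duration>, now: Instant) -> Option<Duration> {
    let period = period?;
    // Saturating operations avoid a panic if `now` is earlier than `last_flush`
    // or if more than one period has already elapsed.
    let elapsed = now.saturating_duration_since(last_flush);
    Some(period.saturating_sub(elapsed))
}
```

Passing `now` explicitly keeps the function easy to test; production code would typically call `Instant::now()` internally.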
| Method | Description |
|---|---|
| `new(insert_fn)` | Create inserter with async insert function |
| `with_max_rows(n)` | Set row limit (default: unlimited) |
| `with_period(duration)` | Set time-based flush interval |
| `with_period_bias(bias)` | Add randomization ±bias (requires `period_bias` feature) |
| `with_commit_callback(fn)` | Register callback after successful commits |
| `write(item)` | Add item to buffer (clones item) |
| `write_owned(item)` | Add item to buffer (moves item) |
| `commit()` | Check limits and flush if reached |
| `force_commit()` | Flush unconditionally |
| `end()` | Consume inserter and flush remaining |
| `pending()` | Get current buffer statistics |
| `time_left()` | Duration until next period tick |
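The difference between a cloning `write` and a moving `write_owned` can be illustrated with a plain buffer. This is a sketch under assumed signatures, not the crate's code: `RowBuffer` is a hypothetical type, and the `T: Clone` bound on `write` is an assumption based on the "clones item" note in the table.

```rust
/// Hypothetical buffer showing clone-on-write versus move-on-write.
pub struct RowBuffer<T> {
    items: Vec<T>,
}

impl<T> RowBuffer<T> {
    pub fn new() -> Self {
        Self { items: Vec::new() }
    }

    /// Takes ownership of the item; no copy is made.
    pub fn write_owned(&mut self, item: T) {
        self.items.push(item);
    }

    pub fn len(&self) -> usize {
        self.items.len()
    }
}

impl<T: Clone> RowBuffer<T> {
    /// Borrows the item and stores a clone, so the caller keeps the original.
    pub fn write(&mut self, item: &T) {
        self.items.push(item.clone());
    }
}

/// Buffer the same row twice: once by reference, once by value.
pub fn buffer_twice(row: String) -> usize {
    let mut buf = RowBuffer::new();
    buf.write(&row); // `row` is still usable afterwards
    buf.write_owned(row); // `row` is moved into the buffer here
    buf.len()
}
```

When the caller no longer needs the value, the moving variant avoids an allocation per row, which can matter for rows that own strings or vectors.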
License: MIT