| Crates.io | influxdb-stream |
| lib.rs | influxdb-stream |
| version | 0.1.1 |
| created_at | 2025-12-23 14:55:12.672631+00 |
| updated_at | 2025-12-23 15:02:27.247263+00 |
| description | Async streaming client for InfluxDB 2.x - query millions of rows without OOM |
| homepage | |
| repository | https://github.com/almightychang/influxdb-stream |
| max_upload_size | |
| id | 2001647 |
| size | 208,540 |
Async streaming client for InfluxDB 2.x — Query millions of time-series rows without running out of memory.
> 💡 Other Rust clients are async but load all results into `Vec<T>`. This crate is the first to offer true record-by-record streaming.
Existing Rust InfluxDB clients load entire query results into memory:
```rust
// Using existing crates - this will OOM with large datasets!
let results: Vec<MyData> = client.query(query).await?;
```
When you're dealing with time-series data spanning months or years, with millions of data points, this approach simply doesn't work.
influxdb-stream streams results one record at a time:
```rust
// Process millions of rows with constant memory usage
let mut stream = client.query_stream(query).await?;
while let Some(record) = stream.next().await {
    process(record?);
}
```
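The same principle can be illustrated with plain Rust iterators, independent of any InfluxDB client: a lazy pipeline keeps one element alive at a time, while `collect()` materializes everything up front.

```rust
fn main() {
    // Lazy pipeline: no allocation, one element alive at a time.
    let sum: u64 = (0..10_000_000u64).map(|i| i * 2).sum();

    // The eager alternative would allocate ~80 MB before any work starts:
    // let all: Vec<u64> = (0..10_000_000u64).map(|i| i * 2).collect();

    println!("sum = {sum}"); // sum = 99999990000000
}
```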
| Crate | InfluxDB Version | Async | Streaming | Memory Efficient |
|---|---|---|---|---|
| `influxdb` | 1.x | ✅ | ❌ | ❌ |
| `influxdb2` | 2.x | ✅ | ❌ | ❌ |
| `influx_db_client` | 1.x/2.x | ✅ | ❌ | ❌ |
| `influxdb-stream` | 2.x | ✅ | ✅ | ✅ |
All of these crates are async, but they return `Vec<T>`. Only `influxdb-stream` streams record-by-record.
Measured on a MacBook Pro (M-series) with InfluxDB 2.7 running locally.
| Records | influxdb-stream | influxdb2 | Improvement |
|---|---|---|---|
| 1,000 | 166K rec/s | 132K rec/s | +26% |
| 5,000 | 387K rec/s | 276K rec/s | +40% |
| 10,000 | 456K rec/s | 334K rec/s | +37% |
| Records | influxdb-stream | influxdb2 | Ratio | Memory Saved |
|---|---|---|---|---|
| 10,000 | 0.95 MB | 13.03 MB | 13.7x | 92.7% |
| 50,000 | 0.23 MB | 46.88 MB | 200x | 99.5% |
| 100,000 | 0.34 MB | 71.69 MB | 208x | 99.5% |
| 200,000 | 0.06 MB | 137.84 MB | 2205x | ~100% |
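The `Ratio` and `Memory Saved` columns follow directly from the two peak-memory measurements. As a quick sanity check on the 10,000-record row:

```rust
fn main() {
    let stream_mb = 0.95_f64; // influxdb-stream peak memory (MB)
    let vec_mb = 13.03_f64;   // influxdb2 peak memory (MB)

    // Ratio of peak memory, and the fraction of memory avoided.
    let ratio = vec_mb / stream_mb;
    let saved = (1.0 - stream_mb / vec_mb) * 100.0;

    println!("ratio = {ratio:.1}x, saved = {saved:.1}%");
    // ratio = 13.7x, saved = 92.7%
}
```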
Key advantages:
- Constant memory usage regardless of result size (the streaming side stays under 1 MB even at 200,000 records)
- Higher throughput than `influxdb2` in these benchmarks (+26% to +40%)
Run benchmarks yourself:
```sh
cargo bench --bench comparison                   # Throughput comparison
cargo run --release --example memory_comparison  # Memory comparison
```
Add to your `Cargo.toml`:

```toml
[dependencies]
influxdb-stream = "0.1"
futures = "0.3"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
```
```rust
use influxdb_stream::Client;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a client
    let client = Client::new(
        "http://localhost:8086",
        "my-org",
        "my-token",
    );

    // Execute a streaming query
    let mut stream = client.query_stream(r#"
        from(bucket: "sensors")
          |> range(start: -30d)
          |> filter(fn: (r) => r._measurement == "temperature")
    "#).await?;

    // Process records one at a time
    let mut count = 0;
    while let Some(record) = stream.next().await {
        let record = record?;
        println!(
            "{}: {} = {}",
            record.time().map(|t| t.to_rfc3339()).unwrap_or_default(),
            record.field().unwrap_or_default(),
            record.value().map(|v| v.to_string()).unwrap_or_default()
        );
        count += 1;
    }
    println!("Processed {} records", count);
    Ok(())
}
```
```rust
use std::time::Duration;

// Create with default HTTP client
let client = Client::new(url, org, token);

// Create with custom reqwest client (for timeouts, proxies, etc.)
let http = reqwest::Client::builder()
    .timeout(Duration::from_secs(300))
    .build()?;
let client = Client::with_http_client(http, url, org, token);
```
```rust
// Returns a Stream of Result<FluxRecord>
let mut stream = client.query_stream("from(bucket: \"test\") |> range(start: -1h)").await?;

while let Some(result) = stream.next().await {
    let record: FluxRecord = result?;
    // Process record...
}
```

```rust
// Warning: loads all results into memory
let records: Vec<FluxRecord> = client.query("...").await?;
```
```rust
let record: FluxRecord = ...;

// Common accessors
record.time()        // Option<&DateTime<FixedOffset>>
record.measurement() // Option<String>
record.field()       // Option<String>
record.value()       // Option<&Value>

// Generic accessors
record.get("column_name")     // Option<&Value>
record.get_string("tag_name") // Option<String>
record.get_double("_value")   // Option<f64>
record.get_long("_value")     // Option<i64>
record.get_bool("flag")       // Option<bool>
```
All InfluxDB data types are supported:

| InfluxDB Type | Rust Type |
|---|---|
| `string` | `Value::String(String)` |
| `double` | `Value::Double(OrderedFloat<f64>)` |
| `boolean` | `Value::Bool(bool)` |
| `long` | `Value::Long(i64)` |
| `unsignedLong` | `Value::UnsignedLong(u64)` |
| `duration` | `Value::Duration(chrono::Duration)` |
| `base64Binary` | `Value::Base64Binary(Vec<u8>)` |
| `dateTime:RFC3339` | `Value::TimeRFC(DateTime<FixedOffset>)` |
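Consuming a `Value` typically means matching on its variants. The sketch below uses a simplified stand-in enum with only a few of the variants from the table (the crate's real type wraps doubles in `OrderedFloat` and uses `chrono` for durations and timestamps):

```rust
// Simplified stand-in for the crate's Value enum, for illustration only.
#[allow(dead_code)]
enum Value {
    String(String),
    Double(f64),
    Bool(bool),
    Long(i64),
    UnsignedLong(u64),
}

// Exhaustively matching converts any column value to a display string.
fn render(v: &Value) -> String {
    match v {
        Value::String(s) => s.clone(),
        Value::Double(d) => format!("{d}"),
        Value::Bool(b) => b.to_string(),
        Value::Long(n) => n.to_string(),
        Value::UnsignedLong(n) => n.to_string(),
    }
}

fn main() {
    println!("{}", render(&Value::Double(23.5))); // 23.5
}
```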
```rust
let mut stream = source_client.query_stream(r#"
    from(bucket: "old-bucket")
      |> range(start: 2020-01-01T00:00:00Z, stop: 2024-01-01T00:00:00Z)
"#).await?;

while let Some(record) = stream.next().await {
    let record = record?;
    let point = transform_to_line_protocol(&record);
    dest_client.write(&point).await?;
}
```
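`transform_to_line_protocol` is left to the caller. One possible shape, sketched here with plain parameters rather than a `FluxRecord` so it stands alone, builds an InfluxDB line-protocol point (`measurement,tag=v field=value timestamp`):

```rust
// Hypothetical helper: build one line-protocol point from plain values.
// (A real version would pull these out of a FluxRecord's columns.)
fn to_line_protocol(
    measurement: &str,
    tag: (&str, &str),
    field: (&str, f64),
    timestamp_ns: i64,
) -> String {
    format!(
        "{},{}={} {}={} {}",
        measurement, tag.0, tag.1, field.0, field.1, timestamp_ns
    )
}

fn main() {
    let line = to_line_protocol(
        "temperature",
        ("room", "lab"),
        ("value", 23.5),
        1_700_000_000_000_000_000,
    );
    println!("{line}"); // temperature,room=lab value=23.5 1700000000000000000
}
```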
```rust
let mut stream = client.query_stream("...").await?;

while let Some(record) = stream.next().await {
    let record = record?;
    let transformed = transform(record);
    kafka_producer.send(transformed).await?;
}
```
```rust
let mut stream = client.query_stream(r#"
    from(bucket: "metrics")
      |> range(start: -5m)
      |> aggregateWindow(every: 1s, fn: mean)
"#).await?;

let mut stats = RollingStats::new();
while let Some(record) = stream.next().await {
    stats.update(record?.get_double("_value").unwrap_or(0.0));
}
println!("Mean: {}, Std: {}", stats.mean(), stats.std());
```
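`RollingStats` is user code, not part of the crate. A minimal single-pass version matching the streaming spirit (constant memory, one update per record) could use Welford's algorithm:

```rust
// Minimal single-pass mean/std accumulator (Welford's algorithm).
struct RollingStats {
    n: u64,
    mean: f64,
    m2: f64, // running sum of squared deviations from the mean
}

impl RollingStats {
    fn new() -> Self {
        Self { n: 0, mean: 0.0, m2: 0.0 }
    }

    fn update(&mut self, x: f64) {
        self.n += 1;
        let delta = x - self.mean;
        self.mean += delta / self.n as f64;
        self.m2 += delta * (x - self.mean);
    }

    fn mean(&self) -> f64 {
        self.mean
    }

    // Population standard deviation; 0.0 until two samples arrive.
    fn std(&self) -> f64 {
        if self.n < 2 { 0.0 } else { (self.m2 / self.n as f64).sqrt() }
    }
}

fn main() {
    let mut stats = RollingStats::new();
    for x in [1.0, 2.0, 3.0, 4.0] {
        stats.update(x);
    }
    println!("Mean: {}, Std: {}", stats.mean(), stats.std());
}
```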
```text
InfluxDB HTTP API (/api/v2/query)
        ↓ (Accept: application/csv)
reqwest bytes_stream()
        ↓
tokio_util::StreamReader
        ↓
Annotated CSV State Machine Parser
        ↓
Stream<Item = Result<FluxRecord>>
```
The parser processes InfluxDB's annotated CSV format line by line, never buffering more than a single row at a time.
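To illustrate the annotated CSV format (not the crate's actual parser): rows starting with `#` (`#datatype`, `#group`, `#default`) describe the columns, the first plain row is the header, and each later row is one record. A simplified line-by-line pass, ignoring quoting and multi-table responses beyond blank-line separators, might look like:

```rust
// Illustrative line-by-line pass over InfluxDB annotated CSV.
// Each record is returned as (column name, cell value) pairs.
fn parse_annotated_csv(input: &str) -> Vec<Vec<(String, String)>> {
    let mut header: Option<Vec<String>> = None;
    let mut records = Vec::new();
    for line in input.lines() {
        if line.is_empty() {
            header = None; // blank line separates result tables
            continue;
        }
        if line.starts_with('#') {
            continue; // #datatype / #group / #default annotations
        }
        let cells: Vec<String> = line.split(',').map(str::to_string).collect();
        match &header {
            None => header = Some(cells), // first plain row is the header
            Some(cols) => records.push(cols.iter().cloned().zip(cells).collect()),
        }
    }
    records
}

fn main() {
    let csv = "#datatype,string,double\n,_field,_value\n,temp,23.5\n";
    let recs = parse_annotated_csv(csv);
    println!("{} record(s); {}={}", recs.len(), recs[0][1].1, recs[0][2].1);
    // 1 record(s); temp=23.5
}
```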
- v0.1.0 - Query Streaming
- v0.2.0 - Write Streaming: `Stream<Item = DataPoint>` → InfluxDB streaming write
- v0.3.0 - Arrow Flight (InfluxDB 3.x): `RecordBatch` streaming

Tests require a running InfluxDB instance. Use Docker Compose to start one:
```sh
# Start InfluxDB
docker-compose up -d

# Run all tests (unit + integration)
./scripts/test-local.sh

# Or run manually:
cargo test --lib               # Unit tests only
cargo test --test integration  # Integration tests only
```
Benchmarks measure streaming performance with various dataset sizes:
```sh
# Run benchmarks
./scripts/bench-local.sh

# Or run manually:
cargo bench

# Results are saved to target/criterion/
# Open target/criterion/report/index.html for graphs
```
| Category | Tests | Description |
|---|---|---|
| Unit | 5 | Value parsing for each data type |
| Integration | 8 | End-to-end with real InfluxDB |
| Large Dataset | 2 | 10K and 100K record streaming |
| Benchmark | 3 | Throughput, memory, latency |
This project is licensed under the MIT License.
Contributions are welcome! Please feel free to submit a Pull Request.
- Create your feature branch (`git checkout -b feature/amazing-feature`)
- Commit your changes (`git commit -m 'Add some amazing feature'`)
- Push to the branch (`git push origin feature/amazing-feature`)

This project has been battle-tested in production, processing billions of time-series data points for industrial IoT applications.