influxdb-stream

Crates.io: influxdb-stream
lib.rs: influxdb-stream
version: 0.1.1
created_at: 2025-12-23 14:55:12.672631+00
updated_at: 2025-12-23 15:02:27.247263+00
description: Async streaming client for InfluxDB 2.x - query millions of rows without OOM
repository: https://github.com/almightychang/influxdb-stream
id: 2001647
size: 208,540
owner: almightychang (almightychang)

README

influxdb-stream

Crates.io | Documentation | License: MIT

Async streaming client for InfluxDB 2.x — Query millions of time-series rows without running out of memory.

💡 Other Rust clients are async but load all results into Vec<T>. This crate is the first to offer true record-by-record streaming.

The Problem

Existing Rust InfluxDB clients load entire query results into memory:

// Using existing crates - this will OOM with large datasets!
let results: Vec<MyData> = client.query(query).await?;

When you're dealing with time-series data spanning months or years, with millions of data points, this approach simply doesn't work.

The Solution

influxdb-stream streams results one record at a time:

// Process millions of rows with constant memory usage
let mut stream = client.query_stream(query).await?;
while let Some(record) = stream.next().await {
    process(record?);
}

Comparison with Existing Crates

| Crate            | InfluxDB Version | Async | Streaming | Memory Efficient |
|------------------|------------------|-------|-----------|------------------|
| influxdb         | 1.x              | ✅    | ❌        | ❌               |
| influxdb2        | 2.x              | ✅    | ❌        | ❌               |
| influx_db_client | 1.x/2.x          | ✅    | ❌        | ❌               |
| influxdb-stream  | 2.x              | ✅    | ✅        | ✅               |

All four crates are async, but the other three collect results into Vec<T>. Only influxdb-stream streams record-by-record.

Benchmark Results

Measured on a MacBook Pro (M-series) with InfluxDB 2.7 running locally.

Throughput Comparison: influxdb-stream vs influxdb2

| Records | influxdb-stream | influxdb2  | Improvement |
|---------|-----------------|------------|-------------|
| 1,000   | 166K rec/s      | 132K rec/s | +26%        |
| 5,000   | 387K rec/s      | 276K rec/s | +40%        |
| 10,000  | 456K rec/s      | 334K rec/s | +37%        |

Memory Usage Comparison (System RSS)

| Records | influxdb-stream | influxdb2 | Ratio | Memory Saved |
|---------|-----------------|-----------|-------|--------------|
| 10,000  | 0.95 MB         | 13.03 MB  | 13.7x | 92.7%        |
| 50,000  | 0.23 MB         | 46.88 MB  | 200x  | 99.5%        |
| 100,000 | 0.34 MB         | 71.69 MB  | 208x  | 99.5%        |
| 200,000 | 0.06 MB         | 137.84 MB | 2205x | ~100%        |

Key advantages:

  • 26-40% faster than influxdb2 crate
  • O(1) memory: Constant memory regardless of result set size
  • 200-2000x less memory for large datasets
  • Streaming mode: Process records without storing them (influxdb2 can't do this)

Run benchmarks yourself:

cargo bench --bench comparison              # Throughput comparison
cargo run --release --example memory_comparison  # Memory comparison

Installation

Add to your Cargo.toml:

[dependencies]
influxdb-stream = "0.1"
futures = "0.3"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }

Quick Start

use influxdb_stream::Client;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a client
    let client = Client::new(
        "http://localhost:8086",
        "my-org",
        "my-token"
    );

    // Execute a streaming query
    let mut stream = client.query_stream(r#"
        from(bucket: "sensors")
        |> range(start: -30d)
        |> filter(fn: (r) => r._measurement == "temperature")
    "#).await?;

    // Process records one at a time
    let mut count = 0;
    while let Some(record) = stream.next().await {
        let record = record?;
        println!(
            "{}: {} = {}",
            record.time().map(|t| t.to_rfc3339()).unwrap_or_default(),
            record.field().unwrap_or_default(),
            record.value().map(|v| v.to_string()).unwrap_or_default()
        );
        count += 1;
    }
    println!("Processed {} records", count);

    Ok(())
}

API Overview

Client

use std::time::Duration;

// Create with default HTTP client
let client = Client::new(url, org, token);

// Create with custom reqwest client (for timeouts, proxies, etc.)
let http = reqwest::Client::builder()
    .timeout(Duration::from_secs(300))
    .build()?;
let client = Client::with_http_client(http, url, org, token);

Streaming Query

// Returns a Stream of Result<FluxRecord>
let mut stream = client.query_stream("from(bucket: \"test\") |> range(start: -1h)").await?;

while let Some(result) = stream.next().await {
    let record: FluxRecord = result?;
    // Process record...
}
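
Because each item is a Result, futures' TryStreamExt combinators also apply. A minimal sketch, assuming the stream's error type converts into your function's error type via ?:

use futures::TryStreamExt;

// try_next() yields Ok(Some(record)) until the stream ends;
// the first stream error short-circuits through `?`.
let mut stream = client.query_stream("from(bucket: \"test\") |> range(start: -1h)").await?;
while let Some(record) = stream.try_next().await? {
    // Process record...
}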

Collect All (for small datasets)

// Warning: loads all results into memory
let records: Vec<FluxRecord> = client.query("...").await?;

FluxRecord

let record: FluxRecord = ...;

// Common accessors
record.time()         // Option<&DateTime<FixedOffset>>
record.measurement()  // Option<String>
record.field()        // Option<String>
record.value()        // Option<&Value>

// Generic accessors
record.get("column_name")        // Option<&Value>
record.get_string("tag_name")    // Option<String>
record.get_double("_value")      // Option<f64>
record.get_long("_value")        // Option<i64>
record.get_bool("flag")          // Option<bool>

Value Types

All InfluxDB data types are supported:

| InfluxDB Type    | Rust Type                             |
|------------------|---------------------------------------|
| string           | Value::String(String)                 |
| double           | Value::Double(OrderedFloat<f64>)      |
| boolean          | Value::Bool(bool)                     |
| long             | Value::Long(i64)                      |
| unsignedLong     | Value::UnsignedLong(u64)              |
| duration         | Value::Duration(chrono::Duration)     |
| base64Binary     | Value::Base64Binary(Vec<u8>)          |
| dateTime:RFC3339 | Value::TimeRFC(DateTime<FixedOffset>) |
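
A value can be inspected by matching on the enum. A minimal sketch using the variants listed above (the import path is an assumption; check the crate's module layout):

use influxdb_stream::Value; // assumed re-export path

fn describe(value: &Value) -> String {
    match value {
        Value::String(s) => format!("string: {s}"),
        Value::Double(d) => format!("double: {d}"),
        Value::Bool(b) => format!("bool: {b}"),
        Value::Long(n) => format!("long: {n}"),
        Value::UnsignedLong(n) => format!("unsignedLong: {n}"),
        Value::Duration(d) => format!("duration: {d}"),
        Value::Base64Binary(bytes) => format!("base64Binary: {} bytes", bytes.len()),
        Value::TimeRFC(t) => format!("dateTime: {}", t.to_rfc3339()),
    }
}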

Use Cases

Data Migration

let mut stream = source_client.query_stream(r#"
    from(bucket: "old-bucket")
    |> range(start: 2020-01-01T00:00:00Z, stop: 2024-01-01T00:00:00Z)
"#).await?;

while let Some(record) = stream.next().await {
    let record = record?;
    let point = transform_to_line_protocol(&record);
    dest_client.write(&point).await?;
}
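
transform_to_line_protocol above is user code, not part of this crate. A minimal sketch built only from the FluxRecord accessors documented earlier (hypothetical; tags, escaping, and non-double values are omitted for brevity):

// Hypothetical helper: emits "measurement field=value timestamp" line protocol.
fn transform_to_line_protocol(record: &FluxRecord) -> String {
    let measurement = record.measurement().unwrap_or_default();
    let field = record.field().unwrap_or_default();
    let value = record.get_double("_value").unwrap_or(0.0);
    let ns = record
        .time()
        .and_then(|t| t.timestamp_nanos_opt())
        .unwrap_or(0);
    format!("{measurement} {field}={value} {ns}")
}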

ETL Pipeline

let mut stream = client.query_stream("...").await?;

while let Some(record) = stream.next().await {
    let record = record?;
    let transformed = transform(record);
    kafka_producer.send(transformed).await?;
}

Real-time Analytics

let mut stream = client.query_stream(r#"
    from(bucket: "metrics")
    |> range(start: -5m)
    |> aggregateWindow(every: 1s, fn: mean)
"#).await?;

let mut stats = RollingStats::new();
while let Some(record) = stream.next().await {
    stats.update(record?.get_double("_value").unwrap_or(0.0));
}
println!("Mean: {}, Std: {}", stats.mean(), stats.std());
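
RollingStats is not part of this crate. A minimal sketch using Welford's online algorithm, which buffers no samples and so keeps the whole pipeline O(1) in memory:

// Online mean/std-dev via Welford's algorithm (assumed helper, not in the crate).
struct RollingStats {
    n: u64,
    mean: f64,
    m2: f64, // running sum of squared deviations from the current mean
}

impl RollingStats {
    fn new() -> Self {
        Self { n: 0, mean: 0.0, m2: 0.0 }
    }

    fn update(&mut self, x: f64) {
        self.n += 1;
        let delta = x - self.mean;
        self.mean += delta / self.n as f64;
        self.m2 += delta * (x - self.mean);
    }

    fn mean(&self) -> f64 {
        self.mean
    }

    fn std(&self) -> f64 {
        // Sample standard deviation; 0.0 until at least two samples arrive.
        if self.n > 1 { (self.m2 / (self.n - 1) as f64).sqrt() } else { 0.0 }
    }
}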

How It Works

InfluxDB HTTP API (/api/v2/query)
        ↓ (Accept: application/csv)
    reqwest bytes_stream()
        ↓
    tokio_util::StreamReader
        ↓
    Annotated CSV State Machine Parser
        ↓
    Stream<Item = Result<FluxRecord>>

The parser processes InfluxDB's annotated CSV format line by line, never buffering more than a single row at a time.
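
For reference, the annotated CSV the parser consumes looks like this (standard InfluxDB 2.x query output; the exact columns depend on the query):

#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string
#group,false,false,true,true,false,false,true,true
#default,_result,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement
,,0,2024-01-01T00:00:00Z,2024-01-02T00:00:00Z,2024-01-01T00:00:05Z,23.1,value,temperature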

Roadmap

  • v0.1.0 - Query Streaming

    • Flux query execution with streaming results
    • Memory-efficient processing
    • All InfluxDB data types
    • Comprehensive error handling
  • v0.2.0 - Write Streaming

    • Stream<Item = DataPoint> → InfluxDB streaming write
    • Line Protocol conversion with chunked transfer
    • Backpressure handling
  • v0.3.0 - Arrow Flight (InfluxDB 3.x)

    • Arrow Flight SQL protocol support
    • RecordBatch streaming
    • Feature flags for v2/v3

Requirements

  • Rust 1.85+ (Edition 2024)
  • InfluxDB 2.x server

Development

Running Tests

Tests require a running InfluxDB instance. Use Docker Compose to start one:

# Start InfluxDB
docker-compose up -d

# Run all tests (unit + integration)
./scripts/test-local.sh

# Or run manually:
cargo test --lib                    # Unit tests only
cargo test --test integration       # Integration tests only
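
The repository's docker-compose.yml defines the test instance. A minimal equivalent using the official influxdb:2.7 image would look like this (all credentials are placeholders; match them to the project's actual test configuration):

# docker-compose.yml (sketch): single InfluxDB 2.x node with auto-setup
services:
  influxdb:
    image: influxdb:2.7
    ports:
      - "8086:8086"
    environment:
      DOCKER_INFLUXDB_INIT_MODE: setup
      DOCKER_INFLUXDB_INIT_USERNAME: admin
      DOCKER_INFLUXDB_INIT_PASSWORD: password123
      DOCKER_INFLUXDB_INIT_ORG: my-org
      DOCKER_INFLUXDB_INIT_BUCKET: test
      DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: my-token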

Running Benchmarks

Benchmarks measure streaming performance with various dataset sizes:

# Run benchmarks
./scripts/bench-local.sh

# Or run manually:
cargo bench

# Results are saved to target/criterion/
# Open target/criterion/report/index.html for graphs

Test Coverage

| Category      | Tests | Description                      |
|---------------|-------|----------------------------------|
| Unit          | 5     | Value parsing for each data type |
| Integration   | 8     | End-to-end with real InfluxDB    |
| Large Dataset | 2     | 10K and 100K record streaming    |
| Benchmark     | 3     | Throughput, memory, latency      |

License

This project is licensed under the MIT License.

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add some amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

Acknowledgments

This project has been battle-tested in production, processing billions of time-series data points for industrial IoT applications.
