clicktype

Crates.io: clicktype
lib.rs: clicktype
version: 0.2.0
created_at: 2026-01-25 14:23:22.878943+00
updated_at: 2026-01-25 14:23:22.878943+00
description: Type-safe ClickHouse client for Rust with compile-time query validation
homepage: https://github.com/clicktype/clicktype
repository: https://github.com/clicktype/clicktype
max_upload_size:
id: 2068827
size: 65,417
Rafael Calderon Robles (RafaCalRob)

documentation

https://docs.rs/clicktype

README

ClickType

ClickType is a ClickHouse client for Rust designed for bulk data ingestion and type-safe query construction. It focuses on explicit memory control and high performance by utilizing the RowBinary format.

Key Features

  • Data Modeling: Schema definition via the #[derive(ClickTable)] macro.
  • Batch Ingestion: Async buffering system with active memory management and backpressure control.
  • Query Builder: Fluent API for SQL generation, validating column names and types at compile time.
  • Observability: Native integration with the tracing ecosystem for monitoring latencies and errors.
  • Complex Types: Support for Nullable, LowCardinality, Array, and Map.

Installation

Add the dependency to your Cargo.toml file:

[dependencies]
clicktype = "0.2"
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
tracing-subscriber = "0.3" # only needed for the tracing_subscriber::fmt examples

Table Definition

Map your ClickHouse tables to Rust structs. The macro handles serialization and DDL generation.

use clicktype::prelude::*;

#[derive(ClickTable)]
#[click_table(name = "user_events", primary_key = "user_id")]
pub struct UserEvent {
    #[click_column(primary_key)]
    pub user_id: u64,

    pub timestamp: DateTime64<3>,

    pub event_type: LowCardinality<String>,

    pub properties: Map<String, String>,

    pub session_id: Nullable<String>,
    
    #[click_column(materialized = "now()")]
    pub server_time: DateTime64<3>,
}
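To make "DDL generation" concrete, here is a hand-written sketch of the kind of statement a derive macro could produce for the struct above. The Column struct, the create_table_sql function, and the MergeTree/ORDER BY clause are assumptions made for illustration; this is not ClickType's actual generated output or API.

```rust
/// Hypothetical column description; not a ClickType type.
struct Column {
    name: &'static str,
    ch_type: &'static str,
    /// Optional MATERIALIZED expression.
    materialized: Option<&'static str>,
}

/// Builds a CREATE TABLE statement from a column list (illustrative only).
fn create_table_sql(table: &str, order_by: &str, columns: &[Column]) -> String {
    let cols: Vec<String> = columns
        .iter()
        .map(|c| match c.materialized {
            Some(expr) => format!("    {} {} MATERIALIZED {}", c.name, c.ch_type, expr),
            None => format!("    {} {}", c.name, c.ch_type),
        })
        .collect();
    format!(
        "CREATE TABLE IF NOT EXISTS {} (\n{}\n) ENGINE = MergeTree ORDER BY {}",
        table,
        cols.join(",\n"),
        order_by
    )
}

/// Columns of UserEvent, written out by hand with their ClickHouse types.
fn user_event_columns() -> Vec<Column> {
    vec![
        Column { name: "user_id", ch_type: "UInt64", materialized: None },
        Column { name: "timestamp", ch_type: "DateTime64(3)", materialized: None },
        Column { name: "event_type", ch_type: "LowCardinality(String)", materialized: None },
        Column { name: "properties", ch_type: "Map(String, String)", materialized: None },
        Column { name: "session_id", ch_type: "Nullable(String)", materialized: None },
        Column { name: "server_time", ch_type: "DateTime64(3)", materialized: Some("now()") },
    ]
}

fn main() {
    println!("{}", create_table_sql("user_events", "user_id", &user_event_columns()));
}
```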

Data Ingestion (Batcher)

The Batcher manages the grouping of rows in memory before sending them to ClickHouse. It allows configuration of row limits, buffer sizes, and timeouts.

Memory and Error Management

  • Memory Release: The internal buffer is automatically reduced after a flush if it exceeds the configured threshold (buffer_shrink_threshold).
  • Supervision: The spawn() method returns the worker's JoinHandle to detect unexpected stops or panics.
  • Backpressure Strategies: Supports insert (blocks until capacity is available) and try_insert (fails immediately if the buffer is full).
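The difference between the two backpressure strategies can be illustrated with the standard library's bounded channel. This is only an analogy: std::sync::mpsc stands in for the batcher's internal channel, and the function names are made up for this sketch. A blocking send waits for capacity and loses nothing, while try_send returns immediately and leaves the caller to decide what to do with the rejected row.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;

/// Tries to enqueue `n` rows without waiting; returns how many were rejected.
/// Nothing drains the channel here, so it fills up after `capacity` rows.
fn enqueue_or_drop(capacity: usize, n: u32) -> u32 {
    let (tx, _rx) = sync_channel::<u32>(capacity);
    let mut dropped = 0;
    for row in 0..n {
        match tx.try_send(row) {
            Ok(()) => {}
            Err(TrySendError::Full(_)) => dropped += 1,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    dropped
}

/// Enqueues `n` rows with a blocking send; returns how many the consumer received.
fn enqueue_blocking(capacity: usize, n: u32) -> u32 {
    let (tx, rx) = sync_channel::<u32>(capacity);
    let consumer = thread::spawn(move || rx.iter().count() as u32);
    for row in 0..n {
        // Waits here whenever the channel is full.
        tx.send(row).expect("consumer is alive");
    }
    drop(tx); // closing the sender lets the consumer finish
    consumer.join().expect("consumer panicked")
}

fn main() {
    println!("try_send dropped {} of 5 rows", enqueue_or_drop(2, 5));
    println!("blocking send delivered {} of 5 rows", enqueue_blocking(2, 5));
}
```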

Usage Example

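use clicktype::batch::{Batcher, BatchConfig};
use clicktype::prelude::*; // DateTime64, Nullable; UserEvent is the struct from "Table Definition"
use std::time::Duration;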

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = BatchConfig {
        max_rows: 50_000,
        max_wait: Duration::from_secs(5),
        buffer_shrink_threshold: 2 * 1024 * 1024, // 2 MB
        ..BatchConfig::default()
    };

    let client = clicktype::Client::new("http://localhost:8123");
    let batcher = Batcher::<UserEvent>::new(client, config);
    
    // `handle` is used to insert rows; `worker_task` is the worker's JoinHandle, used to supervise it.
    let (handle, worker_task) = batcher.spawn();

    let event = UserEvent {
        user_id: 1,
        timestamp: DateTime64::now(),
        event_type: "click".into(),
        properties: [("key".into(), "val".into())].into(),
        session_id: Nullable::null(),
        server_time: DateTime64::default(),
    };

    // Insertion with backpressure
    handle.insert(event).await?;

    handle.close().await?;
    worker_task.await?;

    Ok(())
}

Querying

The QueryBuilder generates structured SQL using the column constants provided by the macro.

use clicktype::query::QueryBuilder;

async fn get_stats(client: &clicktype::Client) -> Result<(), Box<dyn std::error::Error>> {
    let sql = QueryBuilder::<UserEvent>::new()
        .select(UserEvent::USER_ID)
        .select_raw("count() as total")
        .filter(UserEvent::TIMESTAMP, ">=", "today()")
        .group_by(UserEvent::USER_ID)
        .limit(10)
        .to_sql();
    
    // ResultRow is a user-defined row type matching the selected columns (not shown here).
    let _results = client.query::<ResultRow>(&sql).await?;
    Ok(())
}
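For readers curious how a fluent builder turns such a chain into SQL, the following is a minimal standalone sketch. SqlBuilder and the user_event constants are hypothetical stand-ins, not ClickType's QueryBuilder. Plain string constants guard against typos in column names but not against type errors, so this does not reproduce the compile-time type validation described above.

```rust
/// Hypothetical column constants, standing in for what a derive macro might provide.
mod user_event {
    pub const TABLE: &str = "user_events";
    pub const USER_ID: &str = "user_id";
    pub const TIMESTAMP: &str = "timestamp";
}

/// Minimal fluent SQL builder (illustrative only).
#[derive(Default)]
struct SqlBuilder {
    table: String,
    selects: Vec<String>,
    filters: Vec<String>,
    group_by: Vec<String>,
    limit: Option<u64>,
}

impl SqlBuilder {
    fn new(table: &str) -> Self {
        SqlBuilder { table: table.to_string(), ..Default::default() }
    }

    fn select(mut self, expr: &str) -> Self {
        self.selects.push(expr.to_string());
        self
    }

    fn filter(mut self, column: &str, op: &str, value: &str) -> Self {
        self.filters.push(format!("{} {} {}", column, op, value));
        self
    }

    fn group_by(mut self, column: &str) -> Self {
        self.group_by.push(column.to_string());
        self
    }

    fn limit(mut self, n: u64) -> Self {
        self.limit = Some(n);
        self
    }

    /// Assembles the clauses in SQL order; filters are joined with AND.
    fn to_sql(&self) -> String {
        let mut sql = format!("SELECT {} FROM {}", self.selects.join(", "), self.table);
        if !self.filters.is_empty() {
            sql.push_str(&format!(" WHERE {}", self.filters.join(" AND ")));
        }
        if !self.group_by.is_empty() {
            sql.push_str(&format!(" GROUP BY {}", self.group_by.join(", ")));
        }
        if let Some(n) = self.limit {
            sql.push_str(&format!(" LIMIT {}", n));
        }
        sql
    }
}

fn main() {
    let sql = SqlBuilder::new(user_event::TABLE)
        .select(user_event::USER_ID)
        .select("count() AS total")
        .filter(user_event::TIMESTAMP, ">=", "today()")
        .group_by(user_event::USER_ID)
        .limit(10)
        .to_sql();
    println!("{}", sql);
}
```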

Observability

ClickType uses tracing to emit events. Logs include structured data regarding:

  • Number of rows and bytes sent per batch.
  • Latency of HTTP requests to ClickHouse.
  • Retries performed during network failures.

To enable log output, install a subscriber, for example with the tracing-subscriber crate:

tracing_subscriber::fmt::init();

Implementation Details

  1. RowBinary Protocol: RowBinary is used for all insertions because it is a compact binary format that is cheap to encode on the client and to parse on the server, which saves both CPU and bandwidth.
  2. Incremental Serialization: Each row is serialized into the buffer when insert is called, which spreads the CPU cost over time and avoids a latency spike at flush time.
  3. Buffer Protection: If a batch still fails after all configured retries, the buffer is cleared and the rows in that batch are dropped, so that one failing batch cannot stall ingestion indefinitely.
  4. Schema Validation: Automatic schema validation on first insert prevents silent data corruption from schema mismatches.
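The following sketch shows what serializing a row into a RowBinary buffer at insert time can look like. The wire rules used here are those of the ClickHouse RowBinary format (fixed-width little-endian integers, strings as an unsigned LEB128 length followed by the bytes, Nullable as a one-byte flag where 1 means NULL). The function names and the three-column row are assumptions for illustration and are not ClickType's internal code.

```rust
/// Appends an unsigned LEB128 varint (RowBinary uses this for string lengths).
fn write_varint(buf: &mut Vec<u8>, mut value: u64) {
    loop {
        let byte = (value & 0x7f) as u8;
        value >>= 7;
        if value == 0 {
            buf.push(byte);
            return;
        }
        buf.push(byte | 0x80); // high bit set: more bytes follow
    }
}

/// UInt64: eight bytes, little-endian.
fn write_u64(buf: &mut Vec<u8>, v: u64) {
    buf.extend_from_slice(&v.to_le_bytes());
}

/// String: varint length, then the raw bytes.
fn write_string(buf: &mut Vec<u8>, s: &str) {
    write_varint(buf, s.len() as u64);
    buf.extend_from_slice(s.as_bytes());
}

/// Nullable(String): flag byte 1 for NULL, or 0 followed by the value.
fn write_nullable_string(buf: &mut Vec<u8>, s: Option<&str>) {
    match s {
        None => buf.push(1),
        Some(s) => {
            buf.push(0);
            write_string(buf, s);
        }
    }
}

/// Serializes one (UInt64, String, Nullable(String)) row straight into the
/// batch buffer, so no per-row work is left for flush time.
fn encode_row(buf: &mut Vec<u8>, user_id: u64, event_type: &str, session_id: Option<&str>) {
    write_u64(buf, user_id);
    write_string(buf, event_type);
    write_nullable_string(buf, session_id);
}

fn main() {
    let mut buf = Vec::new();
    encode_row(&mut buf, 1, "click", None);
    encode_row(&mut buf, 2, "view", Some("abc"));
    println!("{} bytes buffered: {:?}", buf.len(), buf);
}
```

Because values carry no column names, the reader relies entirely on column position, which is why the schema checks described in this README matter.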

Production Considerations

Automatic Schema Protection

ClickType validates your schema automatically to prevent data corruption.

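RowBinary is a fast, compact ClickHouse format, but it is position-based (there are no column names in the wire format), so a struct whose fields are in a different order than the table's columns would be written into the wrong columns. ClickType protects you with comprehensive validation: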

What ClickType Validates Automatically:

  • ✅ Column order - Position mismatch detected immediately
  • ✅ Column types - Type mismatches caught before any data is sent
  • ✅ Column count - Missing or extra insertable columns detected
  • ✅ Column names - Ensures struct fields match table columns

Validation happens on first insert:

let batcher = Batcher::<UserEvent>::new(client, config);
let (handle, worker) = batcher.spawn();

// Schema validation runs here - fails fast if anything is wrong:
handle.insert(event).await?;
// ✓ Order validated
// ✓ Types validated
// ✓ Count validated

Example validation error (column order mismatch):

Schema validation failed for table 'events':
Column order mismatch at position 0: struct has 'id', table has 'name'
Column order mismatch at position 1: struct has 'name', table has 'value'
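A positional check of this kind can be sketched in a few lines. The validate_order function below is hypothetical and only compares two lists of column names. How ClickType obtains the table's columns and how it compares types are not covered here. The message format mirrors the example error above.

```rust
/// Compares the struct's column order with the table's column order and
/// collects every mismatch into one error message (illustrative only).
fn validate_order(table: &str, struct_cols: &[&str], table_cols: &[&str]) -> Result<(), String> {
    let mut problems = Vec::new();

    if struct_cols.len() != table_cols.len() {
        problems.push(format!(
            "Column count mismatch: struct has {}, table has {}",
            struct_cols.len(),
            table_cols.len()
        ));
    }

    // Walk both lists position by position.
    for (pos, (s, t)) in struct_cols.iter().zip(table_cols.iter()).enumerate() {
        if s != t {
            problems.push(format!(
                "Column order mismatch at position {}: struct has '{}', table has '{}'",
                pos, s, t
            ));
        }
    }

    if problems.is_empty() {
        Ok(())
    } else {
        Err(format!(
            "Schema validation failed for table '{}':\n{}",
            table,
            problems.join("\n")
        ))
    }
}

fn main() {
    match validate_order("events", &["id", "name"], &["name", "value"]) {
        Ok(()) => println!("schema ok"),
        Err(msg) => println!("{}", msg),
    }
}
```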

Inherent RowBinary Limitations (Can't be validated):

  • ⚠️ MATERIALIZED/ALIAS expressions (not insertable, skipped)
  • ⚠️ DEFAULT expressions (server-side, not visible in schema)
  • ⚠️ Codec settings (compression, internal)

Best Practices

1. Schema Changes - Use Migrations

When you need to change your schema:

// Step 1: Create new table version
#[derive(ClickTable)]
#[click_table(name = "events_v2")]
pub struct EventV2 {
    pub id: u64,
    pub new_field: String,  // ← Added field
    pub timestamp: u64,
}

// Step 2: Deploy code that writes to BOTH tables
// (handle_v1 / handle_v2 are the handles returned by each batcher's spawn())
handle_v1.insert(old_event).await?;
handle_v2.insert(new_event).await?;

// Step 3: Backfill data
// INSERT INTO events_v2 SELECT id, '', timestamp FROM events

// Step 4: Switch reads to v2

// Step 5: Stop writes to v1, drop old table

Alternative: Use ClickHouse ALTER TABLE for compatible changes:

-- Adding a column with DEFAULT (safe)
ALTER TABLE events ADD COLUMN new_field String DEFAULT ''

2. Monitor Buffer Memory

let config = BatchConfig {
    max_buffer_size: 64 * 1024 * 1024,        // 64 MB hard limit
    buffer_shrink_threshold: 2 * 1024 * 1024, // Shrink if > 2 MB after flush
    ..Default::default()
};

Why This Matters:

  • Traffic spike → 64 MB buffer allocated
  • Without shrink threshold → buffer stays at 64 MB indefinitely (not a true leak, but the memory is never returned)
  • With shrink threshold → buffer returns to 1 MB after spike
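The shrink-after-flush idea can be sketched with a plain Vec<u8>. This is an assumption about the general technique, not ClickType's implementation: the finish_flush function is hypothetical, and it shrinks the buffer to the threshold itself, whereas the size ClickType actually shrinks to may differ.

```rust
/// Clears the buffer after a flush and gives back capacity above the threshold.
fn finish_flush(buf: &mut Vec<u8>, shrink_threshold: usize) {
    buf.clear(); // resets the length but keeps the allocation
    if buf.capacity() > shrink_threshold {
        // Releases the excess; capacity stays at least `shrink_threshold`.
        buf.shrink_to(shrink_threshold);
    }
}

fn main() {
    // Simulate a buffer that grew to 64 MB during a traffic spike.
    let mut buf: Vec<u8> = Vec::with_capacity(64 * 1024 * 1024);
    buf.extend_from_slice(b"rows from a traffic spike");
    println!("capacity before flush: {} bytes", buf.capacity());

    finish_flush(&mut buf, 2 * 1024 * 1024);
    println!("capacity after flush: {} bytes", buf.capacity());
}
```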

3. Choose Backpressure Strategy

// Data integrity priority (wait for capacity)
handle.insert(event).await?;  // Waits asynchronously while the channel is full

// High availability priority (drop if full)
if handle.try_insert(event).is_err() {
    metrics.record_dropped_event();
}

4. Supervise Worker Task

use tracing::error;

let (handle, worker) = batcher.spawn();

tokio::select! {
    result = worker => {
        // Worker crashed - handle it!
        error!("Batcher worker died: {:?}", result);
        // Restart batcher, alert ops, etc.
    }
    _ = tokio::signal::ctrl_c() => {
        handle.close().await?;
    }
}
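The same supervision idea can be shown with the standard library alone. This sketch uses std::thread instead of a Tokio task, and the supervise function is hypothetical. The point it illustrates is that joining a worker's handle surfaces a panic as an error value that the supervisor can act on.

```rust
use std::thread;

/// Runs a worker on its own thread and reports whether it finished cleanly
/// or panicked, returning the panic message when one is available.
fn supervise<F>(worker: F) -> Result<(), String>
where
    F: FnOnce() + Send + 'static,
{
    let handle = thread::spawn(worker);
    handle.join().map_err(|payload| {
        // Panic payloads are usually a &str or a String.
        if let Some(s) = payload.downcast_ref::<&str>() {
            s.to_string()
        } else if let Some(s) = payload.downcast_ref::<String>() {
            s.clone()
        } else {
            "unknown panic".to_string()
        }
    })
}

fn main() {
    // A worker that exits normally.
    println!("clean worker: {:?}", supervise(|| {}));

    // A worker that panics; the supervisor sees the message and can restart or alert.
    println!("failing worker: {:?}", supervise(|| panic!("flush failed")));
}
```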

Troubleshooting

"Schema validation failed: type mismatch"

Cause: Struct field type doesn't match ClickHouse column type.

Fix:

-- Check actual ClickHouse schema
DESCRIBE TABLE your_table;

Compare with your Rust struct. Common mismatches:

  • i32 vs i64 (Int32 vs Int64)
  • String vs LowCardinality<String>
  • Option<T> vs T (Nullable vs non-Nullable)

"Insert failed after 3 retries"

Causes:

  1. Network issues
  2. ClickHouse server overload
  3. Quota/permissions
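  4. Invalid data rejected by the server (e.g., a row that violates a table CONSTRAINT; ClickHouse does not enforce primary-key uniqueness, so duplicate keys do not fail an insert)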

Debugging:

// Enable detailed logs
tracing_subscriber::fmt()
    .with_max_level(tracing::Level::DEBUG)
    .init();

// Logs will show:
// - Exact HTTP error from ClickHouse
// - Retry attempts
// - Batch size and payload
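To show where an "Insert failed after 3 retries" message can come from, here is a minimal sketch of a retry loop with exponential backoff that drops the batch once the retries are exhausted. All names here (flush_with_retries, FlushOutcome) and the backoff schedule are assumptions for illustration; ClickType's actual retry policy is not documented in this README.

```rust
use std::thread;
use std::time::Duration;

/// Outcome of trying to flush one batch.
#[derive(Debug, PartialEq)]
enum FlushOutcome {
    Sent { attempts: u32 },
    Dropped { attempts: u32 },
}

/// Tries to send the buffer, retrying up to `max_retries` times after the
/// first failure. The buffer is cleared in both outcomes so ingestion can continue.
fn flush_with_retries<F>(
    buffer: &mut Vec<u8>,
    max_retries: u32,
    base_delay: Duration,
    mut send: F,
) -> FlushOutcome
where
    F: FnMut(&[u8]) -> Result<(), String>,
{
    let mut attempts = 0;
    loop {
        attempts += 1;
        match send(buffer.as_slice()) {
            Ok(()) => {
                buffer.clear();
                return FlushOutcome::Sent { attempts };
            }
            Err(err) if attempts > max_retries => {
                eprintln!("Insert failed after {} retries: {}", max_retries, err);
                buffer.clear(); // the rows in this batch are lost
                return FlushOutcome::Dropped { attempts };
            }
            Err(_) => {
                // Exponential backoff: base, 2 * base, 4 * base, ...
                thread::sleep(base_delay * 2u32.pow(attempts - 1));
            }
        }
    }
}

fn main() {
    let mut buffer = vec![1u8, 2, 3];
    let outcome = flush_with_retries(&mut buffer, 3, Duration::from_millis(10), |_| {
        Err("connection refused".to_string())
    });
    println!("{:?}, buffer now holds {} bytes", outcome, buffer.len());
}
```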

Memory keeps growing

Check:

  1. Is buffer_shrink_threshold configured?
  2. Are flushes completing successfully?
  3. Is the channel capacity too large?

Fix:

let config = BatchConfig {
    buffer_shrink_threshold: 2 * 1024 * 1024,  // Add this!
    max_buffer_size: 64 * 1024 * 1024,
    channel_capacity: 10_000,  // Don't set too high
    ..Default::default()
};

Worker stops processing

Likely cause: Worker panicked due to:

  1. Schema validation failure (fixed in v0.1+)
  2. Network error during flush
  3. Out of memory

Detection:

use tracing::error;

let (handle, worker) = batcher.spawn();

// Monitor worker
tokio::spawn(async move {
    if let Err(e) = worker.await {
        error!("Worker panicked: {:?}", e);
        // Alert, restart, failover, etc.
    }
});

Testing

Property-Based Tests

ClickType includes extensive fuzzing tests using proptest:

cargo test -p clicktype-core --test fuzzing_tests

Validates:

  • Roundtrip serialization for all types
  • Edge cases (NaN, infinity, empty strings, null bytes, max values)
  • Large data (1MB strings, 100k element arrays)

Load Tests

Production-scale stress tests (run manually):

# 1M row insertion test
cargo test --release --test load_tests load_test_1m_rows -- --ignored --nocapture

# Burst scenario (memory management)
cargo test --release --test load_tests load_test_burst_scenario -- --ignored --nocapture

# Concurrent inserts
cargo test --release --test load_tests load_test_concurrent_inserts -- --ignored --nocapture

# Backpressure testing
cargo test --release --test load_tests load_test_try_insert_backpressure -- --ignored --nocapture
