| Crates.io | clicktype |
| lib.rs | clicktype |
| version | 0.2.0 |
| created_at | 2026-01-25 14:23:22.878943+00 |
| updated_at | 2026-01-25 14:23:22.878943+00 |
| description | Type-safe ClickHouse client for Rust with compile-time query validation |
| homepage | https://github.com/clicktype/clicktype |
| repository | https://github.com/clicktype/clicktype |
| max_upload_size | |
| id | 2068827 |
| size | 65,417 |
ClickType is a ClickHouse client for Rust designed for bulk data ingestion and type-safe query construction. It focuses on explicit memory control and high performance by utilizing the RowBinary format.
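RowBinary is fast precisely because it is a flat byte stream: fixed-width numbers are written little-endian and strings carry a varint length prefix. As a rough illustration of the format (a hand-written sketch, not ClickType's actual encoder), a (UInt64, String) row can be encoded like this:

```rust
// Minimal sketch of RowBinary-style encoding for a (u64, String) row.
// Numbers are little-endian; strings carry a LEB128 varint length prefix.
fn write_varint(buf: &mut Vec<u8>, mut n: u64) {
    loop {
        let byte = (n & 0x7f) as u8;
        n >>= 7;
        if n == 0 {
            buf.push(byte);
            break;
        }
        buf.push(byte | 0x80); // high bit set: more bytes follow
    }
}

fn encode_row(user_id: u64, event_type: &str) -> Vec<u8> {
    let mut buf = Vec::new();
    buf.extend_from_slice(&user_id.to_le_bytes()); // UInt64: 8 bytes, little-endian
    write_varint(&mut buf, event_type.len() as u64); // String: varint length prefix...
    buf.extend_from_slice(event_type.as_bytes()); // ...followed by raw UTF-8 bytes
    buf
}
```

Because there are no column names on the wire, correctness depends entirely on field order and types matching the table, which is what ClickType's schema validation (below) guards.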
Key features:
- A #[derive(ClickTable)] macro that maps ClickHouse tables to Rust structs.
- Integration with the tracing ecosystem for monitoring latencies and errors.
- Support for complex column types: Nullable, LowCardinality, Array, and Map.

Add the dependency to your Cargo.toml file:
[dependencies]
clicktype = "0.2"
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
Map your ClickHouse tables to Rust structs. The macro handles serialization and DDL generation.
use clicktype::prelude::*;
#[derive(ClickTable)]
#[click_table(name = "user_events", primary_key = "user_id")]
pub struct UserEvent {
#[click_column(primary_key)]
pub user_id: u64,
pub timestamp: DateTime64<3>,
pub event_type: LowCardinality<String>,
pub properties: Map<String, String>,
pub session_id: Nullable<String>,
#[click_column(materialized = "now()")]
pub server_time: DateTime64<3>,
}
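Conceptually, the derive macro turns the struct into column-name constants plus a DDL string. A hand-written sketch of what such an expansion could look like for UserEvent (the macro's actual generated code will differ; the MergeTree engine choice here is an assumption):

```rust
// Hypothetical hand-expansion of #[derive(ClickTable)] for UserEvent:
// column-name constants plus a CREATE TABLE statement.
struct UserEvent;

impl UserEvent {
    pub const USER_ID: &'static str = "user_id";
    pub const TIMESTAMP: &'static str = "timestamp";

    pub fn create_table_ddl() -> String {
        format!(
            "CREATE TABLE user_events (\
             {} UInt64, {} DateTime64(3), event_type LowCardinality(String), \
             properties Map(String, String), session_id Nullable(String), \
             server_time DateTime64(3) MATERIALIZED now()\
             ) ENGINE = MergeTree ORDER BY {}",
            Self::USER_ID, Self::TIMESTAMP, Self::USER_ID
        )
    }
}
```

The constants are what the QueryBuilder later consumes (UserEvent::USER_ID and friends), so column names live in exactly one place.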
The Batcher manages the grouping of rows in memory before sending them to ClickHouse. It allows configuration of row limits, buffer sizes, and timeouts.
- Memory control: buffers are shrunk back after bursts (configurable via buffer_shrink_threshold).
- Supervision: the .spawn() method returns the worker's JoinHandle to detect unexpected stops or panics.
- Backpressure: two insertion modes, insert (blocks until capacity is available) and try_insert (fails immediately if the buffer is full).

use clicktype::batch::{Batcher, BatchConfig};
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = BatchConfig {
max_rows: 50_000,
max_wait: Duration::from_secs(5),
buffer_shrink_threshold: 2 * 1024 * 1024, // 2 MB
..BatchConfig::default()
};
let client = clicktype::Client::new("http://localhost:8123");
let batcher = Batcher::<UserEvent>::new(client, config);
// The handle is used for data insertion; worker_task supervises the execution thread.
let (handle, worker_task) = batcher.spawn();
let event = UserEvent {
user_id: 1,
timestamp: DateTime64::now(),
event_type: "click".into(),
properties: [("key".into(), "val".into())].into(),
session_id: Nullable::null(),
server_time: DateTime64::default(),
};
// Insertion with backpressure
handle.insert(event).await?;
handle.close().await?;
worker_task.await?;
Ok(())
}
The QueryBuilder generates structured SQL using the column constants provided by the macro.
use clicktype::query::QueryBuilder;
async fn get_stats(client: &clicktype::Client) -> Result<(), Box<dyn std::error::Error>> {
let sql = QueryBuilder::<UserEvent>::new()
.select(UserEvent::USER_ID)
.select_raw("count() as total")
.filter(UserEvent::TIMESTAMP, ">=", "today()")
.group_by(UserEvent::USER_ID)
.limit(10)
.to_sql();
// ResultRow is a user-defined struct matching the selected columns
let _results = client.query::<ResultRow>(&sql).await?;
Ok(())
}
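The builder above should reduce to a single SELECT statement with the clauses in standard order. A toy re-implementation of the string assembly makes the mechanics concrete (this is a sketch of the idea, not ClickType's exact output or whitespace):

```rust
// Toy sketch of how a typed query builder assembles SQL from parts.
// Clause order (SELECT .. WHERE .. GROUP BY .. LIMIT) is standard SQL.
struct ToyQuery {
    selects: Vec<String>,
    filters: Vec<String>,
    group_by: Vec<String>,
    limit: Option<usize>,
}

impl ToyQuery {
    fn new() -> Self {
        Self { selects: vec![], filters: vec![], group_by: vec![], limit: None }
    }
    fn select(mut self, col: &str) -> Self { self.selects.push(col.into()); self }
    fn filter(mut self, col: &str, op: &str, val: &str) -> Self {
        self.filters.push(format!("{col} {op} {val}")); self
    }
    fn group_by(mut self, col: &str) -> Self { self.group_by.push(col.into()); self }
    fn limit(mut self, n: usize) -> Self { self.limit = Some(n); self }
    fn to_sql(&self, table: &str) -> String {
        let mut sql = format!("SELECT {} FROM {}", self.selects.join(", "), table);
        if !self.filters.is_empty() {
            sql += &format!(" WHERE {}", self.filters.join(" AND "));
        }
        if !self.group_by.is_empty() {
            sql += &format!(" GROUP BY {}", self.group_by.join(", "));
        }
        if let Some(n) = self.limit {
            sql += &format!(" LIMIT {n}");
        }
        sql
    }
}
```

The typed version adds what this sketch lacks: column arguments come from macro-generated constants, so a renamed struct field fails at compile time instead of producing a wrong query string.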
ClickType uses tracing to emit events with structured data, including batch sizes, retry attempts, and errors returned by ClickHouse.
To enable log output:
tracing_subscriber::fmt::init();
Rows are serialized incrementally at insertion time (in insert), distributing CPU cost and preventing latency spikes during the flush operation.
ClickType validates your schema automatically to prevent data corruption.
RowBinary is ClickHouse's fastest format, but it's position-based (no column names in the wire format). ClickType protects you with comprehensive validation:
What ClickType Validates Automatically:
- ✅ Column order - Position mismatch detected immediately
- ✅ Column types - Type mismatches caught before any data is sent
- ✅ Column count - Missing or extra insertable columns detected
- ✅ Column names - Ensures struct fields match table columns
Validation happens on first insert:
let batcher = Batcher::<UserEvent>::new(client, config);
let (handle, worker) = batcher.spawn();
// Schema validation runs here - fails fast if anything is wrong:
handle.insert(event).await?;
// ✓ Order validated
// ✓ Types validated
// ✓ Count validated
Example validation error (column order mismatch):
Schema validation failed for table 'events':
Column order mismatch at position 0: struct has 'id', table has 'name'
Column order mismatch at position 1: struct has 'name', table has 'value'
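The order check itself is easy to picture: walk both column lists position by position and report every divergence. A standalone sketch of that comparison, producing messages in the shape shown above:

```rust
// Sketch of positional schema validation: compare struct columns against
// table columns and collect every order mismatch, as in the error above.
fn order_mismatches(struct_cols: &[&str], table_cols: &[&str]) -> Vec<String> {
    struct_cols
        .iter()
        .zip(table_cols)
        .enumerate()
        .filter(|(_, (s, t))| s != t)
        .map(|(i, (s, t))| {
            format!("Column order mismatch at position {i}: struct has '{s}', table has '{t}'")
        })
        .collect()
}
```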
Inherent RowBinary Limitations (Can't be validated):
When you need to change your schema:
// Step 1: Create new table version
#[derive(ClickTable)]
#[click_table(name = "events_v2")]
pub struct EventV2 {
pub id: u64,
pub new_field: String, // ← Added field
pub timestamp: u64,
}
// Step 2: Deploy code that writes to BOTH tables
batcher_v1.insert(old_event).await?;
batcher_v2.insert(new_event).await?;
// Step 3: Backfill data
// INSERT INTO events_v2 SELECT id, '', timestamp FROM events
// Step 4: Switch reads to v2
// Step 5: Stop writes to v1, drop old table
Alternative: Use ClickHouse ALTER TABLE for compatible changes:
-- Adding a column with DEFAULT (safe)
ALTER TABLE events ADD COLUMN new_field String DEFAULT ''
let config = BatchConfig {
max_buffer_size: 64 * 1024 * 1024, // 64 MB hard limit
buffer_shrink_threshold: 2 * 1024 * 1024, // Shrink if > 2 MB after flush
..Default::default()
};
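The shrink threshold exists so a one-off burst cannot pin a large allocation forever: after a flush empties the buffer, capacity above the threshold is returned to the allocator. The pattern can be sketched with a plain Vec<u8> (ClickType's internal buffer type may differ):

```rust
// Sketch: after flushing, release excess capacity if the buffer grew past
// the shrink threshold, so a burst doesn't pin memory permanently.
fn flush_and_maybe_shrink(buf: &mut Vec<u8>, shrink_threshold: usize) {
    buf.clear(); // flush: contents have been sent, length drops to 0
    if buf.capacity() > shrink_threshold {
        buf.shrink_to(shrink_threshold); // keep at most ~shrink_threshold bytes
    }
}
```

max_buffer_size is the opposite bound: a hard ceiling the buffer may never exceed, enforced at insertion time rather than after flush.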
Why This Matters:
// Data integrity priority (wait for capacity)
handle.insert(event).await?; // Blocks if channel full
// High availability priority (drop if full)
if handle.try_insert(event).is_err() {
metrics.record_dropped_event();
}
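The same two semantics exist on any bounded channel, which makes the trade-off easy to demonstrate without a ClickHouse server: a blocking send waits for capacity, while a fail-fast send returns an error you can count. A std-only sketch (ClickType's handle wraps an async channel, but the behavior is analogous):

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Sketch: a bounded channel of capacity 1 contrasts blocking vs. fail-fast
// insertion, mirroring insert() vs. try_insert().
fn demo_backpressure() -> usize {
    let (tx, _rx) = sync_channel::<u64>(1);
    tx.send(1).unwrap(); // fits: buffer has room (would block if full)
    let mut dropped = 0;
    // Channel is now full; try_send fails immediately instead of blocking.
    if let Err(TrySendError::Full(_)) = tx.try_send(2) {
        dropped += 1; // a real system would bump a "dropped events" metric here
    }
    dropped
}
```

Pick per workload: billing-style pipelines want the blocking variant; latency-sensitive hot paths usually prefer dropping and counting.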
let (handle, worker) = batcher.spawn();
tokio::select! {
result = worker => {
// Worker crashed - handle it!
error!("Batcher worker died: {:?}", result);
// Restart batcher, alert ops, etc.
}
_ = tokio::signal::ctrl_c() => {
handle.close().await?;
}
}
Cause: Struct field type doesn't match ClickHouse column type.
Fix:
-- Check actual ClickHouse schema
DESCRIBE TABLE your_table;
Compare with your Rust struct. Common mismatches:
- i32 vs i64 (Int32 vs Int64)
- String vs LowCardinality<String>
- Option<T> vs T (Nullable vs non-Nullable)

Causes:
Debugging:
use tracing_subscriber;
// Enable detailed logs
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.init();
// Logs will show:
// - Exact HTTP error from ClickHouse
// - Retry attempts
// - Batch size and payload
Check:
- buffer_shrink_threshold configured?

Fix:
let config = BatchConfig {
buffer_shrink_threshold: 2 * 1024 * 1024, // Add this!
max_buffer_size: 64 * 1024 * 1024,
channel_capacity: 10_000, // Don't set too high
..Default::default()
};
Likely cause: Worker panicked due to:
Detection:
let (handle, worker) = batcher.spawn();
// Monitor worker
tokio::spawn(async move {
if let Err(e) = worker.await {
error!("Worker panicked: {:?}", e);
// Alert, restart, failover, etc.
}
});
ClickType includes extensive fuzzing tests using proptest:
cargo test -p clicktype-core --test fuzzing_tests
Validates:
Production-scale stress tests (run manually):
# 1M row insertion test
cargo test --release --test load_tests load_test_1m_rows -- --ignored --nocapture
# Burst scenario (memory management)
cargo test --release --test load_tests load_test_burst_scenario -- --ignored --nocapture
# Concurrent inserts
cargo test --release --test load_tests load_test_concurrent_inserts -- --ignored --nocapture
# Backpressure testing
cargo test --release --test load_tests load_test_try_insert_backpressure -- --ignored --nocapture