Crates.io | langfuse-ergonomic
lib.rs | langfuse-ergonomic
version | 0.3.0
created_at | 2025-08-28 21:29:36.794827+00
updated_at | 2025-08-30 22:13:51.966959+00
description | Ergonomic Rust client for Langfuse with builder patterns
homepage | https://github.com/genai-rs/langfuse-ergonomic
repository | https://github.com/genai-rs/langfuse-ergonomic
max_upload_size |
id | 1814695
size | 348,629
Ergonomic Rust client for Langfuse, the open-source LLM observability platform.
Add the crate to your Cargo.toml:

[dependencies]
langfuse-ergonomic = "0.3"
tokio = { version = "1", features = ["full"] }
serde_json = "1"
To enable optional features:

[dependencies]
langfuse-ergonomic = { version = "0.3", features = ["compression"] }

Available feature flags:
compression - enables gzip, brotli, and deflate compression for requests (reduces bandwidth usage)

Quick start:

use langfuse_ergonomic::LangfuseClient;
use serde_json::json;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create client from environment variables
let client = LangfuseClient::from_env()?;
// Create a trace
let trace = client.trace()
.name("my-application")
.input(json!({"query": "Hello, world!"}))
.output(json!({"response": "Hi there!"}))
.user_id("user-123")
.tags(["production", "chat"])
.call()
.await?;
println!("Created trace: {}", trace.id);
// Fetch and list traces
let fetched_trace = client.get_trace(&trace.id).await?;
let traces = client.list_traces()
.limit(10)
.user_id("user-123")
.call()
.await?;
// Create a dataset
let dataset = client.create_dataset()
.name("my-dataset")
.description("Example dataset")
.call()
.await?;
Ok(())
}
Set these environment variables:
LANGFUSE_PUBLIC_KEY=pk-lf-...
LANGFUSE_SECRET_KEY=sk-lf-...
LANGFUSE_BASE_URL=https://cloud.langfuse.com # Optional
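If you keep these variables in a local .env file, a crate such as dotenvy (an assumption for illustration; it is not a dependency of langfuse-ergonomic) can load them before the client reads the environment:

// Load .env into the process environment if present; ignore it otherwise.
dotenvy::dotenv().ok();
let client = LangfuseClient::from_env()?;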
Or configure explicitly with advanced options:
use langfuse_ergonomic::LangfuseClient;
use std::time::Duration;
let client = LangfuseClient::builder()
.public_key("pk-lf-...")
.secret_key("sk-lf-...")
.base_url("https://cloud.langfuse.com")
.timeout(Duration::from_secs(30)) // Custom timeout
.connect_timeout(Duration::from_secs(5)) // Connection timeout
.user_agent("my-app/1.0.0") // Custom user agent
.build();
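The same builder also covers self-hosted deployments; for example (the localhost URL is a placeholder for your own instance; see also the self_hosted example below):

let client = LangfuseClient::builder()
    .public_key("pk-lf-...")
    .secret_key("sk-lf-...")
    .base_url("http://localhost:3000") // your self-hosted Langfuse instance
    .build();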
Check the examples/ directory for more usage examples:
# Trace examples
cargo run --example basic_trace
cargo run --example trace_with_metadata
cargo run --example multiple_traces
# Trace fetching and management
cargo run --example traces_fetch
# Observations (spans, generations, events)
cargo run --example observations
# Scoring and evaluation
cargo run --example scores
# Dataset management
cargo run --example datasets
# Prompt management
cargo run --example prompts
# Batch processing
cargo run --example batch_ingestion
# Self-hosted configuration
cargo run --example self_hosted
The client supports efficient batch processing with automatic chunking, retry logic, and comprehensive error handling:
use langfuse_ergonomic::{Batcher, BackpressurePolicy, LangfuseClient};
use std::time::Duration;
let client = LangfuseClient::from_env()?;
// Create a batcher with custom configuration
let batcher = Batcher::builder()
.client(client)
.max_events(50) // Events per batch (default: 100)
.max_bytes(2_000_000) // Max batch size in bytes (default: 3.5MB)
.flush_interval(Duration::from_secs(10)) // Auto-flush interval (default: 5s)
.max_retries(5) // Retry attempts (default: 3)
.max_queue_size(5000) // Max events to queue (default: 10,000)
.backpressure_policy(BackpressurePolicy::DropNew) // What to do when queue is full
.build()
.await;
// Add events - they'll be automatically batched and flushed
// (`events` stands in for your own collection of ingestion events)
for event in events {
batcher.add(event).await?;
}
// Manual flush if needed
let response = batcher.flush().await?;
println!("Sent {} events", response.success_count);
// Monitor metrics
let metrics = batcher.metrics();
println!("Queued: {}, Flushed: {}, Failed: {}, Dropped: {}",
metrics.queued, metrics.flushed, metrics.failed, metrics.dropped);
// Graceful shutdown (flushes remaining events)
let final_response = batcher.shutdown().await?;
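In a long-running service, a common pattern is to tie this shutdown to process termination. A minimal sketch, assuming tokio's signal feature is enabled and that shutdown() reports counts like flush():

use tokio::signal;

// Wait for Ctrl-C, then flush whatever is still queued before exiting.
signal::ctrl_c().await?;
let final_response = batcher.shutdown().await?;
println!("Flushed {} events on shutdown", final_response.success_count);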
HTTP 207 Multi-Status Handling: partial failures, where some events in a batch succeed and others fail, are detected and handled automatically (see Error Handling below).
Backpressure Policies (see the sketch after this list):
Block: wait when the queue is full (default)
DropNew: drop new events when the queue is full
DropOldest: remove the oldest events to make room
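For example, a telemetry-style setup that would rather drop stale data than block callers can combine a small queue with DropOldest (a sketch built from the builder options shown above):

use langfuse_ergonomic::{Batcher, BackpressurePolicy, LangfuseClient};

let client = LangfuseClient::from_env()?;
let batcher = Batcher::builder()
    .client(client)
    .max_queue_size(1_000)                               // bound memory with a small queue
    .backpressure_policy(BackpressurePolicy::DropOldest) // keep the freshest events
    .build()
    .await;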
Metrics & Monitoring:

let metrics = batcher.metrics();
// Available metrics:
// - queued: Current events waiting to be sent
// - flushed: Total successfully sent
// - failed: Total failed after all retries
// - dropped: Total dropped due to backpressure
// - retries: Total retry attempts
// - last_error_ts: Unix timestamp of last error
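These counters are straightforward to surface as health metrics; for instance, a periodic logging loop (a sketch meant to run in its own task, with access to the batcher):

use std::time::Duration;

// Log batcher health every 30 seconds.
let mut ticker = tokio::time::interval(Duration::from_secs(30));
loop {
    ticker.tick().await;
    let m = batcher.metrics();
    println!(
        "langfuse batcher - queued: {}, flushed: {}, failed: {}, dropped: {}, retries: {}",
        m.queued, m.flushed, m.failed, m.dropped, m.retries
    );
}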
Error Handling:

use langfuse_ergonomic::Error; // the crate's error type, matched below

match batcher.flush().await {
Ok(response) => {
println!("Success: {}, Failed: {}",
response.success_count, response.failure_count);
}
Err(Error::PartialFailure { success_count, failure_count, errors, .. }) => {
println!("Partial success: {} ok, {} failed", success_count, failure_count);
for error in errors {
if error.retryable {
println!("Retryable error: {}", error.message);
}
}
}
Err(e) => eprintln!("Complete failure: {}", e),
}
Licensed under either of the Apache License, Version 2.0 or the MIT license, at your option.
See CONTRIBUTING.md for guidelines.