| Crates.io | caudal-sdk |
| lib.rs | caudal-sdk |
| version | 0.1.0 |
| created_at | 2026-01-18 23:26:37.183953+00 |
| updated_at | 2026-01-18 23:26:37.183953+00 |
| description | Caudal SDK - Event observability for distributed systems |
| homepage | https://github.com/JesusCabreraReveles/caudal-sdk-rs |
| repository | https://github.com/JesusCabreraReveles/caudal-sdk-rs |
| max_upload_size | |
| id | 2053303 |
| size | 107,830 |
Observability SDK for event-driven distributed systems.
Caudal is an observability system for event flows. It is NOT a broker, it DOES NOT participate in the critical path, and it DOES NOT guarantee delivery. It only observes, records, and correlates events describing how a message flows through a distributed system.
Add this to your Cargo.toml:
```toml
[dependencies]
caudal-sdk = "0.1.0"

# With JSON serialization
caudal-sdk = { version = "0.1.0", features = ["serde_json"] }

# With Kafka support
caudal-sdk = { version = "0.1.0", features = ["kafka"] }

# With async support
caudal-sdk = { version = "0.1.0", features = ["async-runtime"] }

# All features
caudal-sdk = { version = "0.1.0", features = ["serde_json", "kafka", "async-runtime"] }
```
```rust
use caudal_sdk::{
    FlowEventBuilder, CaudalEmitter,
    NodeType, FlowStage, FlowStatus, AckType,
};

// Create an event (flow_id is required and represents the logical unit)
let event = FlowEventBuilder::new("siscom-consumer")
    .group_flow("siscom")   // Optional: grouping for Realtime/UI
    .node("api-gateway", NodeType::Producer)
    .stage(FlowStage::Emit)
    .status(FlowStatus::Sent)
    .ack(AckType::Implicit)
    .payload_bytes(190)     // Optional: logical size
    .input_bytes(190)       // Optional: input bytes
    .output_bytes(210)      // Optional: output bytes
    .meta("endpoint", "/api/users")
    .build()?;
// event_id is generated automatically as a UUID v4 if not specified

// Emit with a custom transport
let emitter = CaudalEmitter::new(my_transport);
emitter.emit_strict(event)?;

// Or best-effort mode (never fails)
emitter.emit_best_effort(event);
```
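Here, `my_transport` stands for any value implementing the `CaudalTransport` trait (shown in full later in this README). As a minimal sketch using only that trait signature, a hypothetical stdout transport for local debugging could look like this (`StdoutTransport` is illustrative, not part of the SDK):

```rust
use caudal_sdk::{CaudalEmitter, CaudalTransport, FlowEvent};
use anyhow::Result;

// Hypothetical debug transport: prints each event instead of shipping it.
struct StdoutTransport;

impl CaudalTransport for StdoutTransport {
    fn emit(&self, event: FlowEvent) -> Result<()> {
        // flow_id and node_id are plain strings on FlowEvent.
        println!("flow={} node={}", event.flow_id, event.node_id);
        Ok(())
    }
}

let emitter = CaudalEmitter::new(StdoutTransport);
```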
```rust
use caudal_sdk::BufferedEmitter;
use caudal_sdk::emitter::buffered::{BufferedConfig, DropPolicy};

let config = BufferedConfig {
    buffer_size: 1000,
    drop_policy: DropPolicy::DropNewest,
    flush_interval: None,
};
let emitter = BufferedEmitter::new(transport, config);

// Emit without blocking (never fails)
emitter.emit_best_effort(event);

// Get metrics
let metrics = emitter.metrics();
println!("Sent: {}, Dropped: {}", metrics.events_sent, metrics.events_dropped);
```
```rust
use caudal_sdk::transport::kafka::{KafkaTransport, KafkaConfig};

let config = KafkaConfig {
    bootstrap_servers: "localhost:9092".to_string(),
    topic: "caudal-events".to_string(),
    acks: "all".to_string(),
    ..Default::default()
};
let transport = KafkaTransport::new(config)?;
let emitter = CaudalEmitter::new(transport);
```
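Since `KafkaTransport` is passed straight to `CaudalEmitter::new` above, it evidently implements `CaudalTransport`; under that assumption it should also compose with `BufferedEmitter`, keeping Kafka I/O off the hot path:

```rust
// Sketch: buffered, best-effort emission over Kafka
// (assumes KafkaTransport implements CaudalTransport).
let transport = KafkaTransport::new(config)?;
let emitter = BufferedEmitter::new(transport, BufferedConfig {
    buffer_size: 1000,
    drop_policy: DropPolicy::DropNewest,
    flush_interval: None,
});
emitter.emit_best_effort(event);
```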
| Feature | Description | Dependencies |
|---|---|---|
| `serde_json` | JSON serialization of events | `serde_json` |
| `kafka` | Kafka/Redpanda transport | `rdkafka`, `tokio` |
| `async-runtime` | async/await support | `async-trait`, `tokio` |
| `tracing` | Integration with `tracing` | `tracing` |
Each event includes:
```rust
pub struct FlowEvent {
    pub spec_version: String,       // "caudal.v1"
    pub event_id: Uuid,             // Unique event ID (tracing)
    pub flow_id: String,            // Flow ID (logical unit)
    pub group_flow: Option<String>, // Logical grouping (realtime/UI)
    pub node_id: String,            // Emitting node ID
    pub node_type: NodeType,        // Producer, Consumer, Bridge, etc.
    pub timestamp: DateTime<Utc>,   // UTC timestamp
    pub stage: FlowStage,           // Ingress, Emit, Process, etc.
    pub status: FlowStatus,         // Sent, Received, Ack, etc.
    pub ack: AckInfo,               // Acknowledgment information
    pub metadata: EventMetadata,    // Structured metadata (see below)
    pub error: Option<FlowError>,   // Error information
}
```
```rust
pub struct EventMetadata {
    pub event_id: Option<Uuid>,         // Event ID (redundant)
    pub bytes: Option<String>,          // Legacy bytes (string)
    pub payload_bytes: Option<u64>,     // Logical size
    pub input_bytes: Option<u64>,       // Input bytes
    pub output_bytes: Option<u64>,      // Output bytes
    pub extra: HashMap<String, String>, // Other dynamic fields
}
```
The core enums take the following values:

- `NodeType`: `Producer`, `Bridge`, `Consumer`, `Storage`, `External`
- `FlowStage`: `Ingress`, `Emit`, `Bridge`, `Consume`, `Process`, `Persist`, `Egress`, `Error`
- `FlowStatus`: `Sent`, `Received`, `Ack`, `Nack`, `Timeout`, `Skipped`
- `AckType`: `None`, `Implicit`, `Explicit`, `Bridge`

Applications SHOULD:

- Use the same `flow_id` across all events in a single flow for proper correlation, as in the multi-step example below.

```rust
let flow_id = "order-processing";
let group = "siscom";
// 1. Ingress event
let ingress = FlowEventBuilder::new(flow_id)
.group_flow(group)
.node("api-gateway", NodeType::Producer)
.stage(FlowStage::Ingress)
.status(FlowStatus::Received)
.build()?;
// 2. Processing event
let processing = FlowEventBuilder::new(flow_id)
.group_flow(group)
.node("order-service", NodeType::Consumer)
.stage(FlowStage::Process)
.status(FlowStatus::Received)
.build()?;
// 3. External ACK event
let ack = FlowEventBuilder::new(flow_id)
.group_flow(group)
.node("payment-gateway", NodeType::External)
.stage(FlowStage::Egress)
.status(FlowStatus::Ack)
.ack(AckType::Explicit)
.build()?;
// 4. Final "processed" event
let processed = FlowEventBuilder::new(flow_id)
.group_flow(group)
.node("order-service", NodeType::Consumer)
.stage(FlowStage::Process)
.status(FlowStatus::Ack)
.meta("order_id", "12345")
.build()?;
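Because all four events carry the same `flow_id`, a collector on the other side can correlate them with a plain map. A sketch using only `std` (not an SDK API):

```rust
use std::collections::HashMap;

// Group observed events by their flow.
let events = vec![ingress, processing, ack, processed];
let mut flows: HashMap<String, Vec<&FlowEvent>> = HashMap::new();
for event in &events {
    flows.entry(event.flow_id.clone()).or_default().push(event);
}
assert_eq!(flows["order-processing"].len(), 4);
```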
```rust
use caudal_sdk::{CaudalTransport, FlowEvent};
use anyhow::Result;

struct MyTransport {
    // Your configuration
}

impl CaudalTransport for MyTransport {
    fn emit(&self, event: FlowEvent) -> Result<()> {
        // Implement event emission here,
        // for example to Redis, HTTP, etc.
        Ok(())
    }
}
```
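Any such transport plugs into the emitters shown earlier; keeping `emit` synchronous and using `emit_best_effort` preserves the "never on the critical path" property:

```rust
let emitter = CaudalEmitter::new(MyTransport { /* ... */ });
emitter.emit_best_effort(event);
```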
```rust
let config = KafkaConfig {
    bootstrap_servers: "localhost:9092".to_string(),
    topic: "caudal-events".to_string(),
    acks: "all".to_string(),  // "all", "1", "0"
    linger_ms: Some(10),      // Batching delay
    batch_size: Some(16384),  // Batch size in bytes
    timeout_ms: 5000,         // Timeout for delivery
};
```
"all" or "-1": Maximum durability (waits for all replicas)"1": Waits only for the leader"0": Fire-and-forget (no acknowledgment)See the examples/ directory for full examples:
```bash
# Basic example
cargo run --example basic_sync --features serde_json

# BufferedEmitter
cargo run --example buffered

# Kafka producer (requires local Kafka)
cargo run --example kafka_producer --features kafka
```

For development:

```bash
# Unit tests
cargo test

# Tests with all features
cargo test --all-features

# Clippy
cargo clippy --all-features -- -D warnings

# Format
cargo fmt -- --check
```
> [!NOTE]
> Future versions may provide helper builders for common patterns, once enough real-world usage patterns are validated.
Licensed under MIT OR Apache-2.0.
Contributions are welcome! Please open an issue or PR.