conveyor-etl-metrics

Crate: conveyor-etl-metrics (crates.io, lib.rs)
Version: 0.1.0
Created: 2025-12-23 00:56:08.802991+00
Updated: 2025-12-23 00:56:08.802991+00
Description: Metrics collection and export for Conveyor ETL
Crate ID: 2000532
Size: 42,165
Owner: Alex Choi (alexchoi0)


conveyor-metrics

Prometheus metrics for observability.

Overview

This crate provides metrics collection and export for monitoring Conveyor clusters. It records values through the metrics crate and exposes them in the Prometheus text exposition format.

Metrics

Record Metrics

| Metric | Type | Labels | Description |
|---|---|---|---|
| conveyor_router_records_received_total | Counter | source_id | Records received from sources |
| conveyor_router_records_routed_total | Counter | pipeline_id, stage_id | Records routed through stages |
| conveyor_router_records_delivered_total | Counter | sink_id | Records delivered to sinks |
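Each distinct combination of label values in these counters is its own time series. The sketch below is a hypothetical, standard-library-only model of a labeled counter family (the CounterFamily type and series_key helper are illustrative, not this crate's API). It shows that counters only accumulate and that label order does not create a new series.

```rust
use std::collections::BTreeMap;

/// Hypothetical in-memory model of one labeled counter family.
/// Each distinct label set is its own time series.
#[derive(Default)]
struct CounterFamily {
    series: BTreeMap<Vec<(String, String)>, u64>,
}

/// Builds an order-independent key so {a, b} and {b, a} name the same series.
fn series_key(labels: &[(&str, &str)]) -> Vec<(String, String)> {
    let mut key: Vec<(String, String)> = labels
        .iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect();
    key.sort();
    key
}

impl CounterFamily {
    /// Adds `n` to one series; counters never decrease.
    fn increment(&mut self, labels: &[(&str, &str)], n: u64) {
        *self.series.entry(series_key(labels)).or_insert(0) += n;
    }

    /// Current value of one series (0 if it was never incremented).
    fn get(&self, labels: &[(&str, &str)]) -> u64 {
        self.series.get(&series_key(labels)).copied().unwrap_or(0)
    }
}

fn main() {
    let mut routed = CounterFamily::default();
    routed.increment(&[("pipeline_id", "user-analytics"), ("stage_id", "filter-stage")], 95);
    // Same labels in a different order update the same series.
    routed.increment(&[("stage_id", "filter-stage"), ("pipeline_id", "user-analytics")], 5);
    routed.increment(&[("pipeline_id", "user-analytics"), ("stage_id", "enrich-stage")], 90);

    // prints "filter-stage total: 100"
    println!(
        "filter-stage total: {}",
        routed.get(&[("pipeline_id", "user-analytics"), ("stage_id", "filter-stage")])
    );
    // prints "series count: 2"
    println!("series count: {}", routed.series.len());
}
```

Because every new label value creates a new series, labels such as source_id and stage_id work best when they come from a bounded set of identifiers.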

Latency Metrics

| Metric | Type | Labels | Description |
|---|---|---|---|
| conveyor_router_routing_latency_ms | Histogram | pipeline_id | End-to-end routing latency in milliseconds |
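A Prometheus histogram is scraped as cumulative _bucket series (one per upper bound le, ending with +Inf) plus _sum and _count, which is what the p99 query in the Grafana section reads. The following standard-library sketch models that layout; the bucket bounds are hypothetical, since the crate's actual buckets are not documented here.

```rust
/// Hypothetical bucket upper bounds in milliseconds; the crate's actual buckets are not documented.
const BUCKETS_MS: [f64; 5] = [1.0, 5.0, 10.0, 50.0, 100.0];

/// Minimal model of a Prometheus-style histogram for routing latency.
struct LatencyHistogram {
    /// Non-cumulative count per finite bucket.
    counts: [u64; 5],
    /// Observations larger than the last finite bound (they land only in +Inf).
    overflow: u64,
    /// Exposed as the `_sum` series.
    sum: f64,
    /// Exposed as the `_count` series.
    count: u64,
}

impl LatencyHistogram {
    fn new() -> Self {
        LatencyHistogram { counts: [0; 5], overflow: 0, sum: 0.0, count: 0 }
    }

    /// Records one latency sample into the first bucket whose bound is >= the value.
    fn observe(&mut self, latency_ms: f64) {
        self.sum += latency_ms;
        self.count += 1;
        match BUCKETS_MS.iter().position(|&le| latency_ms <= le) {
            Some(i) => self.counts[i] += 1,
            None => self.overflow += 1,
        }
    }

    /// Cumulative (le, count) pairs, as scraped from the `_bucket` series, ending with +Inf.
    fn cumulative(&self) -> Vec<(f64, u64)> {
        let mut out = Vec::new();
        let mut running = 0;
        for (&le, &c) in BUCKETS_MS.iter().zip(self.counts.iter()) {
            running += c;
            out.push((le, running));
        }
        out.push((f64::INFINITY, running + self.overflow));
        out
    }
}

fn main() {
    let mut h = LatencyHistogram::new();
    for ms in [0.8, 3.0, 15.5, 120.0] {
        h.observe(ms);
    }
    for (le, c) in h.cumulative() {
        println!("le={le} count={c}");
    }
    println!("sum={} count={}", h.sum, h.count);
}
```

Quantile estimates can only be as precise as the bucket boundaries allow, so bucket choice matters more than sample volume for a p99 panel.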

Buffer Metrics

| Metric | Type | Labels | Description |
|---|---|---|---|
| conveyor_router_buffer_utilization | Gauge | stage_id | Buffer fill ratio (0.0 to 1.0) |
| conveyor_router_backpressure_events_total | Counter | source_id | Backpressure signals sent |
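The utilization gauge is a ratio, not a percentage, and backpressure signals are discrete events. A minimal sketch of how the two could relate, assuming a hypothetical 0.8 threshold (the crate does not document when backpressure fires):

```rust
/// Hypothetical threshold; the crate does not document when backpressure is signalled.
const BACKPRESSURE_THRESHOLD: f64 = 0.8;

/// Fill ratio in [0.0, 1.0], the range documented for conveyor_router_buffer_utilization.
fn utilization(len: usize, capacity: usize) -> f64 {
    if capacity == 0 {
        return 1.0; // a zero-capacity buffer cannot accept anything, so report it as full
    }
    (len as f64 / capacity as f64).min(1.0)
}

/// True when a source should be told to slow down; each such signal would be one
/// record_backpressure_events(source_id) call.
fn should_signal_backpressure(len: usize, capacity: usize) -> bool {
    utilization(len, capacity) >= BACKPRESSURE_THRESHOLD
}

fn main() {
    for (len, capacity) in [(45, 100), (85, 100)] {
        println!(
            "{}/{}: utilization={:.2}, backpressure={}",
            len,
            capacity,
            utilization(len, capacity),
            should_signal_backpressure(len, capacity)
        );
    }
}
```

A buffer holding 45 of 100 slots reports 0.45, matching the record_buffer_utilization("filter-stage", 0.45) call in the usage example.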

Service Metrics

| Metric | Type | Labels | Description |
|---|---|---|---|
| conveyor_router_active_services | Gauge | service_type | Count of registered services |
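Unlike a counter, a gauge is set to the current value, so it can fall when a service deregisters. The sketch below tallies hypothetical (service_id, service_type) registrations into the f64 values that record_active_services(service_type, count) accepts; the service names and types are made up for illustration.

```rust
use std::collections::BTreeMap;

/// Tallies registered services by type. Each entry would feed one
/// record_active_services(service_type, count) call; count is f64 because the metric is a gauge.
fn active_services_by_type(registered: &[(&str, &str)]) -> BTreeMap<String, f64> {
    let mut counts = BTreeMap::new();
    for (_service_id, service_type) in registered {
        *counts.entry(service_type.to_string()).or_insert(0.0) += 1.0;
    }
    counts
}

fn main() {
    // Hypothetical (service_id, service_type) registrations.
    let registered = [
        ("kafka-source-1", "source"),
        ("kafka-source-2", "source"),
        ("s3-sink-1", "sink"),
        ("filter-1", "transform"),
    ];
    for (service_type, count) in active_services_by_type(&registered) {
        println!("{service_type}: {count}");
    }
}
```

A type whose last service goes away simply disappears from the map, so a caller would need to set that type's gauge to 0.0 explicitly; otherwise the gauge keeps its last reported value.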

Raft Metrics

| Metric | Type | Labels | Description |
|---|---|---|---|
| conveyor_router_raft_is_leader | Gauge | none | 1 if leader, 0 otherwise |
| conveyor_router_raft_term | Gauge | none | Current Raft term |
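Both Raft metrics are unlabeled gauges, so record_raft_state(is_leader, term) has to map a bool and a u64 onto f64 values. A sketch of that mapping, plus a hypothetical cluster-level check: Raft's election-safety property allows at most one leader per term, so counting leaders only at the highest scraped term filters out a stale node that has not yet noticed a newer election. Both function names are illustrative, not the crate's API.

```rust
/// Converts Raft state into the two gauge values; gauges carry f64.
/// The u64 -> f64 conversion is exact for terms up to 2^53.
fn raft_gauge_values(is_leader: bool, term: u64) -> (f64, f64) {
    (if is_leader { 1.0 } else { 0.0 }, term as f64)
}

/// Given one (raft_is_leader, raft_term) pair per node, counts nodes claiming
/// leadership at the highest observed term. A result above 1 would point to a bug.
fn leaders_at_latest_term(nodes: &[(f64, f64)]) -> usize {
    let latest = nodes
        .iter()
        .map(|&(_, term)| term)
        .fold(f64::NEG_INFINITY, f64::max);
    nodes
        .iter()
        .filter(|&&(leader, term)| leader == 1.0 && term == latest)
        .count()
}

fn main() {
    // Hypothetical scrape: node 0 leads term 42; node 2 is stale and still reports leadership from term 41.
    let nodes = [
        raft_gauge_values(true, 42),
        raft_gauge_values(false, 42),
        raft_gauge_values(true, 41),
    ];
    // prints "leaders at latest term: 1"
    println!("leaders at latest term: {}", leaders_at_latest_term(&nodes));
}
```

In this scenario a naive sum of conveyor_router_raft_is_leader would report 2, which is why pairing it with conveyor_router_raft_term is useful.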

Operational Metrics

| Metric | Type | Labels | Description |
|---|---|---|---|
| conveyor_router_retry_events_total | Counter | stage_id | Record retry attempts |
| conveyor_router_checkpoints_saved_total | Counter | service_id | Checkpoints persisted |
| conveyor_router_group_rebalances_total | Counter | group_id | Consumer group rebalances |
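The operational recorders take only an identifier (for example record_retry_events(stage_id)), which suggests one increment per call. The sketch below is a hypothetical retry wrapper, not part of the crate, that exposes an on_retry hook where such a call could go.

```rust
/// Hypothetical retry wrapper (not part of the crate's documented API). Runs `op` up to
/// `max_attempts` times and calls `on_retry` before each retry; a caller could wire
/// `on_retry` to record_retry_events(stage_id), which increments the counter by one.
fn run_with_retries<T, E>(
    max_attempts: u32,
    mut op: impl FnMut(u32) -> Result<T, E>,
    mut on_retry: impl FnMut(),
) -> Result<T, E> {
    let mut attempt = 1;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            // Give up once the attempt budget is spent and surface the last error.
            Err(err) if attempt >= max_attempts => return Err(err),
            Err(_) => {
                on_retry();
                attempt += 1;
            }
        }
    }
}

fn main() {
    let mut retries = 0u64;
    // Fails on attempts 1 and 2, succeeds on attempt 3.
    let result: Result<u32, &str> = run_with_retries(
        5,
        |attempt| if attempt < 3 { Err("transient") } else { Ok(attempt) },
        || retries += 1,
    );
    // prints "Ok(3) after 2 retries"
    println!("{:?} after {} retries", result, retries);
}
```

Counting retries rather than attempts means a record that succeeds on the first try adds nothing to conveyor_router_retry_events_total.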

Usage

Recording Metrics

use conveyor_metrics::{
    record_records_received,
    record_records_routed,
    record_routing_latency,
    record_buffer_utilization,
    record_raft_state,
};

fn main() {
    // With the metrics crate, these calls are no-ops until a global recorder
    // is installed (presumably by starting MetricsExporter; see below).

    // Record 100 records received from a source
    record_records_received("kafka-source-1", 100);

    // Record 95 records routed through a pipeline stage
    record_records_routed("user-analytics", "filter-stage", 95);

    // Record an end-to-end routing latency of 15.5 ms
    record_routing_latency("user-analytics", 15.5);

    // Record the buffer fill ratio (0.0 to 1.0)
    record_buffer_utilization("filter-stage", 0.45);

    // Record Raft state: is_leader, term
    record_raft_state(true, 42);
}

Exporting Metrics

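use conveyor_metrics::MetricsExporter;

// Must run inside an async function (start() is awaited) whose error type
// can absorb start()'s error through `?`.
let exporter = MetricsExporter::new(9090); // port that serves /metrics
exporter.start().await?;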

// Metrics available at http://localhost:9090/metrics
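A scrape of the /metrics endpoint returns plain text in the Prometheus exposition format. The following sketch renders one counter family in that format to show roughly what Prometheus receives; it is a standard-library illustration with hypothetical function names, not the exporter's implementation.

```rust
/// Escapes a label value as the Prometheus text format requires: backslash, double quote, newline.
fn escape_label_value(v: &str) -> String {
    v.replace('\\', "\\\\").replace('"', "\\\"").replace('\n', "\\n")
}

/// Renders one counter family in the Prometheus text exposition format.
/// `help` is assumed to contain no backslashes or newlines.
fn render_counter(name: &str, help: &str, series: &[(Vec<(&str, &str)>, u64)]) -> String {
    let mut out = format!("# HELP {} {}\n# TYPE {} counter\n", name, help, name);
    for (labels, value) in series {
        if labels.is_empty() {
            // Unlabeled series are written without braces.
            out.push_str(&format!("{} {}\n", name, value));
        } else {
            let pairs: Vec<String> = labels
                .iter()
                .map(|(k, v)| format!("{}=\"{}\"", k, escape_label_value(v)))
                .collect();
            out.push_str(&format!("{}{{{}}} {}\n", name, pairs.join(","), value));
        }
    }
    out
}

fn main() {
    let body = render_counter(
        "conveyor_router_records_received_total",
        "Records received from sources",
        &[(vec![("source_id", "kafka-source-1")], 100)],
    );
    print!("{}", body);
}
```

The exact HELP text and ordering the real exporter emits may differ; the line shape name{label="value"} value is what the PromQL queries below match against.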

Prometheus Configuration

# prometheus.yml
scrape_configs:
  - job_name: 'conveyor-router'
    static_configs:
      - targets: ['conveyor-router-0:9090', 'conveyor-router-1:9090', 'conveyor-router-2:9090']
    scrape_interval: 15s

Grafana Dashboard

Example queries:

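# Records per second by source, summed across router instances
sum by (source_id) (rate(conveyor_router_records_received_total[1m]))

# Routing latency p99 per pipeline, aggregated across instances
histogram_quantile(0.99, sum by (pipeline_id, le) (rate(conveyor_router_routing_latency_ms_bucket[5m])))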

# Buffer utilization
conveyor_router_buffer_utilization

# Leadership changes per instance over the last hour
changes(conveyor_router_raft_is_leader[1h])
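The two non-trivial functions in these queries are rate() and histogram_quantile(). The sketch below gives simplified standard-library versions so the arithmetic is visible. Both are deliberate simplifications: simple_rate handles counter resets but skips PromQL's extrapolation to the window edges, and simple_histogram_quantile follows Prometheus's linear interpolation within a bucket but omits its other edge cases. The function names and sample data are hypothetical.

```rust
/// Simplified rate(): per-second increase over (timestamp_secs, counter_value) samples.
/// A drop in value is treated as a counter reset (the counter restarted from 0).
fn simple_rate(samples: &[(f64, f64)]) -> Option<f64> {
    if samples.len() < 2 {
        return None;
    }
    let mut increase = 0.0;
    for pair in samples.windows(2) {
        let (prev, cur) = (pair[0].1, pair[1].1);
        increase += if cur >= prev { cur - prev } else { cur };
    }
    let elapsed = samples[samples.len() - 1].0 - samples[0].0;
    if elapsed > 0.0 { Some(increase / elapsed) } else { None }
}

/// Simplified histogram_quantile(): estimates quantile q (0 < q <= 1) from cumulative
/// (upper_bound, count) buckets sorted by bound and ending with +Inf, interpolating
/// linearly inside the bucket that contains the target rank.
fn simple_histogram_quantile(q: f64, buckets: &[(f64, f64)]) -> Option<f64> {
    let total = buckets.last()?.1;
    if total <= 0.0 || q <= 0.0 || q > 1.0 {
        return None;
    }
    let rank = q * total;
    let i = buckets.iter().position(|&(_, count)| count >= rank)?;
    let (upper, count) = buckets[i];
    if upper.is_infinite() {
        // As in Prometheus, a rank in the +Inf bucket yields the highest finite bound.
        return if i > 0 { Some(buckets[i - 1].0) } else { None };
    }
    let (lower, below) = if i == 0 { (0.0, 0.0) } else { buckets[i - 1] };
    Some(lower + (upper - lower) * (rank - below) / (count - below))
}

fn main() {
    // Counter sampled every 15 s, matching the scrape_interval in the Prometheus config.
    let samples = [(0.0, 0.0), (15.0, 30.0), (30.0, 60.0), (45.0, 90.0), (60.0, 120.0)];
    println!("rate: {:?}", simple_rate(&samples));

    // Hypothetical cumulative latency buckets in ms: (le, count).
    let buckets = [
        (1.0, 1.0), (5.0, 2.0), (10.0, 2.0), (50.0, 3.0), (100.0, 3.0), (f64::INFINITY, 4.0),
    ];
    println!("p50: {:?}", simple_histogram_quantile(0.5, &buckets));
    println!("p99: {:?}", simple_histogram_quantile(0.99, &buckets));
}
```

With a 15 s scrape interval, a [1m] window holds about four samples, enough for rate() to produce a value. In the bucket example the p99 rank falls in the +Inf bucket, so the estimate is capped at the highest finite bound (100 ms) regardless of how slow the outlier was.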

Exports

pub use prometheus::MetricsExporter;

pub fn record_records_received(source_id: &str, count: u64);
pub fn record_records_routed(pipeline_id: &str, stage_id: &str, count: u64);
pub fn record_records_delivered(sink_id: &str, count: u64);
pub fn record_routing_latency(pipeline_id: &str, latency_ms: f64);
pub fn record_buffer_utilization(stage_id: &str, utilization: f64);
pub fn record_active_services(service_type: &str, count: f64);
pub fn record_raft_state(is_leader: bool, term: u64);
pub fn record_backpressure_events(source_id: &str);
pub fn record_retry_events(stage_id: &str);
pub fn record_checkpoint_saved(service_id: &str);
pub fn record_group_rebalance(group_id: &str);