| Crates.io | conveyor-etl-dsl |
| lib.rs | conveyor-etl-dsl |
| version | 0.1.0 |
| created_at | 2025-12-23 02:58:54.180697+00 |
| updated_at | 2025-12-23 02:58:54.180697+00 |
| description | YAML DSL parser for Conveyor ETL pipeline definitions |
| homepage | |
| repository | |
| max_upload_size | |
| id | 2000673 |
| size | 177,527 |
Pipeline DSL parsing, validation, and optimization.
This crate handles the YAML manifest format for defining pipeline resources. It parses manifests, validates references, converts them into internal conveyor-routing types, and optimizes pipeline DAGs.
Parse a pipeline definition from a file or from a YAML string:
use conveyor_dsl::{load_pipeline, parse_pipeline};
// From file
let pipeline = load_pipeline("pipeline.yaml")?;
// From string
let yaml = r#"
apiVersion: conveyor.etl/v1
kind: Pipeline
metadata:
  name: my-pipeline
spec:
  source: kafka-events
  steps:
    - filter
    - enrich
  sink: s3-archive
"#;
let pipeline = parse_pipeline(yaml)?;
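Both entry points return the crate's Result, so malformed manifests surface as DslError values rather than panics. A minimal sketch of handling that, assuming only the re-exported error type and that it implements Display (usual for error types, but an assumption here):
use conveyor_dsl::load_pipeline;

fn load_or_report(path: &str) {
    match load_pipeline(path) {
        Ok(_pipeline) => { /* hand it to the registry or router */ }
        Err(err) => eprintln!("failed to load {path}: {err}"),
    }
}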
Store and query resources:
use conveyor_dsl::Registry;
let mut registry = Registry::new();
// Apply manifests
registry.apply(source_manifest)?;
registry.apply(transform_manifest)?;
registry.apply(pipeline_manifest)?;
// Query
let source = registry.source_spec("default", "kafka-events");
let pipelines = registry.list_pipelines("default");
// Validate pipeline references
registry.validate_pipeline("default", "my-pipeline")?;
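Queries compose naturally with validation. A minimal sketch of rejecting a namespace whose pipelines have dangling references, assuming the crate's Result alias wraps DslError and that list_pipelines yields items exposing a name field (both assumptions for illustration):
use conveyor_dsl::{Registry, Result};

// Hypothetical helper: validate every pipeline in a namespace before deployment.
fn validate_namespace(registry: &Registry, namespace: &str) -> Result<()> {
    for pipeline in registry.list_pipelines(namespace) {
        // `name` is assumed to be a field on the listed item; the real type may differ.
        registry.validate_pipeline(namespace, &pipeline.name)?;
    }
    Ok(())
}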
Merge shared pipeline prefixes:
use conveyor_dsl::{Optimizer, Registry};
let mut registry = Registry::new();
// ... apply manifests ...
let optimizer = Optimizer::new(&registry, "default");
let dag = optimizer.optimize()?;
// dag.stages contains shared stages
// dag.edges contains optimized routing
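A sketch of walking the optimized DAG, assuming only the stages and edges fields mentioned above and that their element types implement Debug (an assumption made for illustration):
// Each shared stage runs once and feeds every pipeline that references it.
for stage in &dag.stages {
    println!("stage: {stage:?}");
}
// Edges describe how records are routed between stages.
for edge in &dag.edges {
    println!("edge: {edge:?}");
}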
Resources are declared as YAML manifests:
apiVersion: conveyor.etl/v1
kind: Source
metadata:
  name: kafka-users
  namespace: default
  labels:
    team: data
spec:
  grpc:
    endpoint: kafka-source-svc:50051
    tls:
      enabled: true
      ca_cert: /certs/ca.pem
apiVersion: conveyor.etl/v1
kind: Transform
metadata:
  name: filter-active
spec:
  grpc:
    endpoint: filter-svc:50051
  config:
    field: status
    value: active
apiVersion: conveyor.etl/v1
kind: Sink
metadata:
  name: s3-archive
spec:
  grpc:
    endpoint: s3-sink-svc:50051
  config:
    bucket: my-bucket
    prefix: events/
apiVersion: conveyor.etl/v1
kind: Pipeline
metadata:
  name: user-analytics
spec:
  source: kafka-users
  steps:
    - filter-active
    - enrich-geo
    - mask-pii
  sink: clickhouse-analytics
  dlq:
    enabled: true
    maxRetries: 3
    backoffMs: 1000
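A sketch of loading manifests like the ones above from disk and building a registry in one pass, assuming parse_file returns a manifest value that Registry::apply accepts and that the crate's Result alias wraps DslError (signatures inferred from the re-exports listed at the end of this page):
use conveyor_dsl::{parse_file, Registry, Result};

fn build_registry(paths: &[&str]) -> Result<Registry> {
    let mut registry = Registry::new();
    for path in paths {
        // Any parse or apply error aborts the whole build.
        let manifest = parse_file(path)?;
        registry.apply(manifest)?;
    }
    Ok(registry)
}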
Before optimization:
Pipeline A: Kafka → Filter → Enrich → S3
Pipeline B: Kafka → Filter → Enrich → ClickHouse
After optimization:
                                  ┌→ S3
Kafka → Filter → Enrich (shared) ─┤
                                  └→ ClickHouse
The validator checks manifest structure and cross-resource references:
use conveyor_dsl::validate;
validate(&manifest)?; // Returns DslError on failure
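A minimal sketch of validating a whole batch before applying anything, so a single bad manifest does not leave the registry half-populated (the batching itself is illustrative, not part of the crate; it assumes DslError implements Display):
use conveyor_dsl::{validate, Manifest};

fn validation_errors(manifests: &[Manifest]) -> Vec<String> {
    manifests
        .iter()
        .filter_map(|m| validate(m).err())
        .map(|err| err.to_string())
        .collect()
}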
pub use types::*;
pub use error::{DslError, Result};
pub use parser::{parse_yaml, parse_file};
pub use validation::{validate, validate_backup, validate_restore};
pub use convert::convert;
pub use manifest::{Manifest, Metadata, ResourceKind, ...};
pub use registry::Registry;
pub use optimizer::{Optimizer, OptimizedDag, ...};