danube-connect-core

  • Version: 0.4.1
  • Repository: https://github.com/danube-messaging/danube-connect
  • Documentation: https://docs.rs/danube-connect-core
  • Author: Dan Rusei (danrusei)

Core SDK for building high-performance connectors for the Danube messaging system.

Overview

danube-connect-core is a production-ready framework for building connectors that integrate Danube with external systems. Whether you're building a sink connector to export messages or a source connector to import data, this SDK provides everything you need.

Key Features

  • 🔌 Simple Trait-Based API - Implement just SinkConnector or SourceConnector traits
  • 📋 Schema Registry Integration - Automatic schema-aware serialization/deserialization
  • 🔄 Multi-Topic Support - Handle multiple topics with different schemas per connector
  • 🎯 Zero Schema Boilerplate - Runtime handles all schema operations transparently
  • ⚙️ Automatic Runtime Management - Lifecycle, message loops, and graceful shutdown
  • 🔁 Built-in Retry Logic - Exponential backoff with jitter for resilient integrations
  • 📊 Observability - Prometheus metrics, structured logging, and health checks
  • 🛠️ Message Utilities - Batching, type conversion, and validation helpers
  • ⚡ High Performance - Async/await with Tokio, connection pooling, schema caching

Core Concepts

Connector Traits

SinkConnector (Danube → External System)

  • Consume messages from Danube topics
  • Receive typed serde_json::Value data (already deserialized by runtime)
  • Write to external databases, APIs, or services
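
A minimal sink looks like the sketch below. The write method name, the ConnectorError type, and the import paths are assumptions about the API, not confirmed signatures; the SinkRecord accessors are the ones documented under Message Types.

use danube_connect_core::{SinkConnector, SinkRecord, ConnectorError}; // import paths assumed

struct StdoutSink;

#[async_trait::async_trait]
impl SinkConnector for StdoutSink {
    // Hypothetical signature: the runtime delivers one record at a time,
    // with the payload already deserialized to serde_json::Value.
    async fn write(&mut self, record: SinkRecord) -> Result<(), ConnectorError> {
        println!("{} @ {}: {}", record.topic(), record.offset(), record.payload());
        Ok(())
    }
}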

SourceConnector (External System → Danube)

  • Read from external systems
  • Provide typed serde_json::Value data
  • Runtime handles schema-aware serialization before publishing
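
A matching source sketch, with the same caveat that the poll method name and return type are assumptions; SourceRecord::from_number is one of the documented constructors:

use danube_connect_core::{SourceConnector, SourceRecord, ConnectorError}; // import paths assumed

struct CounterSource {
    n: u64,
}

#[async_trait::async_trait]
impl SourceConnector for CounterSource {
    // Hypothetical signature: the runtime polls in a loop and publishes the
    // returned records, serializing each per the topic's configured schema.
    async fn poll(&mut self) -> Result<Vec<SourceRecord>, ConnectorError> {
        self.n += 1;
        Ok(vec![SourceRecord::from_number("/events/counter", self.n)])
    }
}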

Schema Registry Integration

The runtime automatically handles schema operations - your connector works with typed data:

For Sink Connectors:

  • Runtime deserializes messages based on their schema
  • You receive SinkRecord with typed serde_json::Value payload
  • Access data directly or deserialize to structs with as_type<T>()
  • Schema info available via record.schema()
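
For example, a sink handler can go straight from the record to a domain struct (the UserEvent type here is illustrative, and as_type is assumed to return a Result):

use serde::Deserialize;

#[derive(Deserialize)]
struct UserEvent {
    user_id: String,
    action: String,
}

// Inside the sink's message handler:
let event: UserEvent = record.as_type()?; // typed deserialization (Result assumed)
let schema = record.schema();             // subject, version, and schema type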

For Source Connectors:

  • Create SourceRecord with typed data using helper methods
  • Runtime serializes based on topic's configured schema
  • Automatic schema registration and validation
  • Support for JSON Schema, String, Bytes, Number, and Avro (Protobuf coming soon)

Benefits:

  • ✅ Zero schema boilerplate in connector code
  • ✅ Type safety and data validation
  • ✅ Managed schema evolution with version strategies
  • ✅ Automatic caching for performance
  • ✅ Centralized schema management

Runtime Management

Both SinkRuntime and SourceRuntime handle:

  • Schema registry client initialization and caching
  • Schema-aware serialization/deserialization
  • Message polling and processing loops
  • Automatic retry with exponential backoff
  • Graceful shutdown and signal handling
  • Prometheus metrics and health checks
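
Wiring a connector into its runtime is then a few lines; the constructor and run method below are assumptions about the API, not confirmed signatures:

use danube_connect_core::SinkRuntime; // import path assumed

// Hypothetical entry point: hand the config and connector to the runtime,
// which owns the message loop, retries, metrics, and graceful shutdown.
let runtime = SinkRuntime::new(config, StdoutSink);
runtime.run().await?;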

Configuration

Connectors can be configured in two ways:

1. TOML Configuration (recommended for standalone connectors):

danube_service_url = "http://localhost:6650"
connector_name = "my-connector"

[[schemas]]
topic = "/events/users"
subject = "user-events-schema"
schema_type = "json_schema"
schema_file = "schemas/user-event.json"
version_strategy = "latest"  # or "pinned" or "minimum"

# Include connector-specific settings
[mqtt]
broker = "mqtt://localhost:1883"

2. Programmatic Configuration (for embedding in applications):

let config = ConnectorConfig {
    danube_service_url: "http://localhost:6650".to_string(),
    connector_name: "my-connector".to_string(),
    retry: RetrySettings::default(),
    processing: ProcessingSettings::default(),
    schemas: vec![/* schema mappings */],
};

See Programmatic Configuration Guide for complete details.

The runtime automatically:

  • Loads schema definitions from files
  • Registers schemas with Danube's schema registry
  • Caches schemas for performance
  • Handles schema versioning and evolution

Message Types

SinkRecord - Messages from Danube

  • payload() - Returns &serde_json::Value (typed data, already deserialized)
  • as_type<T>() - Deserialize to specific Rust type
  • schema() - Schema metadata (subject, version, type)
  • topic(), offset(), attributes(), publish_time() - Message metadata

SourceRecord - Messages to Danube

  • new(topic, payload) - Create from serde_json::Value
  • from_json(topic, data) - Create from any serializable type
  • from_string(topic, text) - Create from string
  • from_number(topic, number) - Create from numeric value
  • from_avro(topic, data) - Create from Avro-compatible struct
  • from_bytes(topic, data) - Create from binary data
  • with_attribute(), with_key() - Add metadata
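
Putting the helpers together; whether from_json returns a Result and whether the builder methods consume self are assumptions:

use serde::Serialize;

#[derive(Serialize)]
struct Reading {
    device: String,
    temp_c: f64,
}

// from_json accepts any Serialize type; with_key/with_attribute attach metadata.
let reading = Reading { device: "sensor-1".into(), temp_c: 21.5 };
let record = SourceRecord::from_json("/iot/readings", &reading)? // Result assumed
    .with_key("sensor-1")
    .with_attribute("region", "eu-west");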

Utilities

  • Batcher<T> - Message batching with size/timeout-based flushing
  • HealthChecker - Health tracking with failure thresholds
  • ConnectorMetrics - Prometheus metrics integration
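
A possible Batcher flow, assuming a size/timeout constructor and a push that hands back a full batch once either threshold is hit (both are assumptions about the API):

use std::time::Duration;
use danube_connect_core::Batcher; // import path assumed

let mut batcher: Batcher<serde_json::Value> = Batcher::new(100, Duration::from_millis(500));
let value = serde_json::json!({ "user_id": "u-1", "action": "login" });

// Hypothetical flow: push returns Some(batch) once 100 records accumulate
// or 500 ms elapse; write_batch is a placeholder for your sink's bulk write.
if let Some(batch) = batcher.push(value) {
    write_batch(batch).await?;
}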

Installation

Add to your Cargo.toml:

[dependencies]
danube-connect-core = "0.4"
tokio = { version = "1", features = ["full"] }
async-trait = "0.1"

Documentation

Getting Started

API Reference

Examples & Source Code

  • 🔌 Production Connectors - Reference implementations:

    • MQTT Source Connector (IoT integration)
    • HTTP/Webhook Source Connector (API ingestion)
    • Qdrant Sink Connector (Vector embeddings)
    • SurrealDB Sink Connector (Multi-model database)
    • Delta Lake Sink Connector (Data lake)
  • 💻 SDK Source Code - Core framework implementation

Performance & Best Practices

danube-connect-core is optimized for production workloads:

  • Async/await - Built on Tokio for efficient async I/O
  • Schema caching - Automatic in-memory schema cache with LRU eviction
  • Batching - Process messages in configurable batches for higher throughput
  • Connection pooling - Reuse connections to external systems and Danube
  • Metrics - Built-in Prometheus metrics for monitoring and alerting
  • Health checks - HTTP endpoint for Kubernetes liveness/readiness probes

See the Development Guide for schema evolution strategies and production deployment patterns.

Contributing

Contributions are welcome! Here's how you can help:

  • Core SDK Improvements: Enhance the connector framework, add new features, or improve performance
  • Documentation: Improve guides, add examples, or clarify concepts
  • Bug Fixes: Report and fix issues
  • Testing: Add test coverage and integration tests

Please open an issue or pull request on GitHub.

License

Licensed under the Apache License, Version 2.0. See LICENSE for details.
