| Crates.io | danube-connect-core |
| lib.rs | danube-connect-core |
| version | 0.4.1 |
| created_at | 2025-12-12 15:00:51.587294+00 |
| updated_at | 2026-01-11 11:10:53.847876+00 |
| description | Core SDK for building Danube connectors |
| homepage | |
| repository | https://github.com/danube-messaging/danube-connect |
| max_upload_size | |
| id | 1981684 |
| size | 261,839 |
Core SDK for building high-performance connectors for the Danube messaging system.
danube-connect-core is a production-ready framework for building connectors that integrate Danube with external systems. Whether you're building a sink connector to export messages or a source connector to import data, this SDK provides everything you need.
Connectors implement one of two traits: SinkConnector or SourceConnector.

SinkConnector (Danube → External System)
- Receives serde_json::Value data (already deserialized by the runtime)

SourceConnector (External System → Danube)
- Produces serde_json::Value data

The runtime automatically handles schema operations - your connector works with typed data:
For Sink Connectors:
- Receive a SinkRecord with a typed serde_json::Value payload
- Call as_type<T>() to deserialize into your own structs
- Call record.schema() to inspect schema metadata

For Source Connectors:
- Produce SourceRecord values with typed data using helper methods

Benefits:
- Connector code works with typed data; schema handling stays in the runtime
Both SinkRuntime and SourceRuntime handle the connection to Danube, schema operations, and the retry and processing settings from ConnectorConfig. A minimal sink connector sketch follows.
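To make the trait shape concrete, here is a minimal sink connector sketch. SinkConnector, SinkRecord, as_type<T>() and topic() are all named in this README; the import path, the write method name and signature, the error type, and the UserEvent payload shape are assumptions made for illustration only.

// Sketch only: the import path, the trait method name/signature, and the error
// type are assumptions for illustration; check the crate docs for the real API.
use async_trait::async_trait;
use danube_connect_core::{SinkConnector, SinkRecord}; // assumed re-exports
use serde::Deserialize;

// Hypothetical payload shape for the user-events schema used later in this README.
#[derive(Debug, Deserialize)]
struct UserEvent {
    user_id: String,
    action: String,
}

// A trivial sink that prints records instead of writing to an external system.
struct StdoutSink;

#[async_trait]
impl SinkConnector for StdoutSink {
    // Assumed method name and signature.
    async fn write(
        &mut self,
        record: SinkRecord,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        // as_type::<T>() and topic() are documented in the API reference below.
        let event: UserEvent = record.as_type()?;
        println!("{}: user {} did {}", record.topic(), event.user_id, event.action);
        Ok(())
    }
}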
Connectors can be configured in two ways:
1. TOML Configuration (recommended for standalone connectors):
danube_service_url = "http://localhost:6650"
connector_name = "my-connector"
[[schemas]]
topic = "/events/users"
subject = "user-events-schema"
schema_type = "json_schema"
schema_file = "schemas/user-event.json"
version_strategy = "latest" # or "pinned" or "minimum"
# Include connector-specific settings
[mqtt]
broker = "mqtt://localhost:1883"
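How the runtime surfaces connector-specific sections such as [mqtt] is not spelled out here. If you end up parsing the file yourself, a minimal sketch using the toml and serde crates (extra dependencies, and MqttSettings is a made-up struct mirroring the table above) could look like this:

use serde::Deserialize;

// Mirrors the [mqtt] table from the TOML example above.
#[derive(Debug, Deserialize)]
struct MqttSettings {
    broker: String,
}

// Only the connector-specific part of the file; the SDK-level keys are ignored here.
#[derive(Debug, Deserialize)]
struct ConnectorFile {
    mqtt: MqttSettings,
}

fn load_mqtt_settings(path: &str) -> Result<MqttSettings, Box<dyn std::error::Error>> {
    let raw = std::fs::read_to_string(path)?;
    let file: ConnectorFile = toml::from_str(&raw)?;
    Ok(file.mqtt)
}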
2. Programmatic Configuration (for embedding in applications):
let config = ConnectorConfig {
danube_service_url: "http://localhost:6650".to_string(),
connector_name: "my-connector".to_string(),
retry: RetrySettings::default(),
processing: ProcessingSettings::default(),
schemas: vec![/* schema mappings */],
};
See Programmatic Configuration Guide for complete details.
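Wiring a connector and its ConnectorConfig into the runtime might look roughly like the sketch below. SinkRuntime is named earlier in this README, but the new/run calls, the import paths, and StdoutSink (from the sink sketch above) are assumptions, not the crate's confirmed API.

// Sketch only: the import paths and the SinkRuntime::new / run calls are assumptions;
// StdoutSink is the example connector from the earlier sketch.
use danube_connect_core::{ConnectorConfig, ProcessingSettings, RetrySettings, SinkRuntime};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = ConnectorConfig {
        danube_service_url: "http://localhost:6650".to_string(),
        connector_name: "my-connector".to_string(),
        retry: RetrySettings::default(),
        processing: ProcessingSettings::default(),
        schemas: vec![/* schema mappings */],
    };

    // Hand the config and connector to the runtime, which the README says owns
    // schema operations and message flow.
    SinkRuntime::new(config, StdoutSink).run().await?;
    Ok(())
}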
The runtime automatically applies this configuration, resolving the declared schemas (per version_strategy) and honoring the retry and processing settings.
SinkRecord - Messages from Danube
- payload() - Returns &serde_json::Value (typed data, already deserialized)
- as_type<T>() - Deserialize to a specific Rust type
- schema() - Schema metadata (subject, version, type)
- topic(), offset(), attributes(), publish_time() - Message metadata

SourceRecord - Messages to Danube (a usage sketch follows this list)
- new(topic, payload) - Create from a serde_json::Value
- from_json(topic, data) - Create from any serializable type
- from_string(topic, text) - Create from a string
- from_number(topic, number) - Create from a numeric value
- from_avro(topic, data) - Create from an Avro-compatible struct
- from_bytes(topic, data) - Create from binary data
- with_attribute(), with_key() - Add metadata

Batcher<T> - Message batching with size/timeout-based flushing
HealthChecker - Health tracking with failure thresholds
ConnectorMetrics - Prometheus metrics integration
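A short sketch of the SourceRecord builders listed above. The method names come from this README; the exact argument and return types (for example, that constructors are infallible and that with_key / with_attribute take string slices) are assumptions.

// Sketch only: the import path and exact signatures are assumptions;
// method names are from the API reference above.
use danube_connect_core::SourceRecord; // assumed path
use serde_json::json;

fn build_example_records() -> Vec<SourceRecord> {
    vec![
        // new(topic, payload): record from a serde_json::Value.
        SourceRecord::new(
            "/events/users",
            json!({ "user_id": "u-42", "action": "login" }),
        ),
        // from_string(topic, text) plus the builder-style metadata helpers.
        SourceRecord::from_string("/logs/app", "service started")
            .with_key("app-1")
            .with_attribute("level", "info"),
        // from_number(topic, number): record from a numeric value.
        SourceRecord::from_number("/metrics/temperature", 21.5),
    ]
}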
Add to your Cargo.toml:

[dependencies]
danube-connect-core = "0.4"
tokio = { version = "1", features = ["full"] }
async-trait = "0.1"
🔌 Production Connectors - Reference implementations:
💻 SDK Source Code - Core framework implementation
danube-connect-core is optimized for production workloads.
See the Development Guide for schema evolution strategies and production deployment patterns.
Contributions are welcome! Here's how you can help:
Please open an issue or pull request on GitHub.
Licensed under the Apache License, Version 2.0. See LICENSE for details.