Crates.io | ds-event-stream-rs-sdk |
lib.rs | ds-event-stream-rs-sdk |
version | 0.1.0 |
created_at | 2025-09-24 15:01:00.382948+00 |
updated_at | 2025-09-24 15:01:00.382948+00 |
description | A comprehensive Rust SDK for working with the DS Event Stream, providing high-level abstractions for producing and consuming events from Kafka |
homepage | |
repository | https://github.com/grasp-labs/ds-event-stream-rs-sdk |
max_upload_size | |
id | 1853251 |
size | 128,241 |
A Rust SDK for interacting with the DS Event Stream via Kafka. This library provides a clean, async interface for producing and consuming events from the DS Event Stream.
Add this to your `Cargo.toml`:
[dependencies]
ds-event-stream-rs-sdk = "0.1.0"
Or use cargo add:
cargo add ds-event-stream-rs-sdk
use ds_event_stream_rs_sdk::producer::KafkaProducer;
use ds_event_stream_rs_sdk::model::v1::EventStream;
use ds_event_stream_rs_sdk::model::topics::Topic;
use ds_event_stream_rs_sdk::utils::{get_bootstrap_servers, Environment, ClientCredentials};
use tracing::info;
use uuid::Uuid;
use chrono::Utc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Resolve the development brokers; the second argument selects
    // internal (true) vs. external (false) endpoints.
    let brokers = get_bootstrap_servers(Environment::Development, false);

    // SASL credentials used to authenticate against the cluster.
    let creds = ClientCredentials {
        username: "username".to_string(),
        password: "password".to_string(),
    };

    // Producer built with the SDK's default configuration.
    let producer = KafkaProducer::default(&brokers, &creds)?;

    // Assemble a v1 event; every optional field is left unset except
    // the JSON payload.
    let event = EventStream {
        id: Uuid::new_v4(),
        session_id: Uuid::new_v4(),
        tenant_id: Uuid::new_v4(),
        event_source: "pipeline-service".to_string(),
        event_type: "pipeline-created".to_string(),
        timestamp: Utc::now(),
        created_by: "user-42".to_string(),
        md5_hash: "hash".to_string(),
        request_id: None,
        owner_id: None,
        product_id: None,
        product_schema_uri: None,
        event_source_uri: None,
        affected_entity_uri: None,
        message: None,
        payload: Some(serde_json::json!({"pipeline_id": "pipeline-123"})),
        payload_uri: None,
        context: None,
        context_uri: None,
        metadata: None,
        tags: None,
    };

    // Publish keyed by "user-42" to the pipeline-job topic; the trailing
    // None uses the SDK's default send options.
    producer
        .send_event(&Topic::DsPipelineJobRequested, "user-42", &event, None)
        .await?;
    info!("Event sent to Kafka");
    Ok(())
}
use ds_event_stream_rs_sdk::consumer::KafkaConsumer;
use ds_event_stream_rs_sdk::model::topics::Topic;
use ds_event_stream_rs_sdk::utils::{get_bootstrap_servers, Environment, ClientCredentials};
use tokio_stream::StreamExt;
use rdkafka::message::Message;
use tracing::{error, info};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Resolve the external development brokers (internal flag = false).
    let brokers = get_bootstrap_servers(Environment::Development, false);

    // SASL credentials for the consumer connection.
    let creds = ClientCredentials {
        username: "username".to_string(),
        password: "password".to_string(),
    };

    // Subscribe to the pipeline-job topic as part of consumer group "group-id".
    let consumer = KafkaConsumer::default(
        &brokers,
        &[Topic::DsPipelineJobRequested],
        "group-id",
        &creds,
    )?;

    // Drain the async message stream until it is closed.
    let mut messages = consumer.stream();
    while let Some(delivery) = messages.next().await {
        match delivery {
            Err(e) => error!("Kafka error: {}", e),
            Ok(msg) => {
                info!("Received message on topic: {}", msg.topic());
                // Log the payload when present; from_utf8 propagates an
                // error on non-UTF-8 bytes via `?`.
                if let Some(payload) = msg.payload() {
                    info!("Payload: {:?}", std::str::from_utf8(payload)?);
                }
            }
        }
    }
    Ok(())
}
The SDK provides utility functions for getting bootstrap servers for different environments:
- `get_bootstrap_servers(Environment::Development, false)` — Development external servers
- `get_bootstrap_servers(Environment::Development, true)` — Development internal servers
- `get_bootstrap_servers(Environment::Production, false)` — Production external servers
- `get_bootstrap_servers(Environment::Production, true)` — Production internal servers

Authentication is handled via the `ClientCredentials` struct, which has `username` and `password` fields.
This project is licensed under either the Apache License, Version 2.0 or the MIT License, at your option. (NOTE: the license names were missing from this text and have been reconstructed from the standard Rust dual-license boilerplate — confirm against the repository's LICENSE files.)