zeal-sdk

Crates.iozeal-sdk
lib.rszeal-sdk
version1.0.5
created_at2025-09-03 10:11:02.177793+00
updated_at2025-09-03 10:11:02.177793+00
descriptionRust SDK for Zeal Integration Protocol (ZIP)
homepagehttps://github.com/offbit-ai/zeal
repositoryhttps://github.com/offbit-ai/zeal.git
max_upload_size
id1822292
size283,222
'Damilare Darmie Akinlaja (darmie)

documentation

https://docs.rs/zeal-sdk

README

Zeal Rust SDK

High-performance Rust SDK for the Zeal Integration Protocol (ZIP), enabling efficient third-party workflow runtime integration with the Zeal workflow editor.

Prerequisites

⚠️ Important: A running Zeal server instance is required for the SDK to function. The SDK communicates with the Zeal server via REST APIs and WebSocket connections.

Starting the Zeal Server

# Clone the Zeal repository
git clone https://github.com/offbit-ai/zeal.git
cd zeal

# Install dependencies
npm install

# Start the development server
npm run dev
# Or use the start script
./start-dev.sh

The Zeal server will be available at http://localhost:3000 by default.

For detailed setup instructions, deployment options, and configuration, please refer to the Zeal repository.

Features

  • Zero-copy JSON parsing with serde_json and simd-json
  • Async/await support with tokio and futures
  • WebSocket real-time communication with tokio-tungstenite
  • HTTP/2 client with reqwest and connection pooling
  • Structured logging with tracing and OpenTelemetry support
  • Memory-efficient streaming for large payloads
  • Built-in retry logic with exponential backoff
  • Compile-time safety with strong typing and error handling
  • Observable streams with futures-util and custom stream combinators
  • Thread-safe concurrent operations with Arc and Mutex

Installation

Add to your Cargo.toml:

[dependencies]
zeal-sdk = "1.0.0"

# For async runtime
tokio = { version = "1.0", features = ["full"] }

# For observables/streams (optional)
futures = "0.3"

Quick Start

use zeal_sdk::{ZealClient, ClientConfig, NodeTemplate};
use tokio;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Initialize client
    let client = ZealClient::new(ClientConfig {
        base_url: "http://localhost:3000".to_string(),
        ..Default::default()
    })?;

    // Register node templates
    let templates = vec![
        NodeTemplate {
            id: "data-processor".to_string(),
            type_name: "processor".to_string(),
            title: "Data Processor".to_string(),
            category: "Processing".to_string(),
            description: "Processes data efficiently".to_string(),
            // ... other fields
        }
    ];

    client.templates().register(
        "my-runtime",
        templates,
        None
    ).await?;

    // Create webhook subscription
    let subscription = client.create_subscription(SubscriptionOptions {
        port: Some(3001),
        namespace: Some("my-runtime".to_string()),
        events: vec!["workflow.*".to_string(), "node.*".to_string()],
        ..Default::default()
    })?;

    // Handle events with callback
    subscription.on_event(|event| async move {
        println!("Received event: {} - {}", event.event_type, event.data);
    }).await;

    // Start receiving events
    subscription.start().await?;

    Ok(())
}

Core APIs

Templates API

Register and manage node templates:

use zeal_sdk::templates::*;

// Register templates
let result = client.templates().register(
    "my-integration",
    vec![template],
    Some("http://my-server.com/webhook".to_string())
).await?;

// List templates
let templates = client.templates().list("my-integration").await?;

// Get specific template
let template = client.templates().get("template-id").await?;

Orchestrator API

Programmatically create and modify workflows:

use zeal_sdk::orchestrator::*;

// Create workflow
let workflow = client.orchestrator().create_workflow(CreateWorkflowRequest {
    name: "My Workflow".to_string(),
    description: Some("Created via Rust SDK".to_string()),
    metadata: None,
}).await?;

// Add node
let node = client.orchestrator().add_node(AddNodeRequest {
    workflow_id: workflow.workflow_id,
    template_id: "template-id".to_string(),
    position: Position { x: 100.0, y: 100.0 },
    property_values: Some(serde_json::json!({
        "param1": "value1"
    })),
}).await?;

// Connect nodes
client.orchestrator().connect_nodes(ConnectNodesRequest {
    workflow_id: workflow.workflow_id,
    source: NodePort {
        node_id: "node1".to_string(),
        port_id: "output".to_string(),
    },
    target: NodePort {
        node_id: "node2".to_string(),
        port_id: "input".to_string(),
    },
}).await?;

Traces API

Submit execution trace data with high performance:

use zeal_sdk::traces::*;

// Create trace session
let session = client.traces().create_session(CreateTraceSessionRequest {
    workflow_id: "workflow-id".to_string(),
    execution_id: "exec-123".to_string(),
    metadata: Some(TraceMetadata {
        trigger: Some("manual".to_string()),
        environment: Some("production".to_string()),
        tags: vec!["batch-job".to_string()],
    }),
}).await?;

// Submit events (batched for efficiency)
let events = vec![
    TraceEvent {
        timestamp: chrono::Utc::now().timestamp_millis(),
        node_id: "node-id".to_string(),
        event_type: TraceEventType::Output,
        data: TraceData {
            size: 1024,
            data_type: "application/json".to_string(),
            preview: Some(serde_json::json!({"processed": 1000})),
            full_data: None,
        },
        duration: Some(std::time::Duration::from_millis(150)),
        ..Default::default()
    }
];

client.traces().submit_events(&session.session_id, events).await?;

// Complete session
client.traces().complete_session(
    &session.session_id,
    TraceStatus::Completed
).await?;

Events API

Real-time bidirectional communication:

use zeal_sdk::events::*;
use futures_util::StreamExt;

// Connect to WebSocket
let mut event_stream = client.events().connect("workflow-id").await?;

// Handle incoming events
tokio::spawn(async move {
    while let Some(event) = event_stream.next().await {
        match event {
            Ok(ZealEvent::NodeExecuting { node_id, .. }) => {
                println!("Node {} is executing", node_id);
            }
            Ok(ZealEvent::NodeCompleted { node_id, result, .. }) => {
                println!("Node {} completed: {:?}", node_id, result);
            }
            Err(e) => eprintln!("WebSocket error: {}", e),
        }
    }
});

// Send events
client.events().send_runtime_event(RuntimeEvent {
    event_type: RuntimeEventType::NodeExecutionStart,
    workflow_id: "workflow-id".to_string(),
    data: serde_json::json!({
        "nodeId": "node-123",
        "timestamp": chrono::Utc::now().timestamp_millis()
    }),
}).await?;

Observable Streams

Process events with powerful stream combinators:

use zeal_sdk::observables::*;
use futures_util::{StreamExt, TryStreamExt};

// Create subscription and get observable stream
let subscription = client.create_subscription(SubscriptionOptions::default())?;
let stream = subscription.as_observable().await?;

// Filter and transform events
let error_stream = stream
    .filter_map(|event| async move {
        if event.event_type.contains("error") {
            Some(ErrorEvent {
                id: event.id,
                error: event.data.get("error").cloned()?,
                timestamp: event.timestamp,
            })
        } else {
            None
        }
    })
    .take(100) // Limit to first 100 errors
    .collect::<Vec<_>>()
    .await;

// Subscribe to specific event types
let node_events = stream
    .filter(|event| async move {
        matches!(event.event_type.as_str(), "node.executed" | "node.failed")
    })
    .for_each(|event| async {
        println!("Node event: {:#?}", event);
    })
    .await;

// Advanced stream processing
use futures_util::stream;

let processed_stream = stream
    .buffer_unordered(10) // Process up to 10 events concurrently
    .filter_map(|result| async move {
        match result {
            Ok(event) => Some(process_event(event).await),
            Err(e) => {
                eprintln!("Stream error: {}", e);
                None
            }
        }
    })
    .take_while(|processed| {
        let should_continue = processed.is_ok();
        async move { should_continue }
    });

Advanced Features

Connection Pooling and Performance

use zeal_sdk::{ClientConfig, PerformanceConfig};

let client = ZealClient::new(ClientConfig {
    base_url: "http://localhost:3000".to_string(),
    performance: PerformanceConfig {
        max_connections_per_host: 50,
        connection_timeout: std::time::Duration::from_secs(10),
        request_timeout: std::time::Duration::from_secs(30),
        tcp_keepalive: Some(std::time::Duration::from_secs(60)),
        http2_prior_knowledge: true,
        ..Default::default()
    },
    ..Default::default()
})?;

Batch Operations

// Batch trace events for optimal performance
use zeal_sdk::traces::TraceBatch;

let mut batch = TraceBatch::new(1000); // 1000 events per batch

for i in 0..10000 {
    batch.add_event(TraceEvent {
        // ... event data
    })?;
    
    // Auto-submit when batch is full
    if let Some(events) = batch.try_flush() {
        client.traces().submit_events(&session_id, events).await?;
    }
}

// Submit remaining events
if let Some(events) = batch.flush() {
    client.traces().submit_events(&session_id, events).await?;
}

Structured Logging and Observability

use tracing::{info, error, instrument};
use zeal_sdk::telemetry::ZealTelemetry;

// Initialize telemetry
ZealTelemetry::init()?;

#[instrument(skip(client))]
async fn process_workflow(
    client: &ZealClient,
    workflow_id: &str
) -> Result<(), Box<dyn std::error::Error>> {
    info!("Starting workflow processing: {}", workflow_id);
    
    let session = client.traces().create_session(CreateTraceSessionRequest {
        workflow_id: workflow_id.to_string(),
        execution_id: uuid::Uuid::new_v4().to_string(),
        metadata: None,
    }).await?;
    
    info!("Created trace session: {}", session.session_id);
    
    // Processing logic here...
    
    Ok(())
}

Custom Error Types

use zeal_sdk::errors::*;

match client.templates().get("invalid-id").await {
    Ok(template) => println!("Template: {:#?}", template),
    Err(ZealError::NotFound { resource, id }) => {
        eprintln!("Template '{}' not found", id);
    }
    Err(ZealError::NetworkError { source, retryable }) => {
        eprintln!("Network error: {} (retryable: {})", source, retryable);
        if retryable {
            // Implement retry logic
        }
    }
    Err(ZealError::ValidationError { field, message }) => {
        eprintln!("Validation error in '{}': {}", field, message);
    }
    Err(e) => eprintln!("Other error: {}", e),
}

Performance Benchmarks

The Rust SDK is designed for high-performance applications:

  • Memory usage: ~2MB baseline, efficient streaming for large payloads
  • CPU efficiency: Zero-copy JSON parsing, async I/O
  • Throughput: 50,000+ events/second on modern hardware
  • Latency: Sub-millisecond event processing overhead
  • Concurrent connections: 1000+ WebSocket connections per instance

Examples

See the examples directory for complete working examples:

Platform Support

  • Linux (x86_64, aarch64)
  • macOS (x86_64, Apple Silicon)
  • Windows (x86_64)

License

Apache-2.0

Commit count: 0

cargo fmt