| Crates.io | oxify-engine |
| lib.rs | oxify-engine |
| version | 0.1.0 |
| created_at | 2026-01-19 05:47:56.919167+00 |
| updated_at | 2026-01-19 05:47:56.919167+00 |
| description | Workflow execution engine for OxiFY - DAG orchestration, scheduling, and state management |
| homepage | |
| repository | https://github.com/cool-japan/oxify |
| max_upload_size | |
| id | 2053823 |
| size | 1,007,758 |
The Brain - DAG Execution Engine for OxiFY
oxify-engine is the workflow execution engine powering OxiFY's Event-Driven Architecture. It transforms DAG definitions into distributed task executions via CeleRS, with Rust's type system guaranteeing execution safety.
Status: ✅ Phase 1-10 Complete - Production Ready with Advanced Features
Part of: OxiFY Enterprise Architecture (Codename: Absolute Zero)
The engine is a comprehensive workflow execution system featuring:
┌─────────────────────────────────────────┐
│ OxiFY Engine │
│ │
│ ┌───────────────────────────────────┐ │
│ │ 1. Validate Workflow │ │
│ └───────────────────────────────────┘ │
│ ↓ │
│ ┌───────────────────────────────────┐ │
│ │ 2. Topological Sort │ │
│ │ (Determine execution order) │ │
│ └───────────────────────────────────┘ │
│ ↓ │
│ ┌───────────────────────────────────┐ │
│ │ 3. Execute Nodes │ │
│ │ - Resolve dependencies │ │
│ │ - Pass variables │ │
│ │ - Update context │ │
│ └───────────────────────────────────┘ │
│ ↓ │
│ ┌───────────────────────────────────┐ │
│ │ 4. Return Execution Context │ │
│ └───────────────────────────────────┘ │
└─────────────────────────────────────────┘
use oxify_engine::{Engine, ExecutionState};
use oxify_model::Workflow;
use anyhow::Result; // assumes anyhow; any Result alias compatible with `?` works

#[tokio::main]
async fn main() -> Result<()> {
    // Load or create workflow
    let workflow = create_my_workflow();

    // Create engine
    let engine = Engine::new();

    // Execute workflow
    let context = engine.execute(&workflow).await?;

    // Check results
    match context.state {
        ExecutionState::Completed => {
            println!("Workflow completed successfully!");
            // Access node results from context.node_results
        }
        ExecutionState::Failed(err) => {
            println!("Workflow failed: {}", err);
        }
        _ => {}
    }
    Ok(())
}
The engine uses Kahn's algorithm for topological sorting:
let execution_order = engine.topological_sort(&workflow)?;
// Returns: [start_id, llm_id, retriever_id, end_id]
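For reference, Kahn's algorithm repeatedly peels off nodes with no remaining incoming edges; if it cannot consume every node, the graph contains a cycle. A minimal standalone sketch over a plain adjacency map (illustrative only, not the engine's internal representation):

use std::collections::{HashMap, VecDeque};

/// Kahn's algorithm over a node -> successors adjacency map.
/// Returns None if the graph contains a cycle.
fn kahn_sort(edges: &HashMap<u32, Vec<u32>>) -> Option<Vec<u32>> {
    // Count incoming edges for every node.
    let mut in_degree: HashMap<u32, usize> = HashMap::new();
    for (&node, succs) in edges {
        in_degree.entry(node).or_insert(0);
        for &s in succs {
            *in_degree.entry(s).or_insert(0) += 1;
        }
    }
    // Seed the queue with nodes that have no dependencies.
    let mut queue: VecDeque<u32> = in_degree
        .iter()
        .filter(|&(_, &d)| d == 0)
        .map(|(&n, _)| n)
        .collect();
    let mut order = Vec::with_capacity(in_degree.len());
    while let Some(node) = queue.pop_front() {
        order.push(node);
        for &s in edges.get(&node).into_iter().flatten() {
            let d = in_degree.get_mut(&s).unwrap();
            *d -= 1;
            if *d == 0 {
                queue.push_back(s);
            }
        }
    }
    // Fewer sorted nodes than total means a cycle blocked progress.
    (order.len() == in_degree.len()).then_some(order)
}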
Each node type has specific execution logic:
- oxify-connect-llm
- oxify-connect-vector
- oxify-mcp

Variables can reference previous node outputs:
// In prompt template:
"Summarize the following documents: {{retriever_node.output}}"
// Engine resolves {{retriever_node.output}} from context.node_results
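A rough sketch of how this kind of placeholder resolution can be implemented with the regex crate over a map of node results (illustrative; the engine's actual resolver may differ in syntax and escaping):

use regex::Regex;
use std::collections::HashMap;

/// Replace {{node.field}} placeholders with values from prior node results.
fn resolve_template(template: &str, results: &HashMap<String, serde_json::Value>) -> String {
    let re = Regex::new(r"\{\{(\w+)\.(\w+)\}\}").unwrap();
    re.replace_all(template, |caps: &regex::Captures| {
        results
            .get(&caps[1])                          // look up the node by name
            .and_then(|v| v.get(&caps[2]))          // then the field on its output
            .map(|v| v.to_string())                 // note: strings keep JSON quotes here
            .unwrap_or_else(|| caps[0].to_string()) // leave unresolved placeholders as-is
    })
    .into_owned()
}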
The engine handles various error scenarios:
- EngineError::CycleDetected
- EngineError::NodeNotFound
- EngineError::ValidationError
- ExecutionState::Failed

Export workflows to multiple formats for documentation and analysis:
use oxify_engine::{export_to_dot, export_to_mermaid, export_to_ascii};
// Export to GraphViz DOT format
let dot = export_to_dot(&workflow);
std::fs::write("workflow.dot", dot)?;
// Export to Mermaid diagram
let mermaid = export_to_mermaid(&workflow);
// Export to ASCII art for terminal
let ascii = export_to_ascii(&workflow);
println!("{}", ascii);
Analyze workflow execution performance and detect bottlenecks:
use oxify_engine::{WorkflowProfile, PerformanceAnalyzer};
// Create performance profile during execution
let mut profile = WorkflowProfile::new(
    execution_id,
    workflow.metadata.name.clone(),
    total_duration,
);
// Add node profiles...
// profile.add_node_profile(node_profile);
profile.calculate_critical_path();
// Analyze for bottlenecks
let analyzer = PerformanceAnalyzer::new();
let bottlenecks = analyzer.analyze(&profile);
for bottleneck in bottlenecks {
    println!("{}: {}", bottleneck.bottleneck_type, bottleneck.description);
    println!("  Recommendation: {}", bottleneck.recommendation);
}
Get recommendations for optimizing your workflows:
use oxify_engine::WorkflowAnalyzer;
let analyzer = WorkflowAnalyzer::new();
// Calculate complexity metrics
let metrics = analyzer.calculate_complexity(&workflow);
println!("Complexity score: {}", metrics.complexity_score());
println!("DAG depth: {}, width: {}", metrics.dag_depth, metrics.dag_width);
// Get optimization recommendations
let recommendations = analyzer.analyze(&workflow);
for rec in recommendations {
    println!("{:?}: {}", rec.optimization_type, rec.description);
    println!("  Expected improvement: {:.1}%", rec.expected_improvement * 100.0);
}
Validate workflows before execution to catch potential issues early:
use oxify_engine::{HealthChecker, HealthSeverity};
let checker = HealthChecker::new();
let report = checker.check(&workflow);
println!("Health Score: {}/100", report.health_score);
println!("Is Healthy: {}", report.is_healthy);
// Check for critical issues
let critical = report.get_issues_by_severity(HealthSeverity::Critical);
for issue in critical {
    println!("CRITICAL [{}]: {}", issue.category, issue.description);
    println!("  Fix: {}", issue.recommendation);
}
// Check for errors
let errors = report.get_issues_by_severity(HealthSeverity::Error);
for issue in errors {
    println!("ERROR [{}]: {}", issue.category, issue.description);
    println!("  Fix: {}", issue.recommendation);
}
Health Checks Include:
The engine has comprehensive test coverage:
Test Categories:
The engine supports flexible configuration via ExecutionConfig:
use oxify_engine::{ExecutionConfig, CheckpointFrequency, ResourceConfig, BackpressureConfig};
let config = ExecutionConfig::default()
    .with_checkpoint_frequency(CheckpointFrequency::EveryLevel)
    .with_max_concurrent_nodes(10)
    .with_timeout(std::time::Duration::from_secs(300))
    .with_resource_monitoring(ResourceConfig::balanced())
    .with_backpressure(BackpressureConfig::strict());

let result = engine.execute_with_config(&workflow, context, config).await?;
Large values (>= 1KB) are automatically stored with Arc for reference counting, avoiding expensive cloning.
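The idea can be pictured as a two-variant store: values under the cutoff stay inline, larger ones are wrapped once and shared by reference count. A simplified sketch, not the engine's actual types:

use serde_json::Value;
use std::sync::Arc;

const ARC_THRESHOLD: usize = 1024; // 1KB, matching the documented cutoff

/// Small values stay inline; large ones are "cloned" by bumping a refcount.
enum StoredValue {
    Inline(Value),
    Shared(Arc<Value>),
}

impl StoredValue {
    fn new(value: Value) -> Self {
        // Estimate size via serialized length (a rough proxy).
        let size = serde_json::to_string(&value).map(|s| s.len()).unwrap_or(0);
        if size >= ARC_THRESHOLD {
            StoredValue::Shared(Arc::new(value))
        } else {
            StoredValue::Inline(value)
        }
    }
}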
Similar operations (LLM calls to the same provider, vector searches, etc.) are automatically batched for improved throughput.
The scheduler dynamically adjusts concurrency based on CPU and memory usage, using one of several policies (Fixed, CpuBased, MemoryBased, Balanced).
Topological sort results are cached in a 100-entry LRU cache for faster repeated executions.
Problem: Sequential execution is slow for independent nodes.
Solution: Enable parallel execution with appropriate concurrency limits:
let config = ExecutionConfig::default()
    .with_max_concurrent_nodes(10); // Limit based on available resources
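The limit can also be derived at runtime rather than hard-coded; a small sketch using the same builder, following the cores-times-two heuristic listed under Best Practices below:

// Derive "CPU cores * 2" from the machine the workflow actually runs on.
let cores = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(4);
let config = ExecutionConfig::default().with_max_concurrent_nodes(cores * 2);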
Best Practices:
- CPU cores * 2 for CPU-bound tasks

Problem: Fixed concurrency doesn't adapt to system load.
Solution: Use dynamic resource-aware scheduling:
use oxify_engine::{ResourceConfig, SchedulingPolicy};
// Balanced policy (adjusts based on both CPU and memory)
let config = ExecutionConfig::default()
    .with_resource_monitoring(ResourceConfig::balanced());

// CPU-focused policy for compute-intensive workflows
let config = ExecutionConfig::default()
    .with_resource_monitoring(ResourceConfig::cpu_based());
Performance Impact:
Problem: Queue saturation causes memory bloat and latency.
Solution: Configure backpressure strategy:
use oxify_engine::{BackpressureConfig, BackpressureStrategy};
// Block strategy: Wait when queue is full (safe, may slow down)
let config = ExecutionConfig::default()
    .with_backpressure(BackpressureConfig::strict());
// Throttle strategy: Add delays when approaching limits (balanced)
let throttle = BackpressureConfig {
    strategy: BackpressureStrategy::Throttle,
    max_queued_nodes: 100,
    max_active_nodes: 20,
    high_water_mark: 0.8, // Start throttling at 80%
    throttle_delay_ms: 100,
};
let config = ExecutionConfig::default().with_backpressure(throttle);
When to Use:
- Block for critical workflows where no data loss is acceptable
- Throttle for high-throughput workflows with soft latency requirements
- Drop only for best-effort, non-critical workflows

Problem: Large JSON objects are cloned repeatedly.
Solution: The engine automatically uses Arc-based storage for values >= 1KB:
use oxify_engine::VariableStore;
// The engine handles this automatically
let mut store = VariableStore::new();
store.insert("large_data".to_string(), large_json_value); // Auto-uses Arc
// Check storage efficiency
let stats = store.stats();
println!("Savings: {:.1}%", stats.savings_ratio() * 100.0);
Performance Impact:
Problem: Topological sort is recomputed on every execution.
Solution: Plan caching is automatic (100-entry LRU cache):
// First execution: computes and caches
engine.execute(&workflow).await?;
// Subsequent executions: uses cache (10-100x faster planning)
engine.execute(&workflow).await?; // Cache hit!
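Conceptually, the cache maps a workflow fingerprint to its computed node order and evicts the least recently used entry at capacity. A minimal sketch of such a structure (illustrative, not the engine's internals):

use std::collections::{HashMap, VecDeque};

/// A tiny LRU mapping workflow ids to cached execution orders.
struct PlanCache {
    capacity: usize,
    plans: HashMap<String, Vec<String>>, // workflow id -> node order
    recency: VecDeque<String>,           // front = least recently used
}

impl PlanCache {
    fn get_or_compute(&mut self, id: &str, compute: impl FnOnce() -> Vec<String>) -> &Vec<String> {
        if !self.plans.contains_key(id) {
            if self.plans.len() == self.capacity {
                // Evict the least recently used plan.
                if let Some(old) = self.recency.pop_front() {
                    self.plans.remove(&old);
                }
            }
            self.plans.insert(id.to_string(), compute());
        }
        // Mark this plan as most recently used.
        self.recency.retain(|k| k.as_str() != id);
        self.recency.push_back(id.to_string());
        &self.plans[id]
    }
}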
Performance Impact:
Problem: Sequential API calls have high latency overhead.
Solution: Use BatchAnalyzer for automatic operation grouping:
use oxify_engine::BatchAnalyzer;
let analyzer = BatchAnalyzer::new();
let plan = analyzer.analyze_workflow(&workflow);
println!("Batch groups: {}", plan.batches.len());
println!("Expected speedup: {:.1}x", plan.stats.speedup_factor);
Batching Opportunities:
Performance Impact:
Problem: Long-running workflows fail and restart from scratch.
Solution: Configure appropriate checkpoint frequency:
use oxify_engine::CheckpointFrequency;
// For short workflows (< 1 min): No checkpointing
let config = ExecutionConfig::default()
    .with_checkpoint_frequency(CheckpointFrequency::Never);

// For medium workflows (1-10 min): Checkpoint every level
let config = ExecutionConfig::default()
    .with_checkpoint_frequency(CheckpointFrequency::EveryLevel);

// For long workflows (> 10 min): Checkpoint every N nodes
let config = ExecutionConfig::default()
    .with_checkpoint_frequency(CheckpointFrequency::EveryNNodes(10));
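The three regimes can be folded into a single selection helper; the helper itself is hypothetical, but the variants are the ones shown above:

use std::time::Duration;
use oxify_engine::CheckpointFrequency;

/// Map expected runtime to the checkpointing regime described above.
fn pick_frequency(expected: Duration) -> CheckpointFrequency {
    if expected < Duration::from_secs(60) {
        CheckpointFrequency::Never // short: restart cost is trivial
    } else if expected < Duration::from_secs(600) {
        CheckpointFrequency::EveryLevel // medium: checkpoint per DAG level
    } else {
        CheckpointFrequency::EveryNNodes(10) // long: bound the replay work
    }
}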
Trade-offs:
Problem: Repeated LLM calls with the same inputs waste time and money.
Solution: LLM caching is automatic (1-hour TTL, 1000 entries):
// First call: executes API request
let result1 = engine.execute(&workflow).await?;
// Second call with same inputs: uses cache (1000x faster, $0 cost)
let result2 = engine.execute(&workflow).await?;
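The cache behaves like a keyed memo with expiry; a sketch of plausible key and TTL logic (the key composition is an assumption, not the engine's exact scheme):

use std::collections::HashMap;
use std::time::{Duration, Instant};

struct CachedResponse {
    body: String,
    stored_at: Instant,
}

struct LlmCache {
    ttl: Duration, // e.g. one hour
    entries: HashMap<String, CachedResponse>,
}

impl LlmCache {
    /// Key on everything that affects the completion output.
    fn key(model: &str, prompt: &str, temperature: f32) -> String {
        format!("{model}|{temperature}|{prompt}")
    }

    fn get(&self, key: &str) -> Option<&str> {
        self.entries
            .get(key)
            .filter(|e| e.stored_at.elapsed() < self.ttl) // drop stale hits
            .map(|e| e.body.as_str())
    }
}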
Performance Impact:
Problem: One failed node stops the entire workflow.
Solution: Enable continue-on-error for non-critical nodes:
let config = ExecutionConfig::default()
    .with_continue_on_error();
Use Cases:
Performance Impact:
Minimize DAG Depth:
Maximize Parallelism:
Optimize Node Granularity:
Reduce Node Count:
Example Optimization:
Before (slow):
depth=10, width=1, nodes=50 → 5 seconds
After (fast):
depth=3, width=8, nodes=25 → 1.2 seconds (4x faster)
Profile Workflow Execution:
use oxify_engine::{WorkflowProfile, PerformanceAnalyzer};
// Enable profiling during execution
let profile = engine.execute_with_profiling(&workflow).await?;
// Analyze bottlenecks
let analyzer = PerformanceAnalyzer::new();
let bottlenecks = analyzer.analyze(&profile);
for bottleneck in bottlenecks {
    println!("Bottleneck: {}", bottleneck.description);
    println!("Recommendation: {}", bottleneck.recommendation);
}
Get Optimization Recommendations:
use oxify_engine::WorkflowAnalyzer;
let analyzer = WorkflowAnalyzer::new();
let recommendations = analyzer.analyze(&workflow);
for rec in recommendations {
    println!("{:?}: {}", rec.optimization_type, rec.description);
    println!("Expected improvement: {:.1}%", rec.expected_improvement * 100.0);
}
Track Execution Costs:
use oxify_engine::CostEstimator;
let estimator = CostEstimator::new();
let estimate = estimator.estimate_workflow(&workflow, &context);
println!("Total cost: ${:.4}", estimate.total_cost());
println!("LLM costs: ${:.4}", estimate.llm_cost);
println!("Vector DB costs: ${:.4}", estimate.vector_cost);
Cost Reduction Strategies:
Level 0 (Sequential):
[Start] → Context initialized
Level 1 (Parallel - 3 nodes):
┌─────────────────────────────────────┐
│ [LLM-1] [LLM-2] [Retriever-1] │ ← Execute concurrently
│ ↓ ↓ ↓ │
│ result1 result2 documents │
└─────────────────────────────────────┘
All complete ← Wait for slowest
Level 2 (Sequential):
[Aggregate] → Combine results from Level 1
Level 3 (Parallel - 2 nodes):
┌─────────────────────────┐
│ [Format] [Validate] │ ← Execute concurrently
└─────────────────────────┘
Level 4 (Sequential):
[End] → Return final context
[Start]
↓
[Condition Node]
↓
Evaluate: user.age > 18?
╱ ╲
true false
↓ ↓
[Adult Branch] [Minor Branch]
↓ ↓
[Send Email] [Request Approval]
↓ ↓
[Log Action] [Log Action]
╲ ╱
╲ ╱
↓ ↓
[Merge]
↓
[End]
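A condition such as user.age > 18 can be evaluated by walking the dotted path into the execution context. A simplified sketch using serde_json (the expression format here is illustrative):

use serde_json::Value;

/// Evaluate a "path > literal" condition against a JSON context.
/// Only numeric greater-than is handled, which the example above needs.
fn evaluate_gt(context: &Value, path: &str, threshold: f64) -> bool {
    // Walk dotted path segments, e.g. "user.age".
    let mut current = context;
    for segment in path.split('.') {
        match current.get(segment) {
            Some(v) => current = v,
            None => return false, // missing field routes to the false branch
        }
    }
    current.as_f64().map(|n| n > threshold).unwrap_or(false)
}

// evaluate_gt(&ctx, "user.age", 18.0) selects the Adult or Minor branch.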
ForEach Loop:
[Start]
↓
[ForEach: items=[A, B, C]]
↓
┌──────────────────────────┐
│ Iteration 1: item=A │
│ Execute loop body │
│ Store result[0] │
└──────────────────────────┘
↓
┌──────────────────────────┐
│ Iteration 2: item=B │
│ Execute loop body │
│ Store result[1] │
└──────────────────────────┘
↓
┌──────────────────────────┐
│ Iteration 3: item=C │
│ Execute loop body │
│ Store result[2] │
└──────────────────────────┘
↓
Collect all results: [result[0], result[1], result[2]]
↓
[Next Node]
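The iteration pattern reduces to binding each item, running the loop body, and collecting results in order; a stripped-down sketch where a closure stands in for real node execution:

use serde_json::Value;

/// Run a loop body once per item and collect indexed results.
fn for_each(items: &[Value], mut body: impl FnMut(usize, &Value) -> Value) -> Vec<Value> {
    items
        .iter()
        .enumerate()
        .map(|(i, item)| body(i, item)) // item is rebound on every iteration
        .collect()
}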
[Start]
↓
┌─────────────────────────────┐
│ Try Block │
│ [Risky Operation] │
│ ↓ │
│ Success? ──────────────┐ │
└──────────────────────────│─┘
│ │
Failure Success
↓ │
┌─────────────────────┐ │
│ Catch Block │ │
│ Handle error │ │
│ Bind {{error}} │ │
│ [Recovery] │ │
└─────────────────────┘ │
│ │
↓ ↓
┌──────────────────────────┐
│ Finally Block │
│ Always executes │
│ [Cleanup] │
└──────────────────────────┘
↓
[End]
High System Load (CPU: 85%, Memory: 90%):
Concurrency: 10 → 4 (scaled down)
┌─────────────────────────┐
│ [Node1] [Node2] │ ← Limited parallelism
│ [Node3] [Node4] │
└─────────────────────────┘
Low System Load (CPU: 20%, Memory: 30%):
Concurrency: 4 → 20 (scaled up)
┌─────────────────────────────────────────┐
│ [Node1] [Node2] ... [Node10] │ ← High parallelism
│ [Node11] [Node12] ... [Node20] │
└─────────────────────────────────────────┘
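The scaling decision itself can be a small pure function over sampled load; a sketch of a balanced policy (the thresholds are illustrative):

/// Adjust the concurrency limit from sampled CPU/memory load (0.0..=1.0).
fn adjust_concurrency(current: usize, cpu: f64, mem: f64, min: usize, max: usize) -> usize {
    let load = cpu.max(mem); // balanced policy: react to the tighter resource
    if load > 0.8 {
        (current / 2).max(min) // scale down under pressure
    } else if load < 0.4 {
        (current * 2).min(max) // scale up when the system is idle
    } else {
        current
    }
}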
- oxify-model: Workflow data structures
- oxify-connect-llm: LLM provider integrations
- oxify-connect-vector: Vector DB integrations