| Crates.io | kotoba-workflow |
| lib.rs | kotoba-workflow |
| version | 0.1.22 |
| created_at | 2025-09-17 16:09:30.162225+00 |
| updated_at | 2025-09-19 19:02:25.525053+00 |
| description | Serverless Workflow specification compliant workflow engine |
| homepage | |
| repository | https://github.com/com-junkawasaki/kotoba |
| max_upload_size | |
| id | 1843549 |
| size | 390,969 |
Temporal-inspired workflow engine built on top of Kotoba's graph rewriting system.
Itonami provides a powerful workflow execution engine that combines Temporal-style workflow patterns with Kotoba's graph engine:
┌─────────────────────────────────────────────────────────────┐
│ Workflow Engine Layer │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Workflow Definition (.kotoba) │ │
│ │ - WorkflowIR: declarative definition of Temporal patterns │ │
│ │ - StrategyIR: extensions (Parallel, Wait, Compensation) │ │
│ └─────────────────────────────────────────────────────────┘ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Execution Engine │ │
│ │ - WorkflowExecutor: workflow executor │ │
│ │ - ActivityExecutor: activity executor │ │
│ │ - StateManager: MVCC-based state management │ │
│ └─────────────────────────────────────────────────────────┘ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Persistence Layer │ │
│ │ - WorkflowStore: workflow persistence │ │
│ │ - EventStore: event/message persistence │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
▼ (extends)
┌─────────────────────────────────────────────────────────────┐
│ Kotoba Core Engine │
│ - Graph Store (MVCC + Merkle) │
│ - Rule Engine (DPO) │
│ - Query Engine (GQL) │
│ - Distributed Execution │
└─────────────────────────────────────────────────────────────┘
Workflow executions use Multi-Version Concurrency Control (MVCC), so you can read state as of a specific transaction and inspect the full version history:
// Get workflow state at specific transaction
let execution_at_tx = engine.get_execution_at_tx(&execution_id, tx_id).await;
// Get complete version history
let history = engine.get_execution_history(&execution_id).await;
Complete audit trail with event sourcing:
// Get full event history
let events = engine.get_event_history(&execution_id).await?;
// Rebuild execution from events (for recovery)
let execution = engine.rebuild_execution_from_events(&execution_id).await?;
Cluster-wide workflow distribution:
// Enable distributed execution
engine.enable_distributed_execution(
    "node-1".to_string(),
    Arc::new(LeastLoadedBalancer::new()),
);
// Submit workflow for distributed execution
let task_id = engine.submit_distributed_workflow(execution_id).await?;
// Check cluster health
let health = engine.get_cluster_health().await?;
The engine also provides performance optimizations for long-running workflows.
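This README does not show the API for that. As a rough sketch only, a long-running execution could be restarted with compacted state; the `continue_as_new` method below is a hypothetical name modeled on Temporal's continue-as-new, not a documented call:
// Hypothetical sketch: restart a long-running execution with fresh history,
// carrying forward only the state it still needs. `continue_as_new` is an
// assumed method name, not taken from the kotoba-workflow API.
let carried_state = HashMap::from([
    ("processed_count".to_string(), json!(10_000)),
]);
let new_execution_id = engine.continue_as_new(&execution_id, carried_state).await?;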
Enterprise-grade Saga pattern support with comprehensive compensation logic:
// Create advanced Saga pattern
let saga_pattern = AdvancedSagaPattern {
    name: "order_processing".to_string(),
    main_flow: WorkflowStrategyOp::Seq { strategies: vec![...] },
    compensations: HashMap::from([
        ("process_payment".to_string(), compensation_strategy),
        ("reserve_inventory".to_string(), compensation_strategy),
    ]),
    config: SagaConfig {
        timeout: Some(Duration::from_secs(300)),
        compensation_policy: CompensationPolicy::ReverseOrder,
        parallelism: 3,
        ..Default::default()
    },
    dependencies: HashMap::new(),
};
// Execute advanced Saga
let result = saga_engine.execute_advanced_saga(&saga_pattern, execution_id, inputs).await?;
Production-ready monitoring with metrics, tracing, and logging:
// Configure monitoring
let monitoring_config = MonitoringConfig {
    enable_metrics: true,
    enable_tracing: true,
    enable_logging: true,
    metrics_interval: Duration::from_secs(60),
    exporters: vec![MonitoringExporter::Prometheus {
        endpoint: "http://localhost:9090".to_string(),
    }],
};
// Track workflow execution
monitor.track_workflow_event(&execution_id, ExecutionEventType::WorkflowStarted, metadata).await?;
// Get performance stats
let stats = monitor.get_workflow_stats("order_processing")?;
println!("Avg execution time: {:?}", stats.avg_execution_time);
Advanced optimization engine with multiple strategies:
// Create optimization engine
let optimizer = WorkflowOptimizer::new(cost_model, resource_manager);
// Add optimization rules
optimizer.add_rule(Box::new(ParallelExecutionRule::new(4)));
optimizer.add_rule(Box::new(CostBasedOptimizationRule::new(100.0)));
// Optimize workflow
let result = optimizer.optimize_workflow(&workflow, &context).await?;
println!("Optimization saved: ${:.2}", result.improvements.iter()
.map(|imp| imp.cost_savings).sum::<f64>());
Seamless integration with external systems:
// Setup integrations
let mut integration_manager = IntegrationManager::new();
// Add HTTP integration
integration_manager.register_integration("api_client",
    Box::new(HttpIntegration::new("https://api.example.com", Duration::from_secs(30))
        .with_bearer_token("your-token")));
// Execute integration
let result = integration_manager.execute_integration("api_client", "users/123", HashMap::new()).await?;
println!("User data: {:?}", result);
| Feature | Description | Benefits |
|---|---|---|
| Saga Patterns | Distributed transaction management with compensation | Reliable complex workflows |
| Monitoring | Metrics, tracing, logging, health checks | Production observability |
| Optimization | Cost-based, performance, parallel execution | Efficient resource usage |
| Integrations | HTTP, DB, MQ, Cloud, Email, Webhooks | Seamless system connectivity |
| Health Checks | Component status monitoring | Proactive failure detection |
| Performance Analytics | Historical performance analysis | Continuous improvement |
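The Health Checks and Performance Analytics rows are not illustrated elsewhere in this README. A minimal sketch, assuming hypothetical `get_component_health` and `get_performance_history` methods on the monitor (the names and returned fields are illustrative, not documented API):
// Hypothetical sketch: method names and returned fields are assumptions.
let component_health = monitor.get_component_health("workflow_executor").await?;
println!("Executor healthy: {}", component_health.is_healthy);
// Query historical performance over the last 24 hours.
let history = monitor.get_performance_history("order_processing", Duration::from_secs(86_400)).await?;
println!("Executions recorded in the last 24h: {}", history.len());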
A minimal end-to-end example:
use kotoba_workflow::{WorkflowEngine, WorkflowIR};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create workflow engine
    let engine = WorkflowEngine::builder()
        .with_storage("memory")
        .build()
        .await?;

    // Load workflow definition
    let workflow_ir = WorkflowIR::from_jsonnet("workflow.kotoba")?;

    // Start workflow execution (empty input map shown here; supply the
    // workflow's declared inputs in practice)
    let inputs = std::collections::HashMap::new();
    let execution_id = engine.start_workflow(&workflow_ir, inputs).await?;

    // Wait for completion
    let result = engine.wait_for_completion(execution_id).await?;
    println!("Workflow completed with result: {:?}", result);

    Ok(())
}
An example .kotoba definition for the order processing workflow:
{
  workflow: {
    id: "order_processing",
    name: "Order Processing Workflow",
    version: "1.0.0",
    inputs: [
      { name: "orderId", type: "string", required: true },
      { name: "customerId", type: "string", required: true },
      { name: "amount", type: "number", required: true },
    ],
    outputs: [
      { name: "processed", type: "boolean" },
      { name: "confirmationId", type: "string" },
    ],
    strategy: {
      op: "saga",
      main_flow: {
        op: "seq",
        strategies: [
          {
            op: "activity",
            activity_ref: "validate_order",
            input_mapping: {
              order_id: "$.inputs.orderId",
              customer_id: "$.inputs.customerId",
            },
          },
          {
            op: "parallel",
            branches: [
              {
                op: "activity",
                activity_ref: "process_payment",
                input_mapping: { amount: "$.inputs.amount" },
              },
              {
                op: "activity",
                activity_ref: "reserve_inventory",
                input_mapping: { order_id: "$.inputs.orderId" },
              },
            ],
          },
          {
            op: "activity",
            activity_ref: "send_confirmation",
          },
        ],
      },
      compensation: {
        op: "seq",
        strategies: [
          { op: "activity", activity_ref: "cancel_payment" },
          { op: "activity", activity_ref: "release_inventory" },
          { op: "activity", activity_ref: "send_failure_notification" },
        ],
      },
    },
    timeout: "PT30M",
  },
}
Activities are implemented in Rust and registered with the activity registry:
use kotoba_workflow::activity::prelude::*;
use std::collections::HashMap;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let registry = ActivityRegistry::new();

    // Register HTTP activity
    let http_activity = ActivityBuilder::new("validate_order")
        .http("https://api.example.com/validate", "POST")
        .build();
    registry.register(http_activity).await;

    // Register database activity
    let db_activity = DatabaseActivity::new("reserve_inventory",
        "UPDATE inventory SET reserved = true WHERE item_id = $1");
    registry.register(Arc::new(db_activity)).await;

    // Register custom function activity
    let send_email = FunctionActivity::new("send_confirmation", |inputs| {
        let order_id = inputs.get("order_id").unwrap().as_str().unwrap();
        // Send confirmation email logic
        let mut outputs = HashMap::new();
        outputs.insert("confirmation_id".to_string(), json!("CONF-123"));
        Ok(outputs)
    });
    registry.register(Arc::new(send_email)).await;

    Ok(())
}
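The comparison table below notes that activities are an extensible trait system. As a minimal sketch only, a custom activity might look like the following; the `Activity` trait name, its method signatures, and `ActivityError` are assumptions about the extension point, not copied from the crate:
use std::collections::HashMap;

// Hypothetical sketch of a custom activity. The trait name, method signatures,
// and error type are assumptions for illustration only.
struct AuditLogActivity;

#[async_trait::async_trait]
impl Activity for AuditLogActivity {
    fn name(&self) -> &str {
        "audit_log"
    }

    async fn execute(
        &self,
        inputs: HashMap<String, serde_json::Value>,
    ) -> Result<HashMap<String, serde_json::Value>, ActivityError> {
        // Record the inputs somewhere durable, then report success.
        println!("audit: {:?}", inputs);
        Ok(HashMap::new())
    }
}

// Registration would then mirror the built-in activities:
// registry.register(Arc::new(AuditLogActivity)).await;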
| Aspect | Temporal | Itonami |
|---|---|---|
| Execution Model | Strict workflow control | Graph-based declarative execution |
| Persistence | Event sourcing + snapshots | MVCC + Merkle DAG |
| Language | Go | Rust (with .kotoba DSL) |
| Activity Types | SDK-based | Extensible trait system |
| Deployment | Dedicated server | Embedded in Kotoba |
| Query Language | SQL-like | GQL integration |
| State Management | Temporal server | Kotoba graph store |
Contributions are welcome! Please see the main Kotoba repository for contribution guidelines.
Licensed under MIT OR Apache-2.0