| Crates.io | flowbuilder |
| lib.rs | flowbuilder |
| version | 0.1.0 |
| created_at | 2025-05-24 23:40:54.720414+00 |
| updated_at | 2025-07-20 00:40:47.296562+00 |
| description | An async flow orchestration framework with conditional execution and context sharing. |
| homepage | https://github.com/ThneS/flowbuilder |
| repository | https://github.com/ThneS/flowbuilder.git |
| max_upload_size | |
| id | 1687862 |
| size | 47,588 |
🚀 企业级异步工作流引擎 - 基于 Rust 的高性能工作流引擎,支持 YAML 配置驱动、分层架构设计
在 Cargo.toml 中添加依赖:
[dependencies]
flowbuilder = { version = "0.0.2", features = ["yaml", "runtime"] }
tokio = { version = "1.0", features = ["full"] }
workflow:
version: "1.0"
env:
ENVIRONMENT: "production"
LOG_LEVEL: "info"
vars:
max_retries: 3
timeout: 30
tasks:
- task:
id: "setup"
name: "环境设置"
description: "初始化执行环境"
actions:
- action:
id: "init"
name: "初始化"
type: "builtin"
flow:
retry:
max_retries: 2
delay: 1000
timeout:
duration: 5000
- task:
id: "process"
name: "数据处理"
description: "处理业务数据"
actions:
- action:
id: "process_data"
name: "数据处理"
type: "builtin"
use flowbuilder_yaml::prelude::*;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 从 YAML 配置创建执行器
let yaml_content = std::fs::read_to_string("workflow.yaml")?;
let mut executor = DynamicFlowExecutor::from_yaml(&yaml_content)?;
// 创建执行上下文
let context = std::sync::Arc::new(tokio::sync::Mutex::new(
flowbuilder_context::FlowContext::default()
));
// 执行工作流
let result = executor.execute(context).await?;
println!("工作流执行完成: {}", result.success);
println!("总耗时: {:?}", result.total_duration);
println!("执行节点数: {}", result.nodes_executed);
Ok(())
}
FlowBuilder 采用分层架构设计,确保高性能、可扩展性和易维护性:
┌─────────────────────┐
│ YAML 配置文件 │
└─────────────────────┘
↓
┌─────────────────────┐
│ YamlConfigParser │ ← 配置解析器
│ • 解析 YAML 配置 │
│ • 验证配置完整性 │
│ • 生成执行节点 │
└─────────────────────┘
↓
┌─────────────────────┐
│ EnhancedOrchestrator│ ← 流程编排器
│ • 创建执行计划 │
│ • 优化执行顺序 │
│ • 分析工作流复杂度 │
└─────────────────────┘
↓
┌─────────────────────┐
│ EnhancedExecutor │ ← 任务执行器
│ • 执行具体任务 │
│ • 并行执行控制 │
│ • 重试和超时处理 │
└─────────────────────┘
### 4. **Distributed Tracing | 分布式追踪**
```rust
// With custom trace ID
FlowBuilder::new()
.named_step("service_a", |_ctx| async move {
println!("Processing in service A");
Ok(())
})
.named_step("service_b", |_ctx| async move {
println!("Processing in service B");
Ok(())
})
.run_all_with_trace_id("user-request-12345".to_string())
.await?;
// Output includes trace ID in all logs:
// [trace_id:user-request-12345] [step:service_a] starting...
// [trace_id:user-request-12345] [step:service_a] completed successfully in 1.2ms
FlowBuilder::new()
// Continue on error (don't stop the flow)
.step_continue_on_error("optional_step", |_ctx| async move {
anyhow::bail!("This error won't stop the flow")
})
// Handle errors with custom logic
.step_handle_error("critical_step",
|_ctx| async move {
anyhow::bail!("Critical error")
},
|ctx, error| {
ctx.set_variable("error_handled".to_string(), "true".to_string());
println!("Handled error: {}", error);
Ok(())
}
)
// Wait until condition is met
.step_wait_until("wait_for_recovery",
|ctx| ctx.get_variable("error_handled").is_some(),
Duration::from_millis(100),
10
)
.run_all()
.await?;
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ FlowBuilder │───▶│ SharedContext │───▶│ StepResults │
│ │ │ │ │ │
│ • Step Chain │ │ • Variables │ │ • Trace Logs │
│ • Parallel Exec │ │ • Snapshots │ │ • Performance │
│ • Error Handle │ │ • Error State │ │ • Error Details │
└─────────────────┘ └──────────────────┘ └─────────────────┘
// Step-level timeout
.step_with_timeout("api_call", Duration::from_secs(30), handler)
// Flow-level timeout
.run_all_with_timeout(Duration::from_minutes(5))
.step_with_retry("flaky_operation", 3, Duration::from_secs(1), handler)
.parallel_steps_with_join("batch_process", subflows) // Wait for all
.parallel_steps(subflows) // Fire and forget
Run all tests:
cargo test
Run specific test suites:
查看 examples/new_architecture_demo.rs 获取完整的使用示例。
运行所有测试:
cargo test
运行示例:
cargo run --example new_architecture_demo
欢迎提交 Issue 和 Pull Request!请查看 CONTRIBUTING.md 了解贡献指南。
本项目采用 Apache License 2.0 许可证 - 查看 LICENSE 文件了解详情。
用 ❤️ 为 Rust 社区打造