| Crates.io | upflow |
| lib.rs | upflow |
| version | 0.1.0 |
| created_at | 2026-01-25 08:32:18.496509+00 |
| updated_at | 2026-01-25 08:32:18.496509+00 |
| description | An asynchronous workflow engine based on DAG |
| homepage | |
| repository | https://github.com/uporm/upflow |
| max_upload_size | |
| id | 2068379 |
| size | 133,670 |
Upflow 是一个基于 Rust 构建的强大、异步工作流引擎。它利用有向无环图(DAG)结构来编排复杂的任务依赖关系,支持并行执行、条件分支、子流程和自定义节点扩展。
Upflow 基于 tokio 构建,专为高性能和可扩展性而设计,非常适合构建编排平台、业务流程自动化和数据处理管道。
tokio 构建的全异步运行时,实现高效的资源利用。NodeExecutor trait 轻松实现自定义逻辑。{{node_id.output_field}})。将 upflow 添加到你的 Cargo.toml:
[dependencies]
upflow = { version = "0.1.0" } # 请检查 crates.io 获取最新版本
tokio = { version = "1", features = ["full"] }
serde_json = "1.0"
async-trait = "0.1"
下面是一个简单的示例,展示如何定义工作流并使用 Upflow 运行它。
创建一个描述 DAG 的 workflow.json 文件:
{
"nodes": [
{
"id": "node-start",
"type": "start",
"data": {
"input": [
{ "name": "message", "type": "STRING" }
]
}
},
{
"id": "node-process",
"type": "my-custom-node",
"data": {
"prefix": "Processed: "
}
},
{
"id": "node-output",
"type": "output",
"data": {
"final_result": "{{node-process.result}}"
}
}
],
"edges": [
{ "source": "node-start", "target": "node-process" },
{ "source": "node-process", "target": "node-output" }
]
}
use async_trait::async_trait;
use serde_json::{json, Value};
use std::sync::Arc;
use upflow::prelude::*;
// 定义自定义节点
struct MyCustomNode;
#[async_trait]
impl NodeExecutor for MyCustomNode {
async fn execute(&self, ctx: NodeContext) -> Result<Value, WorkflowError> {
// 解析输入数据
let input_data = ctx.flow_context.resolve_value(&ctx.node.data)?;
let prefix = input_data["prefix"].as_str().unwrap_or("");
// 获取前一个节点(node-start)的数据
// 在实际场景中,你会访问流程上下文或从开始节点传递的输入
// 在这个例子中,假设我们要处理输入 payload
let payload = ctx.flow_context.payload();
let message = payload["message"].as_str().unwrap_or("default");
let result = format!("{}{}", prefix, message);
println!("Executing MyCustomNode: {}", result);
// 返回输出
Ok(json!({ "result": result }))
}
}
// 简单的输出节点
struct OutputNode;
#[async_trait]
impl NodeExecutor for OutputNode {
async fn execute(&self, ctx: NodeContext) -> Result<Value, WorkflowError> {
let resolved = ctx.flow_context.resolve_value(&ctx.node.data)?;
println!("Workflow Final Output: {:?}", resolved);
Ok(resolved)
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 1. 初始化引擎
let engine = WorkflowEngine::global();
// 2. 注册节点
engine.register("my-custom-node", MyCustomNode);
engine.register("output", OutputNode);
// 3. 加载工作流
let workflow_json = include_str!("workflow.json"); // 假设 workflow.json 在 src/ 目录下或与 main 同级
engine.load("my-workflow", workflow_json)?;
// 4. 运行工作流
let payload = json!({ "message": "Hello World" });
let ctx = Arc::new(FlowContext::new().with_payload(payload));
let result = engine.run_with_ctx("my-workflow", ctx).await?;
println!("Workflow Status: {:?}", result.status);
Ok(())
}
欢迎贡献!如果你有改进的想法或发现了 bug,请开启 issue 或提交 pull request。
git checkout -b feature/amazing-feature)。git commit -m 'Add some amazing feature')。git push origin feature/amazing-feature)。本项目采用 Apache-2.0 许可证。