| Crates.io | rs-dagcuter |
| lib.rs | rs-dagcuter |
| version | 0.1.0 |
| created_at | 2025-07-16 10:15:42.736591+00 |
| updated_at | 2025-07-23 09:06:29.586421+00 |
| description | RustDagcuter 是一个用于执行任务的有向无环图 (DAG) 的 Rust 库。它管理任务依赖关系,检测循环依赖关系,并支持可自定义的任务生命周期(执行前、执行后)。它还支持并发执行独立任务,以提高性能。 |
| homepage | |
| repository | https://github.com/busyster996/RustDagcuter |
| max_upload_size | |
| id | 1755242 |
| size | 57,026 |
RustDagcuter is a Rust library for executing directed acyclic graphs (DAGs) of tasks. It manages task dependencies, detects cyclic dependencies, and supports customizable task lifecycles (pre-execution, post-execution). It also supports concurrent execution of independent tasks to improve performance.
dagcuter/
├─ src/
│ ├─ lib.rs # Core exports and type definitions
│ └─ executor.rs # DAG Executor Core Logic
├─ examples/ # Example code
│ ├─ src/
│ │ └─ main.rs
│ └─ Cargo.toml
├─ Cargo.toml
└─ README.md
Cargo.toml:
rs-dagcuter = { version = "0.1.0" }
tokio = { version = "1.0", features = ["full"] }
async-trait = "0.1"
tokio-util = "0.7"
serde_json = "1.0"
chrono = "0.4"
use rs_dagcuter::*;
use async_trait::async_trait;
use std::collections::HashMap;
use tokio_util::sync::CancellationToken;
use std::sync::Arc;
/// Example task: a named unit of work together with the names of the
/// tasks it depends on.
struct ExampleTask {
    /// Name of this task.
    name: String,
    /// Names of the tasks this task depends on.
    deps: Vec<String>,
}
#[async_trait]
impl Task for ExampleTask {
    /// Returns the task's name.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Returns an owned copy of the dependency names.
    fn dependencies(&self) -> Vec<String> {
        self.deps.to_vec()
    }

    /// Returns the default retry policy with `max_attempts` overridden to 3.
    fn retry_policy(&self) -> Option<RetryPolicy> {
        let policy = RetryPolicy {
            max_attempts: 3,
            ..Default::default()
        };
        Some(policy)
    }

    /// Prints the task name, waits 100 ms to simulate work, and returns a
    /// map with the keys `status`, `task_name`, and `timestamp` (RFC 3339).
    ///
    /// This example ignores both the cancellation token and the input.
    async fn execute(
        &self,
        _ctx: CancellationToken,
        _input: &TaskInput,
    ) -> Result<TaskResult, Error> {
        println!("执行任务: {}", self.name);

        // Simulate the time a real task would take.
        let simulated_work = std::time::Duration::from_millis(100);
        tokio::time::sleep(simulated_work).await;

        let output = HashMap::from([
            ("status".to_string(), serde_json::json!("completed")),
            ("task_name".to_string(), serde_json::json!(self.name)),
            (
                "timestamp".to_string(),
                serde_json::json!(chrono::Utc::now().to_rfc3339()),
            ),
        ]);
        Ok(output)
    }
}
/// Builds a six-task example graph, prints it, executes it, and reports the
/// elapsed time, the per-task results, and the execution order.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut tasks: HashMap<String, BoxTask> = HashMap::new();

    // Registers an `ExampleTask` under its own name with the given dependencies.
    let mut register = |name: &str, deps: &[&str]| {
        let task = ExampleTask {
            name: name.to_string(),
            deps: deps.iter().map(|dep| dep.to_string()).collect(),
        };
        tasks.insert(name.to_string(), Arc::new(task));
    };
    register("task1", &[]);
    register("task2", &["task1"]);
    register("task3", &["task1"]);
    register("task4", &["task2", "task3"]);
    register("task5", &["task2"]);
    register("task6", &["task1", "task4", "task5"]);

    let mut dag = Dag::new(tasks)?;
    let ctx = CancellationToken::new();

    println!("=== 任务依赖图 ===");
    dag.print_graph();

    println!("=== 开始执行任务 ===");
    let started_at = std::time::Instant::now();
    let results = dag.execute(ctx).await?;
    let elapsed = started_at.elapsed();

    println!("=== 执行完成 ===");
    println!("执行时间: {:?}", elapsed);
    println!("执行结果: {:#?}", results);
    println!("执行顺序: {}", dag.execution_order().await);

    Ok(())
}
cd examples
cargo run
Task attribute
#[async_trait]
/// A unit of work that can be placed in a `Dag`.
///
/// Implementors must be `Send + Sync`. Only `name`, `dependencies`,
/// `retry_policy`, and `execute` are required; the two lifecycle hooks have
/// default implementations.
pub trait Task: Send + Sync {
    /// Returns the task's name.
    fn name(&self) -> &str;

    /// Returns the names of the tasks this task depends on.
    fn dependencies(&self) -> Vec<String>;

    /// Returns the retry policy for this task, or `None` if it supplies none.
    fn retry_policy(&self) -> Option<RetryPolicy>;

    /// Lifecycle hook; the default implementation does nothing and returns
    /// `Ok(())`.
    ///
    /// NOTE(review): presumably invoked before `execute` — the call site is
    /// not shown here; confirm against the executor.
    async fn pre_execution(
        &self,
        _ctx: CancellationToken,
        _input: &TaskInput,
    ) -> Result<(), Error> {
        Ok(())
    }

    /// Runs the task with the given cancellation token and input, returning
    /// the task's result or an error.
    async fn execute(
        &self,
        ctx: CancellationToken,
        input: &TaskInput,
    ) -> Result<TaskResult, Error>;

    /// Lifecycle hook that receives the task's output; the default
    /// implementation does nothing and returns `Ok(())`.
    ///
    /// NOTE(review): presumably invoked after `execute` succeeds — confirm
    /// against the executor.
    async fn post_execution(
        &self,
        _ctx: CancellationToken,
        _output: &TaskResult,
    ) -> Result<(), Error> {
        Ok(())
    }
}
RetryPolicy
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Settings that describe how a failed task is retried.
pub struct RetryPolicy {
    /// Initial retry interval.
    pub interval: Duration,
    /// Maximum retry interval.
    pub max_interval: Duration,
    /// Maximum number of retries.
    pub max_attempts: i32,
    /// Multiplier for the retry interval (exponential backoff).
    // NOTE(review): the exact growth formula is not shown here — confirm
    // against the executor.
    pub multiplier: f64,
}
impl Default for RetryPolicy {
fn default() -> Self {
Self {
interval: Duration::from_secs(1),
max_interval: Duration::from_secs(30),
max_attempts: 1,
multiplier: 2.0,
}
}
}
Dag
impl Dag {
/// Create a new DAG instance
///
/// `tasks` maps a `String` key to each task; the example in this document
/// uses each task's name as its key.
///
/// # Errors
///
/// Returns an `Error` when the DAG cannot be built.
/// NOTE(review): presumably `Error::CircularDependency` when the
/// dependencies form a cycle — confirm against the implementation.
pub fn new(tasks: HashMap<String, BoxTask>) -> Result<Self, Error>;
/// Execute all tasks in the DAG
///
/// Takes the DAG by mutable reference and a `CancellationToken`, and on
/// success returns a map from `String` keys to each `TaskResult`.
/// NOTE(review): the keys are presumably task names — confirm against the
/// implementation.
///
/// # Errors
///
/// Returns an `Error` if execution does not complete successfully.
pub async fn execute(
&mut self,
ctx: CancellationToken,
) -> Result<HashMap<String, TaskResult>, Error>;
/// Get the execution order of the DAG
///
/// Returns the order as a single `String`; the example in this document
/// prints it after `execute` has finished.
/// NOTE(review): the string's format is not shown here — confirm against
/// the implementation.
pub async fn execution_order(&self) -> String;
/// Print the DAG graph
///
/// Takes the DAG by shared reference and returns nothing.
/// NOTE(review): the output destination and format are not shown here —
/// presumably standard output; confirm against the implementation.
pub fn print_graph(&self);
}
Error
#[derive(Error, Debug)]
/// Errors reported by the library.
pub enum Error {
    /// The task dependencies form a cycle.
    #[error("Circular dependency detected")]
    CircularDependency,

    /// A task failed to execute; the payload is appended to the message.
    #[error("Task execution failed: {0}")]
    TaskExecution(String),

    /// The context was cancelled; the payload is appended to the message.
    #[error("Context cancelled: {0}")]
    ContextCancelled(String),

    /// Retrying failed; the payload is appended to the message.
    #[error("Retry failed: {0}")]
    RetryFailed(String),
}
Custom retry: adjust interval, multiplier, max_attempts
Lifecycle hook: override pre_execution/post_execution
Cancellation and timeout: pass a CancellationToken to control execution
Complex data flow: process TaskInput in execute and return a custom TaskResult
This project is licensed under the MIT License; see LICENSE for details.