rs-dagcuter

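version: 0.1.0
created_at: 2025-07-16 10:15:42.736591+00
updated_at: 2025-07-23 09:06:29.586421+00
description: RustDagcuter is a Rust library for executing directed acyclic graphs (DAGs) of tasks. It manages task dependencies, detects circular dependencies, and supports customizable task lifecycles (pre-execution, post-execution). It also runs independent tasks concurrently to improve performance.
repository: https://github.com/busyster996/RustDagcuter
id: 1755242
size: 57,026
Busyster996 (busyster996)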

Dagcuter 🚀

License: MIT Rust

RustDagcuter is a Rust library for executing directed acyclic graphs (DAGs) of tasks. It manages task dependencies, detects cyclic dependencies, and supports customizable task lifecycles (pre-execution, post-execution). It also supports concurrent execution of independent tasks to improve performance.


✨ Core features

  • Dependency management: Resolves and schedules tasks according to their declared dependencies.
  • Cycle detection: Detects circular dependencies and reports them as an error.
  • Concurrent execution: Topological ordering lets independent tasks run in parallel across multiple cores.
  • Exponential backoff retry: Built-in, configurable retry policy with a custom initial interval, multiplier, maximum interval, and maximum number of attempts.
  • Graceful cancellation: Supports cancelling a run midway and releasing resources.
  • Execution tracking: Prints task status and execution order as the DAG runs.
  • Type safety: Static typing with compile-time error checking.
  • Zero-cost abstraction: Minimal runtime overhead.
  • Lifecycle hooks: Custom logic can run before and after task execution.
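To make the dependency-management and cycle-detection bullets concrete, here is a minimal std-only sketch of Kahn's algorithm. It is not taken from the rs-dagcuter source; the function name `topo_order` and the map-of-names input are assumptions made for illustration.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Returns one valid execution order, or `Err` with the tasks that can never
/// become ready (they sit on a cycle, or wait on a task that does).
/// `deps` maps each task name to the names it depends on.
/// Hypothetical helper for illustration; not part of the rs-dagcuter API.
pub fn topo_order(deps: &BTreeMap<&str, Vec<&str>>) -> Result<Vec<String>, Vec<String>> {
    // Count unmet dependencies per task and record which tasks wait on which.
    let mut unmet: BTreeMap<&str, usize> = BTreeMap::new();
    let mut dependents: BTreeMap<&str, Vec<&str>> = BTreeMap::new();
    for (&task, task_deps) in deps {
        unmet.insert(task, task_deps.len());
        for &dep in task_deps {
            dependents.entry(dep).or_default().push(task);
        }
    }

    // Tasks with no unmet dependencies are ready immediately.
    let mut ready: VecDeque<&str> = unmet
        .iter()
        .filter(|(_, n)| **n == 0)
        .map(|(&t, _)| t)
        .collect();

    let mut order = Vec::new();
    while let Some(task) = ready.pop_front() {
        order.push(task.to_string());
        // Finishing `task` satisfies one dependency of each task waiting on it.
        if let Some(waiting) = dependents.get(task) {
            for &next in waiting {
                if let Some(n) = unmet.get_mut(next) {
                    *n -= 1;
                    if *n == 0 {
                        ready.push_back(next);
                    }
                }
            }
        }
    }

    if order.len() == deps.len() {
        Ok(order)
    } else {
        // Anything still waiting is blocked by a cycle or an unknown dependency.
        Err(unmet
            .into_iter()
            .filter(|(_, n)| *n > 0)
            .map(|(t, _)| t.to_string())
            .collect())
    }
}
```

Calling `topo_order` on a map where `a` depends on `b` and `b` depends on `a` returns `Err` listing both names, which is the situation the crate reports as `Error::CircularDependency`.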

🏗️ Project structure

dagcuter/
├─ src/
│ ├─ lib.rs # Core exports and type definitions
│ └─ executor.rs # DAG Executor Core Logic
├─ examples/ # Example code
│ ├─ src/
│ │ └─ main.rs
│ └─ Cargo.toml
├─ Cargo.toml
└─ README.md

🚀 Quick start

  1. Add dependencies in Cargo.toml:
rs-dagcuter = { version = "0.1.0" }
tokio = { version = "1.0", features = ["full"] }
async-trait = "0.1"
tokio-util = "0.7"
serde_json = "1.0"
chrono = "0.4"
  2. Write the task and execute it:
use rs_dagcuter::*;
use async_trait::async_trait;
use std::collections::HashMap;
use tokio_util::sync::CancellationToken;
use std::sync::Arc;

// Example task implementation
struct ExampleTask {
    name: String,
    deps: Vec<String>,
}

#[async_trait]
impl Task for ExampleTask {
    fn name(&self) -> &str {
        &self.name
    }

    fn dependencies(&self) -> Vec<String> {
        self.deps.clone()
    }

    fn retry_policy(&self) -> Option<RetryPolicy> {
        Some(RetryPolicy {
            max_attempts: 3,
            ..Default::default()
        })
    }

    async fn execute(
        &self,
        _ctx: CancellationToken,
        _input: &TaskInput,
    ) -> Result<TaskResult, Error> {
        println!("Executing task: {}", self.name);

        // Simulate the time the task takes to run
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let mut result = HashMap::new();
        result.insert("status".to_string(), serde_json::json!("completed"));
        result.insert("task_name".to_string(), serde_json::json!(self.name));
        result.insert("timestamp".to_string(), serde_json::json!(chrono::Utc::now().to_rfc3339()));
        Ok(result)
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut tasks: HashMap<String, BoxTask> = HashMap::new();

    tasks.insert("task1".to_string(), Arc::new(ExampleTask {
        name: "task1".to_string(),
        deps: vec![],
    }));

    tasks.insert("task2".to_string(), Arc::new(ExampleTask {
        name: "task2".to_string(),
        deps: vec!["task1".to_string()],
    }));

    tasks.insert("task3".to_string(), Arc::new(ExampleTask {
        name: "task3".to_string(),
        deps: vec!["task1".to_string()],
    }));

    tasks.insert("task4".to_string(), Arc::new(ExampleTask {
        name: "task4".to_string(),
        deps: vec!["task2".to_string(), "task3".to_string()],
    }));

    tasks.insert("task5".to_string(), Arc::new(ExampleTask {
        name: "task5".to_string(),
        deps: vec!["task2".to_string()],
    }));

    tasks.insert("task6".to_string(), Arc::new(ExampleTask {
        name: "task6".to_string(),
        deps: vec!["task1".to_string(), "task4".to_string(), "task5".to_string()],
    }));



    let mut dag = Dag::new(tasks)?;
    let ctx = CancellationToken::new();

    println!("=== Task dependency graph ===");
    dag.print_graph();

    println!("=== Starting task execution ===");
    let start = std::time::Instant::now();
    let results = dag.execute(ctx).await?;
    let duration = start.elapsed();

    println!("=== Execution finished ===");
    println!("Elapsed time: {:?}", duration);
    println!("Results: {:#?}", results);
    println!("Execution order: {}", dag.execution_order().await);

    Ok(())
}
  3. Run the example:
cd examples
cargo run 
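
As a rough guide to what concurrent execution can buy for the six-task graph above, the sketch below groups tasks into "waves" by dependency depth. It is a std-only illustration, not the crate's scheduler; the helper names `depth`, `waves`, and `quick_start_graph` are assumptions, and the real executor may start a task as soon as its own dependencies finish rather than wave by wave.

```rust
use std::collections::BTreeMap;

/// Depth of a task: 0 with no dependencies, otherwise one more than its
/// deepest dependency. Assumes the graph is acyclic (a cycle would recurse forever).
fn depth<'a>(
    task: &'a str,
    deps: &BTreeMap<&'a str, Vec<&'a str>>,
    memo: &mut BTreeMap<&'a str, usize>,
) -> usize {
    if let Some(&d) = memo.get(task) {
        return d;
    }
    let d = deps
        .get(task)
        .into_iter()
        .flatten()
        .map(|&dep| depth(dep, deps, memo) + 1)
        .max()
        .unwrap_or(0);
    memo.insert(task, d);
    d
}

/// Groups tasks by depth; tasks in the same wave do not depend on each other.
pub fn waves<'a>(deps: &BTreeMap<&'a str, Vec<&'a str>>) -> Vec<Vec<&'a str>> {
    let mut memo = BTreeMap::new();
    let mut out: Vec<Vec<&'a str>> = Vec::new();
    for &task in deps.keys() {
        let d = depth(task, deps, &mut memo);
        if out.len() <= d {
            out.resize(d + 1, Vec::new());
        }
        out[d].push(task);
    }
    out
}

/// The dependency graph from the quick start, as plain names.
pub fn quick_start_graph() -> BTreeMap<&'static str, Vec<&'static str>> {
    BTreeMap::from([
        ("task1", vec![]),
        ("task2", vec!["task1"]),
        ("task3", vec!["task1"]),
        ("task4", vec!["task2", "task3"]),
        ("task5", vec!["task2"]),
        ("task6", vec!["task1", "task4", "task5"]),
    ])
}
```

`waves(&quick_start_graph())` yields four waves: task1, then task2 and task3, then task4 and task5, then task6. With each task sleeping 100 ms, a scheduler that runs every ready task concurrently would be expected to take roughly 400 ms instead of about 600 ms sequentially, ignoring scheduling overhead.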

📚 API Overview

Task trait

#[async_trait]
pub trait Task: Send + Sync {
    fn name(&self) -> &str;
    fn dependencies(&self) -> Vec<String>;
    fn retry_policy(&self) -> Option<RetryPolicy>;

    async fn pre_execution(
        &self,
        _ctx: CancellationToken,
        _input: &TaskInput,
    ) -> Result<(), Error> {
        Ok(())
    }

    async fn execute(
        &self,
        ctx: CancellationToken,
        input: &TaskInput,
    ) -> Result<TaskResult, Error>;

    async fn post_execution(
        &self,
        _ctx: CancellationToken,
        _output: &TaskResult,
    ) -> Result<(), Error> {
        Ok(())
    }
}
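
The default hook bodies above mean a task only overrides the hooks it needs. The following synchronous, std-only stand-in shows that pattern. The `SyncTask` trait, the `run_with_hooks` driver, and the `Audited` task are hypothetical names, and the call order (pre hook, body, post hook, stopping at the first error) is an assumption about how an executor would typically drive these hooks, not a description of the crate's internals.

```rust
/// Synchronous stand-in for the async `Task` trait (illustration only).
pub trait SyncTask {
    fn name(&self) -> &str;

    /// Default hook: do nothing.
    fn pre_execution(&self, _log: &mut Vec<String>) -> Result<(), String> {
        Ok(())
    }

    fn execute(&self, log: &mut Vec<String>) -> Result<String, String>;

    /// Default hook: do nothing.
    fn post_execution(&self, _output: &str, _log: &mut Vec<String>) -> Result<(), String> {
        Ok(())
    }
}

/// Assumed driver: pre hook, then the task body, then the post hook.
/// The `?` operator stops at the first error, so a failed body skips the post hook.
pub fn run_with_hooks(task: &dyn SyncTask, log: &mut Vec<String>) -> Result<String, String> {
    task.pre_execution(log)?;
    let output = task.execute(log)?;
    task.post_execution(&output, log)?;
    Ok(output)
}

/// A task that overrides both hooks to record what happened.
pub struct Audited;

impl SyncTask for Audited {
    fn name(&self) -> &str {
        "audited"
    }

    fn pre_execution(&self, log: &mut Vec<String>) -> Result<(), String> {
        log.push(format!("pre {}", self.name()));
        Ok(())
    }

    fn execute(&self, log: &mut Vec<String>) -> Result<String, String> {
        log.push(format!("run {}", self.name()));
        Ok("done".to_string())
    }

    fn post_execution(&self, output: &str, log: &mut Vec<String>) -> Result<(), String> {
        log.push(format!("post {}: {}", self.name(), output));
        Ok(())
    }
}
```

Running `run_with_hooks(&Audited, &mut log)` leaves three entries in `log`, in the order pre, run, post.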

RetryPolicy

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryPolicy {
    pub interval: Duration,         // Initial retry interval
    pub max_interval: Duration,     // Maximum retry interval
    pub max_attempts: i32,          // Maximum number of retries
    pub multiplier: f64,            // Multiplier applied to the interval after each retry
}

impl Default for RetryPolicy {
    fn default() -> Self {
        Self {
            interval: Duration::from_secs(1),
            max_interval: Duration::from_secs(30),
            max_attempts: 1,
            multiplier: 2.0,
        }
    }
}
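
To see how these four fields interact, the sketch below computes a capped exponential backoff schedule. It uses a local mirror of the struct (without the serde derives) and assumes the delay before retry n is min(interval × multiplier^n, max_interval) and that `max_attempts` counts the first try. Both are assumptions; the crate's actual formula may differ. `backoff_delay` and `backoff_schedule` are hypothetical helpers.

```rust
use std::time::Duration;

/// Local mirror of the `RetryPolicy` fields shown above (illustration only).
#[derive(Debug, Clone)]
pub struct RetryPolicy {
    pub interval: Duration,
    pub max_interval: Duration,
    pub max_attempts: i32,
    pub multiplier: f64,
}

/// Delay before retry number `retry` (0-based), assuming
/// delay = min(interval * multiplier^retry, max_interval).
pub fn backoff_delay(policy: &RetryPolicy, retry: u32) -> Duration {
    let scaled = policy.interval.as_secs_f64() * policy.multiplier.powi(retry as i32);
    // Cap the growth so late retries never wait longer than `max_interval`.
    let capped = scaled.min(policy.max_interval.as_secs_f64());
    Duration::from_secs_f64(capped)
}

/// Waits between attempts, assuming `max_attempts` includes the first try,
/// so there are `max_attempts - 1` waits in total.
pub fn backoff_schedule(policy: &RetryPolicy) -> Vec<Duration> {
    let waits = policy.max_attempts.saturating_sub(1).max(0) as u32;
    (0..waits).map(|n| backoff_delay(policy, n)).collect()
}
```

Under these assumptions, the default interval of 1 s, multiplier of 2.0, and cap of 30 s with `max_attempts` set to 7 give waits of 1, 2, 4, 8, 16, and 30 seconds; the last one would be 32 s without the cap. With the default `max_attempts` of 1 the schedule is empty.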

Dag

impl Dag {
    /// Create a new DAG instance
    pub fn new(tasks: HashMap<String, BoxTask>) -> Result<Self, Error>;

    /// Execute all tasks in the DAG
    pub async fn execute(
        &mut self,
        ctx: CancellationToken,
    ) -> Result<HashMap<String, TaskResult>, Error>;

    /// Get the execution order of the DAG
    pub async fn execution_order(&self) -> String;

    /// Print the DAG graph
    pub fn print_graph(&self);
}

Error

#[derive(Error, Debug)]
pub enum Error {
    #[error("Circular dependency detected")]
    CircularDependency,
    
    #[error("Task execution failed: {0}")]
    TaskExecution(String),
    
    #[error("Context cancelled: {0}")]
    ContextCancelled(String),
    
    #[error("Retry failed: {0}")]
    RetryFailed(String),
}

🔧 Advanced usage

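  • Custom retry: return a RetryPolicy from retry_policy with adjusted interval, max_interval, multiplier, and max_attempts.

  • Lifecycle hooks: override pre_execution and post_execution to run logic before and after a task.

  • Cancellation and timeout: pass a CancellationToken to Dag::execute and cancel it to stop execution, for example when a timeout expires.

  • Complex data flow: read TaskInput inside execute and return a custom TaskResult.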

📝 License

This project is released under the MIT License; see LICENSE for details.
