taskflow-rs

Crates.iotaskflow-rs
lib.rstaskflow-rs
version0.1.1
created_at2025-09-21 13:32:20.501692+00
updated_at2025-09-21 14:17:02.311026+00
descriptionA high-performance, async-first task orchestration framework for Rust
homepage
repositoryhttps://github.com/lispking/taskflow-rs
max_upload_size
id1848817
size157,030
King (lispking)

documentation

README

taskflow-rs

A modern, async task processing framework written in Rust for building scalable workflow systems with support for multiple task types, dependency management, and distributed execution.

Features

  • Multiple Task Types: HTTP requests, shell commands, and custom task handlers
  • Dependency Management: Tasks can depend on the completion of other tasks
  • Async Execution: Built on Tokio for high-performance async execution
  • Retry Mechanisms: Configurable retry policies with exponential backoff
  • Real-time Monitoring: Live task metrics and status tracking
  • Extensible Storage: Pluggable storage backends (in-memory, Redis, etc.)
  • Worker System: Distributed execution with multiple worker nodes
  • Comprehensive Logging: Structured logging with execution traces

Architecture

Core Components

  • TaskFlow: Main framework orchestrator
  • Scheduler: Manages task scheduling and dependency resolution
  • Executor: Handles task execution with worker coordination
  • Storage: Abstract storage layer for task persistence
  • Task Handlers: Extensible system for different task types

Storage Backends

  • In-Memory: For development and testing
  • Redis: For distributed deployments (planned)
  • PostgreSQL: For persistent storage (planned)

Installation

Add to your Cargo.toml:

cargo add taskflow-rs

Examples

Check the examples/ directory for complete working examples:

  • basic_usage.rs: Basic task submission and execution
  • yaml_config_usage.rs: Using YAML configuration for tasks
  • simple_execution.rs: Basic task submission and execution
  • custom_handler.rs: Custom task handler example

API Reference

TaskFlow

impl TaskFlow {
    /// Create a new TaskFlow with configuration
    pub async fn new(config: TaskFlowConfig) -> Result<Self, TaskFlowError>;

    /// Create from YAML configuration file
    pub async fn from_yaml_file<P: AsRef<Path>>(path: P) -> Result<Self, TaskFlowError>;

    /// Create from YAML configuration string
    pub async fn from_yaml_str(config_content: &str) -> Result<Self, TaskFlowError>;

    /// Register a custom task handler
    pub async fn register_handler(&self, handler: Arc<dyn TaskHandler>);

    /// Submit a task for execution
    pub async fn submit_task(&self, definition: TaskDefinition) -> Result<String, TaskFlowError>;

    /// Submit an HTTP task (convenience method)
    pub async fn submit_http_task(
        &self,
        name: &str,
        url: &str,
        method: Option<&str>
    ) -> Result<String, TaskFlowError>;

    /// Submit a shell command task (convenience method)
    pub async fn submit_shell_task(
        &self,
        name: &str,
        command: &str,
        args: Vec<&str>
    ) -> Result<String, TaskFlowError>;

    /// Get task status
    pub async fn get_task_status(&self, task_id: &str) -> Result<Option<TaskStatus>, TaskFlowError>;

    /// Cancel a task
    pub async fn cancel_task(&self, task_id: &str) -> Result<(), TaskFlowError>;

    /// List tasks with optional status filter
    pub async fn list_tasks(&self, status: Option<TaskStatus>) -> Result<Vec<Task>, TaskFlowError>;

    /// Wait for task completion with timeout
    pub async fn wait_for_completion(
        &self,
        task_id: &str,
        timeout_seconds: Option<u64>
    ) -> Result<Task, TaskFlowError>;

    /// Get task execution metrics
    pub async fn get_task_metrics(&self) -> Result<TaskMetrics, TaskFlowError>;

    /// Start the framework (scheduler and executor)
    pub async fn start(&self) -> Result<(), TaskFlowError>;

    /// Shutdown the framework
    pub async fn shutdown(&self) -> Result<(), TaskFlowError>;
}

TaskDefinition

impl TaskDefinition {
    /// Create a new task definition
    pub fn new(name: &str, task_type: &str) -> Self;

    /// Add a payload parameter (serde_json::Value)
    pub fn with_payload(mut self, key: &str, value: serde_json::Value) -> Self;

    /// Set task priority
    pub fn with_priority(mut self, priority: i32) -> Self;

    /// Set task timeout in seconds
    pub fn with_timeout(mut self, timeout_seconds: u64) -> Self;

    /// Set task dependencies
    pub fn with_dependencies(mut self, dependencies: Vec<String>) -> Self;

    /// Set task tags
    pub fn with_tags(mut self, tags: Vec<String>) -> Self;

    /// Schedule task for future execution
    pub fn schedule_at(mut self, scheduled_at: DateTime<Utc>) -> Self;
}

Configuration

use taskflow_rs::framework::TaskFlowConfig;

let config = TaskFlowConfig {
    max_workers: 4,                    // Maximum concurrent workers
    task_timeout_ms: 30000,            // Default task timeout (30 seconds)
    retry_delay_ms: 1000,              // Delay between retries
    max_retries: 3,                    // Maximum retry attempts
    storage_type: StorageType::Memory,  // Storage backend type
};

let taskflow = TaskFlow::new(config).await?;

Error Handling

The framework uses TaskFlowError for all error cases, with detailed error variants:

pub enum TaskFlowError {
    StorageError(String),
    TaskNotFound(String),
    TaskValidationError(String),
    DependencyCycle(String),
    ExecutionError(String),
    TimeoutError(String),
    ConfigurationError(String),
}

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Add tests
  5. Submit a pull request

License

This project is licensed under the Apache 2.0 License.

Commit count: 4

cargo fmt