actflow

Crates.ioactflow
lib.rsactflow
version0.1.6
created_at2025-12-02 09:58:04.905757+00
updated_at2025-12-17 07:16:24.761608+00
descriptionA lightweight, event-driven workflow engine written in Rust.
homepage
repositoryhttps://github.com/yunis-du/actflow
max_upload_size
id1961463
size282,311
Yunis (yunis-du)

documentation

README

Actflow

Actflow is a lightweight, event-driven workflow engine written in Rust. It is designed to be embedded in your applications to orchestrate complex business logic with ease.

Features

  • Event-Driven Architecture: Built on top of a robust event bus, ensuring high decoupling and scalability.
  • Async Execution: Powered by tokio, supporting high-concurrency workflow execution.
  • Flexible Workflow Definition: Define workflows using JSON, supporting various node types and control flows.

Supported Actions

Action Description
start Entry point of the workflow
end End point of the workflow
http_request HTTP request with support for GET/POST/PUT/DELETE, authentication (Bearer/Basic/Custom), headers, params, and body
if_else Conditional branching based on variable comparisons (equals, not_equals, contains, greater_than, etc.)
code Execute JavaScript or Python code with variable inputs and JSON outputs
agent Call remote agent service via gRPC with streaming support for logs and outputs

Template Variables

Actflow supports template variables to reference outputs from other nodes:

{{#nodeId.key#}}
{{#nodeId.key.subkey#}}

Example: {{#n1.body.data.user.name#}} references the name field from node n1's output.

You can also reference environment variables from the Context:

{{$VAR_NAME$}}

Example: {{$API_KEY$}} references the API_KEY environment variable.

Quick Start

Here is a simple example of how to define and run a workflow:

use std::collections::HashMap;

use actflow::{ChannelEvent, ChannelOptions, EdgeModel, EngineBuilder, NodeModel, WorkflowModel};
use serde_json::json;

fn main() {
    // 1. Build engine
    let engine = EngineBuilder::new().build().unwrap();

    // 2. Launch engine
    engine.launch();

    // 3. Define workflow model
    let workflow = WorkflowModel {
        id: "hello_world".to_string(),
        name: "hello_world".to_string(),
        desc: "A simple hello world workflow".to_string(),
        env: HashMap::new(),
        nodes: vec![
            NodeModel {
                id: "n1".to_string(),
                title: "Step 1".to_string(),
                desc: "Start node".to_string(),
                uses: "start".to_string(),
                action: json!({}),
                ..Default::default()
            },
            NodeModel {
                id: "n2".to_string(),
                title: "Step 2".to_string(),
                desc: "HTTP Request node".to_string(),
                uses: "http_request".to_string(),
                action: json!({
                    "url": "https://httpbin.org/get",
                    "method": "GET",
                    "auth": {
                        "auth_type": "no_auth"
                    },
                    "headers": {},
                    "params": {},
                    "body": {
                        "content_type": "none"
                    },
                    "timeout": 30000
                }),
                ..Default::default()
            },
        ],
        edges: vec![EdgeModel {
            id: "e1".to_string(),
            source: "n1".to_string(),
            target: "n2".to_string(),
            source_handle: "source".to_string(),
        }],
    };

    // 4. Create workflow process
    let process = engine.build_workflow_process(&workflow).unwrap();

    // 5. Listen to events
    ChannelEvent::channel(engine.channel(), ChannelOptions::with_pid(process.id().to_string())).on_complete(move |pid| {
        println!("Workflow completed, pid: {}", pid);
    });

    ChannelEvent::channel(engine.channel(), ChannelOptions::with_pid(process.id().to_string())).on_error(move |e| {
        println!("Workflow failed: {:?}", e);
    });

    ChannelEvent::channel(engine.channel(), ChannelOptions::with_pid(process.id().to_string())).on_log(move |e| {
        println!("Workflow log: {:?}", e);
    });

    ChannelEvent::channel(engine.channel(), ChannelOptions::with_pid(process.id().to_string())).on_event(move |e| {
        println!("Event: {:?}", e);
    });

    // 6. Run workflow process
    process.start();
    println!("Started process: {}", process.id());

    loop {
        if process.is_complete() {
            break;
        }
        std::thread::sleep(std::time::Duration::from_millis(100));
    }

    let outputs: serde_json::Value = process.get_outputs().into();
    println!();
    println!("----------------------------------------");
    println!("Outputs: {:#?}", outputs);
}

Architecture

Actflow consists of several core components:

  • Engine: The entry point of the system, responsible for managing resources, loading workflows, and spawning processes.
  • Channel: An internal event bus that broadcasts events (Workflow, Node, Log) to subscribers.
  • Process: An instance of a running workflow. It maintains the execution context and state.
  • Dispatcher: Responsible for scheduling node execution based on the workflow graph and current state.
Commit count: 0

cargo fmt