graph-flow

Crates.io: graph-flow
lib.rs: graph-flow
version: 0.4.0
created_at: 2025-06-29 06:03:26.825203+00
updated_at: 2025-09-11 12:19:05.987324+00
description: A high-performance, type-safe framework for building multi-agent workflow systems in Rust
homepage: https://github.com/a-agmon/rs-graph-llm
repository: https://github.com/a-agmon/rs-graph-llm
max_upload_size:
id: 1730410
size: 207,760
Alon Agmon (a-agmon)

documentation

https://docs.rs/graph-flow

README

graph-flow

A high-performance, type-safe framework for building multi-agent workflow systems in Rust.

Features

  • Type-Safe Workflows: Compile-time guarantees for workflow correctness
  • Flexible Execution: Step-by-step, batch, or mixed execution modes
  • Built-in Persistence: PostgreSQL and in-memory storage backends
  • LLM Integration: Optional integration with Rig for AI agent capabilities
  • Human-in-the-Loop: Natural workflow interruption and resumption
  • Async/Await Native: Built from the ground up for async Rust
  • Parallel Blocks (FanOutTask): Run multiple tasks concurrently inside a single node

Quick Start

Add to your Cargo.toml:

[dependencies]
graph-flow = "0.4"

# Or, for LLM integration, use this line instead of the one above
graph-flow = { version = "0.4", features = ["rig"] }

Basic Example

use graph_flow::{Context, Task, TaskResult, NextAction, GraphBuilder, FlowRunner, InMemorySessionStorage, Session};
use async_trait::async_trait;
use std::sync::Arc;

/// A simple greeting task: reads `name` from the context and produces a greeting.
struct HelloTask;

#[async_trait]
impl Task for HelloTask {
    /// Identifier used to refer to this task (e.g. when creating a session).
    fn id(&self) -> &str {
        "hello_task"
    }

    /// Builds the greeting, stores it under `greeting`, and returns it as the response.
    async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
        // Build the fallback lazily so "World" is only allocated when no
        // `name` value comes back from the context.
        let name: String = context
            .get("name")
            .await
            .unwrap_or_else(|| "World".to_string());
        let greeting = format!("Hello, {}! How are you today?", name);

        // Store the greeting in context for other tasks
        context.set("greeting", greeting.clone()).await;

        Ok(TaskResult::new(Some(greeting), NextAction::Continue))
    }
}

/// Entry point: wires up a one-task graph, seeds a session, and runs it once.
#[tokio::main]
async fn main() -> graph_flow::Result<()> {
    // The task handle is shared by the graph and by the session setup below.
    let greeter = Arc::new(HelloTask);

    // Assemble the workflow graph around that single task.
    let graph = Arc::new(
        GraphBuilder::new("greeting_workflow")
            .add_task(greeter.clone())
            .build(),
    );

    // Storage first, then a runner that loads and saves sessions through it.
    let storage = Arc::new(InMemorySessionStorage::new());
    let runner = FlowRunner::new(graph.clone(), storage.clone());

    // Seed a session that starts at the greeter task and carries the input data.
    let session_id = "user_123";
    let session = Session::new_from_task(session_id.to_string(), greeter.id());
    session.context.set("name", "Alice".to_string()).await;
    storage.save(session).await?;

    // Run the workflow for that session and show what the task answered.
    let result = runner.run(session_id).await?;
    println!("Response: {:?}", result.response);

    Ok(())
}

Core API Reference

Tasks - The Building Blocks

Tasks implement the core Task trait and define the units of work in your workflow:

Basic Task Implementation

use graph_flow::{Task, TaskResult, NextAction, Context};
use async_trait::async_trait;

/// Task that upper-cases `user_input` and publishes it as `processed_data`.
struct DataProcessingTask {
    /// Doubles as the task id returned by `id()`.
    name: String,
}

#[async_trait]
impl Task for DataProcessingTask {
    fn id(&self) -> &str {
        self.name.as_str()
    }

    async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
        // A missing input falls back to the empty string.
        let raw: String = context.get("user_input").await.unwrap_or_default();

        let processed = format!("Processed: {}", raw.to_uppercase());
        // Build the response before `processed` is handed over to the context.
        let response = format!("Data processed: {}", processed);

        // Make the result available to the next task.
        context.set("processed_data", processed).await;

        Ok(TaskResult::new(Some(response), NextAction::Continue))
    }
}

Task with Status Messages

/// Validates `processed_data` and decides how the workflow proceeds.
struct ValidationTask;

#[async_trait]
impl Task for ValidationTask {
    fn id(&self) -> &str {
        "validator"
    }

    async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
        let data: Option<String> = context.get("processed_data").await;

        // Choose the (response, next action, status) triple first, then build
        // the result in one place.
        let (response, action, status) = match data.as_deref() {
            None => (
                "No data found to validate",
                NextAction::GoTo("data_input".to_string()),
                "Redirecting to data input task",
            ),
            Some(text) if text.len() > 10 => (
                "Data validation passed",
                NextAction::Continue,
                "Data meets minimum length requirement",
            ),
            Some(_) => (
                "Data validation failed - too short",
                NextAction::WaitForInput,
                "Waiting for user to provide more data",
            ),
        };

        Ok(TaskResult::new_with_status(
            Some(response.to_string()),
            action,
            Some(status.to_string()),
        ))
    }
}

NextAction - Controlling Flow

The NextAction enum controls how your workflow progresses:

// Each expression below is a possible return value for a task's `run`;
// the `NextAction` variant passed in decides what the workflow does next.

// Continue to next task, but pause execution (step-by-step mode)
Ok(TaskResult::new(Some("Step completed".to_string()), NextAction::Continue))

// Continue and execute the next task immediately (continuous mode)
Ok(TaskResult::new(Some("Moving forward".to_string()), NextAction::ContinueAndExecute))

// Wait for user input before continuing
Ok(TaskResult::new(Some("Need more info".to_string()), NextAction::WaitForInput))

// Jump to a specific task (the string is the target task's id)
Ok(TaskResult::new(Some("Redirecting".to_string()), NextAction::GoTo("specific_task".to_string())))

// Go back to the previous task
Ok(TaskResult::new(Some("Going back".to_string()), NextAction::GoBack))

// End the workflow
Ok(TaskResult::new(Some("All done!".to_string()), NextAction::End))

// Convenience methods; the trailing comment names the variant each one stands for
TaskResult::move_to_next()        // NextAction::Continue
TaskResult::move_to_next_direct() // NextAction::ContinueAndExecute

Context - State Management

The Context provides thread-safe state sharing across tasks:

Basic Context Operations

// Setting values
// NOTE: `MyStruct` stands in for a user-defined type — presumably it must be
// serializable (serde); confirm against the crate docs.
context.set("key", "value").await;
context.set("number", 42).await;
context.set("complex_data", MyStruct { field: "value" }).await;

// Getting values: each read yields an `Option`, and the annotation on the
// binding names the type to read back
let value: Option<String> = context.get("key").await;
let number: Option<i32> = context.get("number").await;
let complex: Option<MyStruct> = context.get("complex_data").await;

// Synchronous operations (useful in edge conditions): no `.await` needed
context.set_sync("sync_key", "sync_value");
let sync_value: Option<String> = context.get_sync("sync_key");

// Removing values: the removed entry, if any, comes back as raw JSON
let removed: Option<serde_json::Value> = context.remove("key").await;

// Clearing all data (preserves chat history)
context.clear().await;

Chat History Management

// Adding messages: one method per role (user, assistant, system)
context.add_user_message("Hello, assistant!".to_string()).await;
context.add_assistant_message("Hello! How can I help you?".to_string()).await;
context.add_system_message("System: Session started".to_string()).await;

// Getting chat history: the history object, every message, or only the last N
let history = context.get_chat_history().await;
let all_messages = context.get_all_messages().await;
let last_5 = context.get_last_messages(5).await;

// Chat history info
let count = context.chat_history_len().await;
let is_empty = context.is_chat_history_empty().await;

// Clear chat history
context.clear_chat_history().await;

// Context with message limits (here: 100 chat messages).
// Note this creates a new `Context` that shadows the earlier binding.
let context = Context::with_max_chat_messages(100);

LLM Integration (with rig feature)

// Compiled only when the crate's `rig` feature is enabled.
#[cfg(feature = "rig")]
{
    // Get messages in rig format for LLM calls
    let rig_messages = context.get_rig_messages().await;
    // Same, restricted to the 10 most recent messages
    let last_10_for_llm = context.get_last_rig_messages(10).await;

    // Use with rig's completion API
    // let response = agent.completion(&rig_messages).await?;
}

Graph Building

Create complex workflows using the GraphBuilder:

Linear Workflow

// Register the three tasks, then chain them with edges keyed by task id.
let graph = GraphBuilder::new("linear_workflow")
    .add_task(task1.clone())
    .add_task(task2.clone())
    .add_task(task3.clone())
    .add_edge(task1.id(), task2.id())  // task1 -> task2
    .add_edge(task2.id(), task3.id())  // task2 -> task3
    .build();

Conditional Workflow

// After `input_task`, a condition picks one of two processing tasks;
// both branches then converge on `final_task`.
let graph = GraphBuilder::new("conditional_workflow")
    .add_task(input_task.clone())
    .add_task(process_a.clone())
    .add_task(process_b.clone())
    .add_task(final_task.clone())
    .add_conditional_edge(
        input_task.id(),
        // The condition reads the context through the synchronous accessor;
        // a missing `user_type` becomes "" and therefore counts as not premium.
        |ctx| ctx.get_sync::<String>("user_type").unwrap_or_default() == "premium",
        process_a.id(),    // if premium user
        process_b.id(),    // if regular user
    )
    .add_edge(process_a.id(), final_task.id())
    .add_edge(process_b.id(), final_task.id())
    .build();

Complex Branching

// Validation -> processing pipeline with an error branch and a retry path.
let graph = GraphBuilder::new("complex_workflow")
    .add_task(start_task.clone())
    .add_task(validation_task.clone())
    .add_task(processing_task.clone())
    .add_task(error_handler.clone())
    .add_task(success_task.clone())
    .add_task(retry_task.clone())
    // Initial flow
    .add_edge(start_task.id(), validation_task.id())
    // Validation branches (a missing `is_valid` is treated as invalid)
    .add_conditional_edge(
        validation_task.id(),
        |ctx| ctx.get_sync::<bool>("is_valid").unwrap_or(false),
        processing_task.id(),  // valid -> process
        error_handler.id(),    // invalid -> error
    )
    // Processing branches (a missing `success` is treated as failure)
    .add_conditional_edge(
        processing_task.id(),
        |ctx| ctx.get_sync::<bool>("success").unwrap_or(false),
        success_task.id(),     // success -> done
        retry_task.id(),       // failure -> retry
    )
    // Retry logic: loop back while `retry_count` is below 3 (missing counts as 0)
    .add_conditional_edge(
        retry_task.id(),
        |ctx| ctx.get_sync::<i32>("retry_count").unwrap_or(0) < 3,
        validation_task.id(),  // retry -> validate again
        error_handler.id(),    // max retries -> error
    )
    // Explicitly choose the entry task
    .set_start_task(start_task.id())
    .build();

Execution Patterns

Step-by-Step Execution

Best for interactive applications where you want control between each step:

let flow_runner = FlowRunner::new(graph, session_storage);

// Each iteration runs the workflow once, then decides whether to stop.
loop {
    let step = flow_runner.run(&session_id).await?;

    // `true` means the workflow is over (completed or failed).
    let finished = match step.status {
        ExecutionStatus::Paused { next_task_id } => {
            println!("Paused at {}: {:?}", next_task_id, step.response);
            // Optionally do something before next step
            false
        }
        ExecutionStatus::WaitingForInput => {
            println!("Waiting for input: {:?}", step.response);
            // Get user input and update context
            // context.set("user_input", user_input).await;
            false
        }
        ExecutionStatus::Completed => {
            println!("Workflow completed: {:?}", step.response);
            true
        }
        ExecutionStatus::Error(e) => {
            eprintln!("Error: {}", e);
            true
        }
    };

    if finished {
        break;
    }
}

Continuous Execution

For tasks that should run automatically until completion:

/// Example task that returns `NextAction::ContinueAndExecute`.
struct AutoTask;

#[async_trait]
impl Task for AutoTask {
    fn id(&self) -> &str {
        "auto_task"
    }

    async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
        // Do work...
        let response = Some("Work done".to_string());
        // Continue automatically
        let next = NextAction::ContinueAndExecute;

        Ok(TaskResult::new(response, next))
    }
}

// Single call executes until completion or interruption
let result = flow_runner.run(&session_id).await?;

Mixed Execution

Combine both patterns in the same workflow:

/// Task that either waits for the user or keeps going, based on a context flag.
struct InteractiveTask;

#[async_trait]
impl Task for InteractiveTask {
    fn id(&self) -> &str {
        "interactive"
    }

    async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
        // A missing flag counts as "no input needed".
        let needs_input: bool = context.get("needs_user_input").await.unwrap_or(false);

        let (message, action) = if needs_input {
            // Stop and wait
            ("Please provide input", NextAction::WaitForInput)
        } else {
            // Continue automatically
            ("Processing automatically", NextAction::ContinueAndExecute)
        };

        Ok(TaskResult::new(Some(message.to_string()), action))
    }
}

Storage Backends

In-Memory Storage (Development)

use graph_flow::InMemorySessionStorage;

// Shared handle to the in-memory session store.
let storage = Arc::new(InMemorySessionStorage::new());

// Create and save a session (id "session_1", starting at "start_task")
let session = Session::new_from_task("session_1".to_string(), "start_task");
session.context.set("initial_data", "value").await;
storage.save(session).await?;

// Retrieve and use: `?` propagates a storage error, and the `unwrap()` calls
// panic if the session or the key is missing (kept short for the example)
let session = storage.get("session_1").await?.unwrap();
let data: String = session.context.get("initial_data").await.unwrap();

PostgreSQL Storage (Production)

use graph_flow::PostgresSessionStorage;

// Connect to database (`database_url` must be defined by the caller; not shown here)
let storage = Arc::new(
    PostgresSessionStorage::connect(&database_url).await?
);

// Works the same as in-memory: create a session, then save it
let session = Session::new_from_task("session_1".to_string(), "start_task");
storage.save(session).await?;

Advanced Examples

Multi-Agent Conversation System

use graph_flow::*;

/// One conversational agent; several instances can live in the same graph.
struct AgentTask {
    /// Used as the task id and as the tag in the agent's replies.
    agent_name: String,
    /// Added as a system message when the chat history is still empty.
    system_prompt: String,
}

#[async_trait]
impl Task for AgentTask {
    fn id(&self) -> &str {
        self.agent_name.as_str()
    }

    async fn run(&self, context: Context) -> Result<TaskResult> {
        // Seed the conversation with the system prompt when no messages exist yet.
        let history = context.get_all_messages().await;
        if history.is_empty() {
            context.add_system_message(self.system_prompt.clone()).await;
        }

        // Without a pending user input there is nothing to answer: wait for it.
        let pending: Option<String> = context.get("user_input").await;
        let input = match pending {
            Some(text) => text,
            None => {
                return Ok(TaskResult::new(
                    Some("Waiting for user input".to_string()),
                    NextAction::WaitForInput,
                ));
            }
        };

        context.add_user_message(input).await;

        // Here you would integrate with your LLM
        let response = format!("[{}] Processed: {}", self.agent_name, "response");
        context.add_assistant_message(response.clone()).await;

        // Store for next agent or user
        context.set("last_agent_response", response.clone()).await;

        Ok(TaskResult::new(Some(response), NextAction::Continue))
    }
}

// Build multi-agent workflow: two `AgentTask` instances that differ only in
// name and system prompt
let analyst = Arc::new(AgentTask {
    agent_name: "analyst".to_string(),
    system_prompt: "You are a data analyst.".to_string(),
});

let reviewer = Arc::new(AgentTask {
    agent_name: "reviewer".to_string(),
    system_prompt: "You review and critique analysis.".to_string(),
});

let graph = GraphBuilder::new("multi_agent_chat")
    .add_task(analyst.clone())
    .add_task(reviewer.clone())
    .add_conditional_edge(
        analyst.id(),
        // A missing `needs_review` defaults to true, i.e. goes to the reviewer
        |ctx| ctx.get_sync::<bool>("needs_review").unwrap_or(true),
        reviewer.id(),
        analyst.id(), // Loop back for more analysis
    )
    .build();

Error Handling and Recovery

/// Task that retries itself a bounded number of times before giving up.
struct ResilientTask {
    /// Upper bound on the stored `retry_count` for which a retry is still scheduled.
    max_retries: usize,
}

#[async_trait]
impl Task for ResilientTask {
    fn id(&self) -> &str {
        "resilient_task"
    }

    async fn run(&self, context: Context) -> Result<TaskResult> {
        let retry_count: usize = context.get("retry_count").await.unwrap_or(0);

        // Simulated work: it only succeeds once the stored count exceeds 2.
        if retry_count > 2 {
            context.set("retry_count", 0).await; // Reset for next time
            return Ok(TaskResult::new(
                Some("Task completed successfully".to_string()),
                NextAction::Continue,
            ));
        }

        // Retry budget exhausted: hand over to the error handler.
        if retry_count >= self.max_retries {
            return Ok(TaskResult::new(
                Some("Task failed after maximum retries".to_string()),
                NextAction::GoTo("error_handler".to_string()),
            ));
        }

        // Record the failed attempt and jump back to this same task.
        let attempt = retry_count + 1;
        context.set("retry_count", attempt).await;
        Ok(TaskResult::new_with_status(
            Some(format!("Attempt {} failed, retrying...", attempt)),
            NextAction::GoTo("resilient_task".to_string()), // Retry self
            Some(format!("Retry {}/{}", attempt, self.max_retries)),
        ))
    }
}

Dynamic Task Selection

/// Picks the next task at runtime from `user_type` and `urgency` in the context.
struct RouterTask;

#[async_trait]
impl Task for RouterTask {
    fn id(&self) -> &str {
        "router"
    }

    async fn run(&self, context: Context) -> Result<TaskResult> {
        // Missing values default to "" and so match neither "premium" nor "high".
        let user_type: String = context.get("user_type").await.unwrap_or_default();
        let urgency: String = context.get("urgency").await.unwrap_or_default();

        let is_premium = user_type == "premium";
        let is_urgent = urgency == "high";

        let target = match (is_premium, is_urgent) {
            (true, true) => "priority_handler",
            (true, false) => "premium_handler",
            (false, true) => "urgent_handler",
            (false, false) => "standard_handler",
        };

        let response = format!("Routing to {}", target);
        Ok(TaskResult::new(
            Some(response),
            NextAction::GoTo(target.to_string()),
        ))
    }
}

Performance and Best Practices

Efficient Context Usage

// ✅ Good: Batch context operations (keep related writes together)
context.set("key1", value1).await;
context.set("key2", value2).await;
context.set("key3", value3).await;

// ✅ Good: Use sync methods in edge conditions
// (fragment of a `GraphBuilder` chain; the closure uses `get_sync`, not the async `get`)
.add_conditional_edge(
    "task1",
    |ctx| ctx.get_sync::<bool>("condition").unwrap_or(false),
    "task2",
    "task3"
)

// ✅ Good: Limit chat history size for long conversations
let context = Context::with_max_chat_messages(100);

Memory Management

// ✅ Good: Reuse Arc references
// (`MyTask` is a placeholder for your own task type)
let shared_task = Arc::new(MyTask::new());
let graph = GraphBuilder::new("workflow")
    .add_task(shared_task.clone())  // Clone the Arc, not the task
    .build();

// ✅ Good: Clear unused context data
context.remove("large_temporary_data").await;

Error Handling

// ✅ Good: Proper error propagation
/// Fails with `GraphError::TaskExecutionFailed` when `required_data` is not
/// in the context; otherwise continues to the next task.
async fn run(&self, context: Context) -> Result<TaskResult> {
    // The annotation is required: nothing else tells `get` which type to read
    // back. Replace `String` with the type you stored under this key.
    let data: String = context.get("required_data").await
        .ok_or_else(|| GraphError::TaskExecutionFailed(
            "Missing required data".to_string()
        ))?;

    // Process data...
    Ok(TaskResult::new(Some("Success".to_string()), NextAction::Continue))
}

Features

Default Features

The crate works out of the box with basic workflow capabilities.

rig Feature

Enables LLM integration through the Rig crate:

[dependencies]
graph-flow = { version = "0.4", features = ["rig"] }

// Compiled only when the `rig` feature is enabled.
#[cfg(feature = "rig")]
{
    // Get messages formatted for LLM consumption
    let messages = context.get_rig_messages().await;
    // Same, restricted to the 10 most recent messages
    let recent = context.get_last_rig_messages(10).await;
}

Testing Your Workflows

#[cfg(test)]
mod tests {
    use super::*;

    /// End-to-end check: run one task through a `FlowRunner`, then inspect the
    /// response and the context saved back to storage.
    #[tokio::test]
    async fn test_workflow() {
        const SESSION_ID: &str = "test_session";

        // Graph with a single task (`MyTask` is a placeholder for your own task).
        let task = Arc::new(MyTask::new());
        let graph = Arc::new(
            GraphBuilder::new("test")
                .add_task(task.clone())
                .build(),
        );

        let storage = Arc::new(InMemorySessionStorage::new());
        let runner = FlowRunner::new(graph, storage.clone());

        // Seed the session the runner will load.
        let seeded = Session::new_from_task(SESSION_ID.to_string(), task.id());
        seeded.context.set("test_input", "test_value").await;
        storage.save(seeded).await.unwrap();

        // Execute and make sure the task produced a response.
        let result = runner.run(SESSION_ID).await.unwrap();
        assert!(result.response.is_some());

        // Reload the session and check what the task wrote into the context.
        let reloaded = storage.get(SESSION_ID).await.unwrap().unwrap();
        let output: String = reloaded.context.get("expected_output").await.unwrap();
        assert_eq!(output, "expected_value");
    }
}

Migration from 0.1.x

  • Context::get_rig_messages() replaces manual message conversion
  • TaskResult::new_with_status() adds debugging support
  • FlowRunner provides simplified session management
  • PostgreSQL storage is now more robust with connection pooling

Project Structure

This section describes the purpose and contents of each file in the graph-flow crate:

Source Files (src/)

lib.rs

The main library entry point that:

  • Defines the crate's public API and exports commonly used types
  • Contains comprehensive module-level documentation with examples
  • Provides a complete Quick Start guide demonstrating the basic workflow
  • Includes integration tests for graph execution and storage functionality

Public re-exports:

  • Context, ChatHistory, MessageRole, SerializableMessage
  • GraphError, Result
  • ExecutionResult, ExecutionStatus, Graph, GraphBuilder
  • FlowRunner
  • GraphStorage, InMemoryGraphStorage, InMemorySessionStorage, Session, SessionStorage
  • PostgresSessionStorage
  • NextAction, Task, TaskResult

context.rs

Context and state management for workflows:

  • Provides both async and sync accessor methods for different use cases
  • Optional Rig integration for LLM message format conversion (behind rig feature flag)
  • Full serialization/deserialization support for persistence

Public types:

  • Context: Thread-safe state container using Arc<DashMap> for data storage
  • ChatHistory: Specialized container for conversation management with automatic message pruning
  • SerializableMessage: Unified message format with role-based typing (User/Assistant/System)
  • MessageRole: Enum defining message sender types (User, Assistant, System)

error.rs

Centralized error handling:

  • Includes variants for task execution, storage, session management, and validation errors
  • Uses thiserror for ergonomic error handling with descriptive messages

Public types:

  • GraphError: Comprehensive error enum with variants:
    • TaskExecutionFailed(String)
    • GraphNotFound(String)
    • InvalidEdge(String)
    • TaskNotFound(String)
    • ContextError(String)
    • StorageError(String)
    • SessionNotFound(String)
    • Other(anyhow::Error)
  • Result<T>: Type alias for std::result::Result<T, GraphError>

graph.rs

Core graph execution engine:

  • Supports conditional branching, task timeouts, and recursive execution
  • Session-aware execution that preserves state between calls
  • Automatic task validation and orphaned task detection

Public types:

  • Graph: Main workflow orchestrator with task execution and flow control
  • GraphBuilder: Fluent API for constructing workflows with validation
  • Edge: Represents connections between tasks with optional condition functions
  • ExecutionResult: Contains response and execution status
  • ExecutionStatus: Enum indicating workflow state:
    • Paused { next_task_id: String }
    • WaitingForInput
    • Completed
    • Error(String)
  • EdgeCondition: Type alias for condition functions

runner.rs

High-level workflow execution wrapper:

  • Designed for interactive applications and web services
  • Handles session persistence automatically
  • Optimized for step-by-step execution with minimal overhead
  • Extensive documentation with usage patterns for different architectures
  • Error handling with automatic session rollback on failures

Public types:

  • FlowRunner: Convenience wrapper implementing the load → execute → save pattern

storage.rs

Session and graph persistence abstractions:

  • Thread-safe implementations using Arc<DashMap> for concurrent access

Public types:

  • Session: Workflow state container with id, current task, and context
  • SessionStorage trait: Abstract interface for session persistence
  • GraphStorage trait: Abstract interface for graph persistence
  • InMemorySessionStorage: Fast in-memory implementation for development/testing
  • InMemoryGraphStorage: In-memory graph storage for development

storage_postgres.rs

Production-ready PostgreSQL storage backend:

  • Automatic database migration with proper schema creation
  • Connection pooling for high-performance concurrent access
  • JSONB storage for efficient context serialization
  • Optimistic concurrency control with timestamp-based conflict resolution
  • Comprehensive error handling with database-specific error mapping

Public types:

  • PostgresSessionStorage: Robust PostgreSQL implementation of SessionStorage

task.rs

Task definition and execution control:

  • Supports both simple and complex task implementations
  • Automatic task ID generation using type names with override capability
  • Extensive examples showing different task patterns and use cases

Public types:

  • Task trait: Core interface that all workflow steps must implement
  • TaskResult: Return type containing response and flow control information
  • NextAction: Enum controlling workflow progression:
    • Continue - Step-by-step execution
    • ContinueAndExecute - Continuous execution
    • GoTo(String) - Jump to specific task
    • GoBack - Go to previous task
    • End - Terminate workflow
    • WaitForInput - Pause for user input

Configuration Files

Cargo.toml

Package configuration defining:

  • Crate metadata (name, version, description, authors)
  • Dependencies with feature flags (rig for LLM integration)
  • Feature definitions and optional dependencies
  • Workspace configuration if part of a larger project

README.md

Comprehensive documentation including:

  • Feature overview and quick start guide
  • Complete API reference with examples
  • Advanced usage patterns and best practices
  • Performance optimization guidelines
  • Migration guides and troubleshooting information

Each file is designed with a single responsibility and clear interfaces, making the codebase maintainable and extensible. The modular architecture allows users to leverage only the components they need while providing full-featured workflow capabilities out of the box.

License

MIT

Parallel Execution with FanOutTask

FanOutTask is a composite task that runs multiple child tasks concurrently, aggregates their outputs into the shared Context, and then continues the graph. It is the simplest way to add parallelism without changing the graph engine.

Key properties:

  • Children share the same Context (concurrent reads/writes are supported). To avoid key collisions, FanOutTask stores each child’s outputs under a prefixed key by default: fanout.<child_id>.<field>.
  • Children’s NextAction is ignored (they act as units of work). The control flow is decided by the FanOutTask itself, which returns NextAction::Continue by default.
  • If any child fails, the whole FanOutTask fails.

Basic example:

use graph_flow::{GraphBuilder, Task, TaskResult, NextAction, Context, FanOutTask};
use async_trait::async_trait;
use std::sync::Arc;

struct Prepare; struct ChildA; struct ChildB; struct Consume;

#[async_trait]
impl Task for Prepare {
    fn id(&self) -> &str {
        "prepare"
    }

    /// Stores the `input` value that both fan-out children read.
    async fn run(&self, ctx: Context) -> graph_flow::Result<TaskResult> {
        let input = "hello".to_string();
        ctx.set("input", input).await;

        let response = Some("prepared".to_string());
        Ok(TaskResult::new(response, NextAction::Continue))
    }
}

#[async_trait]
impl Task for ChildA {
    fn id(&self) -> &str {
        "child_a"
    }

    /// Appends "-A" to the shared `input` and stores the result as `a_out`.
    async fn run(&self, ctx: Context) -> graph_flow::Result<TaskResult> {
        // A missing `input` falls back to the empty string.
        let input: String = ctx.get("input").await.unwrap_or_default();
        let tagged = format!("{}-A", input);
        ctx.set("a_out", tagged).await;

        Ok(TaskResult::new(Some("A done".to_string()), NextAction::End))
    }
}

#[async_trait]
impl Task for ChildB {
    fn id(&self) -> &str {
        "child_b"
    }

    /// Appends "-B" to the shared `input` and stores the result as `b_out`.
    async fn run(&self, ctx: Context) -> graph_flow::Result<TaskResult> {
        // A missing `input` falls back to the empty string.
        let input: String = ctx.get("input").await.unwrap_or_default();
        let tagged = format!("{}-B", input);
        ctx.set("b_out", tagged).await;

        Ok(TaskResult::new(Some("B done".to_string()), NextAction::End))
    }
}

#[async_trait]
impl Task for Consume {
    fn id(&self) -> &str {
        "consume"
    }

    /// Reads both children's responses from their prefixed keys and reports them.
    async fn run(&self, ctx: Context) -> graph_flow::Result<TaskResult> {
        // Keys follow the `fanout.<child_id>.<field>` layout.
        let a_resp: Option<String> = ctx.get("fanout.child_a.response").await;
        let b_resp: Option<String> = ctx.get("fanout.child_b.response").await;

        let summary = format!("A={:?}, B={:?}", a_resp, b_resp);
        Ok(TaskResult::new(Some(summary), NextAction::End))
    }
}

// The two children that the fan-out node runs concurrently.
let child_a: Arc<dyn Task> = Arc::new(ChildA);
let child_b: Arc<dyn Task> = Arc::new(ChildB);
let fanout = FanOutTask::new("fan", vec![child_a.clone(), child_b.clone()]);

// The tasks before and after the fan-out node.
let prepare: Arc<dyn Task> = Arc::new(Prepare);
let consume: Arc<dyn Task> = Arc::new(Consume);

// prepare -> fan (child_a + child_b) -> consume
let graph = GraphBuilder::new("fanout")
    .add_task(prepare.clone())
    .add_task(fanout.clone())
    .add_task(consume.clone())
    .add_edge(prepare.id(), fanout.id())
    .add_edge(fanout.id(), consume.id())
    .build();

See a runnable example at graph-flow/examples/fanout_basic.rs.

Commit count: 64

cargo fmt