| Crates.io | graph-flow |
| lib.rs | graph-flow |
| version | 0.4.0 |
| created_at | 2025-06-29 06:03:26.825203+00 |
| updated_at | 2025-09-11 12:19:05.987324+00 |
| description | A high-performance, type-safe framework for building multi-agent workflow systems in Rust |
| homepage | https://github.com/a-agmon/rs-graph-llm |
| repository | https://github.com/a-agmon/rs-graph-llm |
| max_upload_size | |
| id | 1730410 |
| size | 207,760 |
A high-performance, type-safe framework for building multi-agent workflow systems in Rust.
Add to your Cargo.toml:
[dependencies]
graph-flow = "0.4"
# For LLM integration, use this line instead of the one above
graph-flow = { version = "0.4", features = ["rig"] }
use graph_flow::{Context, Task, TaskResult, NextAction, GraphBuilder, FlowRunner, InMemorySessionStorage, Session};
use async_trait::async_trait;
use std::sync::Arc;
// Define a simple greeting task
/// Minimal task that greets the user named in the shared context.
///
/// Reads the optional `"name"` key (falling back to `"World"`), stores the
/// rendered greeting under `"greeting"`, and returns it as the task response.
struct HelloTask;

#[async_trait]
impl Task for HelloTask {
    /// Identifier of this task; `main` uses it as the session's starting task.
    fn id(&self) -> &str {
        "hello_task"
    }

    /// Builds the greeting, publishes it to the context, and returns it with
    /// `NextAction::Continue`.
    async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
        // `unwrap_or_else` builds the fallback only when no name was stored,
        // so the common path does not allocate a throwaway `String`.
        let name: String = context
            .get("name")
            .await
            .unwrap_or_else(|| "World".to_string());
        let greeting = format!("Hello, {}! How are you today?", name);

        // Store the greeting in context for other tasks
        context.set("greeting", greeting.clone()).await;

        Ok(TaskResult::new(Some(greeting), NextAction::Continue))
    }
}
/// Entry point: builds a one-task graph, seeds a session, and runs it once.
#[tokio::main]
async fn main() -> graph_flow::Result<()> {
    const SESSION_ID: &str = "user_123";

    // Assemble the workflow graph around a single greeting task.
    let greeter = Arc::new(HelloTask);
    let builder = GraphBuilder::new("greeting_workflow").add_task(greeter.clone());
    let workflow = Arc::new(builder.build());

    // Session storage shared between this function and the runner.
    let sessions = Arc::new(InMemorySessionStorage::new());
    let runner = FlowRunner::new(workflow.clone(), sessions.clone());

    // Seed a session that starts at the greeting task and carries the name.
    let seeded = Session::new_from_task(SESSION_ID.to_string(), greeter.id());
    seeded.context.set("name", "Alice".to_string()).await;
    sessions.save(seeded).await?;

    // Run the workflow for that session and print what the task answered.
    let outcome = runner.run(SESSION_ID).await?;
    println!("Response: {:?}", outcome.response);
    Ok(())
}
Tasks implement the core Task trait and define the units of work in your workflow:
use graph_flow::{Task, TaskResult, NextAction, Context};
use async_trait::async_trait;
/// Task that upper-cases the user's input and publishes the result.
struct DataProcessingTask {
    /// Task identifier, returned verbatim by `id()`.
    name: String,
}

#[async_trait]
impl Task for DataProcessingTask {
    fn id(&self) -> &str {
        self.name.as_str()
    }

    /// Reads `"user_input"`, writes `"processed_data"`, and returns
    /// `NextAction::Continue`.
    async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
        // A missing "user_input" key is treated as an empty string.
        let raw: String = context.get("user_input").await.unwrap_or_default();

        let upper = raw.to_uppercase();
        let processed = format!("Processed: {}", upper);

        // Publish the result so a later task can pick it up.
        context.set("processed_data", processed.clone()).await;

        let summary = format!("Data processed: {}", processed);
        Ok(TaskResult::new(Some(summary), NextAction::Continue))
    }
}
/// Task that checks whether `"processed_data"` is long enough to proceed.
struct ValidationTask;

#[async_trait]
impl Task for ValidationTask {
    fn id(&self) -> &str {
        "validator"
    }

    /// Chooses the response, next action and status message from the length
    /// of the stored data (or from its absence).
    async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
        let stored: Option<String> = context.get("processed_data").await;

        let (response, action, status) = match stored.as_deref().map(str::len) {
            // Longer than ten bytes: validation passes.
            Some(len) if len > 10 => (
                "Data validation passed",
                NextAction::Continue,
                "Data meets minimum length requirement",
            ),
            // Present but too short: ask the user for more.
            Some(_) => (
                "Data validation failed - too short",
                NextAction::WaitForInput,
                "Waiting for user to provide more data",
            ),
            // Nothing stored at all: jump to the input task.
            None => (
                "No data found to validate",
                NextAction::GoTo("data_input".to_string()),
                "Redirecting to data input task",
            ),
        };

        Ok(TaskResult::new_with_status(
            Some(response.to_string()),
            action,
            Some(status.to_string()),
        ))
    }
}
The NextAction enum controls how your workflow progresses:
// Each line below is the tail expression of a task's `run` method; the second
// argument selects how the workflow proceeds.
// NextAction::Continue: continue to next task, but pause execution (step-by-step mode)
Ok(TaskResult::new(Some("Step completed".to_string()), NextAction::Continue))
// NextAction::ContinueAndExecute: continue and execute the next task immediately (continuous mode)
Ok(TaskResult::new(Some("Moving forward".to_string()), NextAction::ContinueAndExecute))
// NextAction::WaitForInput: wait for user input before continuing
Ok(TaskResult::new(Some("Need more info".to_string()), NextAction::WaitForInput))
// NextAction::GoTo: jump to a specific task, named by its id
Ok(TaskResult::new(Some("Redirecting".to_string()), NextAction::GoTo("specific_task".to_string())))
// NextAction::GoBack: go back to the previous task
Ok(TaskResult::new(Some("Going back".to_string()), NextAction::GoBack))
// NextAction::End: end the workflow
Ok(TaskResult::new(Some("All done!".to_string()), NextAction::End))
// Convenience methods: shorthand for the two "continue" variants
TaskResult::move_to_next() // NextAction::Continue
TaskResult::move_to_next_direct() // NextAction::ContinueAndExecute
The Context provides thread-safe state sharing across tasks:
// --- Key/value state ---
// Setting values (the value type varies per call)
context.set("key", "value").await;
context.set("number", 42).await;
context.set("complex_data", MyStruct { field: "value" }).await;
// Getting values: the requested type is chosen by the binding's annotation,
// and the result is an Option
let value: Option<String> = context.get("key").await;
let number: Option<i32> = context.get("number").await;
let complex: Option<MyStruct> = context.get("complex_data").await;
// Synchronous operations (useful in edge conditions)
context.set_sync("sync_key", "sync_value");
let sync_value: Option<String> = context.get_sync("sync_key");
// Removing values: the removed entry, if any, comes back as a serde_json::Value
let removed: Option<serde_json::Value> = context.remove("key").await;
// Clearing all data (preserves chat history)
context.clear().await;
// --- Chat history ---
// Adding messages, one method per role
context.add_user_message("Hello, assistant!".to_string()).await;
context.add_assistant_message("Hello! How can I help you?".to_string()).await;
context.add_system_message("System: Session started".to_string()).await;
// Getting chat history
let history = context.get_chat_history().await;
let all_messages = context.get_all_messages().await;
let last_5 = context.get_last_messages(5).await;
// Chat history info
let count = context.chat_history_len().await;
let is_empty = context.is_chat_history_empty().await;
// Clear chat history
context.clear_chat_history().await;
// Context with message limits (here: 100 chat messages)
let context = Context::with_max_chat_messages(100);
Rig integration (requires the rig feature):
#[cfg(feature = "rig")]
{
// Get messages in rig format for LLM calls
let rig_messages = context.get_rig_messages().await;
// Same conversion, limited to the last 10 messages
let last_10_for_llm = context.get_last_rig_messages(10).await;
// Use with rig's completion API (left commented out: `agent` is not defined in this snippet)
// let response = agent.completion(&rig_messages).await?;
}
Create complex workflows using the GraphBuilder:
// Linear workflow: register three tasks, then chain them with unconditional edges.
let graph = GraphBuilder::new("linear_workflow")
.add_task(task1.clone())
.add_task(task2.clone())
.add_task(task3.clone())
.add_edge(task1.id(), task2.id()) // task1 -> task2
.add_edge(task2.id(), task3.id()) // task2 -> task3
.build();
// Branching workflow: premium users are routed through `process_a`, everyone
// else through `process_b`; both branches converge on `final_task`.
let graph = GraphBuilder::new("conditional_workflow")
    .add_task(input_task.clone())
    .add_task(process_a.clone())
    .add_task(process_b.clone())
    .add_task(final_task.clone())
    .add_conditional_edge(
        input_task.id(),
        // A missing "user_type" key never matches, so it counts as a regular user.
        |ctx| ctx.get_sync::<String>("user_type").as_deref() == Some("premium"),
        process_a.id(), // taken when the condition holds (premium user)
        process_b.id(), // taken otherwise (regular user)
    )
    .add_edge(process_a.id(), final_task.id())
    .add_edge(process_b.id(), final_task.id())
    .build();
// Complex workflow: validation, processing and a bounded retry loop.
// Every conditional edge reads a flag from the context synchronously and
// falls back to a default when the key is absent.
let graph = GraphBuilder::new("complex_workflow")
.add_task(start_task.clone())
.add_task(validation_task.clone())
.add_task(processing_task.clone())
.add_task(error_handler.clone())
.add_task(success_task.clone())
.add_task(retry_task.clone())
// Initial flow
.add_edge(start_task.id(), validation_task.id())
// Validation branches (missing "is_valid" counts as invalid)
.add_conditional_edge(
validation_task.id(),
|ctx| ctx.get_sync::<bool>("is_valid").unwrap_or(false),
processing_task.id(), // valid -> process
error_handler.id(), // invalid -> error
)
// Processing branches (missing "success" counts as failure)
.add_conditional_edge(
processing_task.id(),
|ctx| ctx.get_sync::<bool>("success").unwrap_or(false),
success_task.id(), // success -> done
retry_task.id(), // failure -> retry
)
// Retry logic: loop back while fewer than 3 retries are recorded (missing counter counts as 0)
.add_conditional_edge(
retry_task.id(),
|ctx| ctx.get_sync::<i32>("retry_count").unwrap_or(0) < 3,
validation_task.id(), // retry -> validate again
error_handler.id(), // max retries -> error
)
// Explicitly mark where execution begins
.set_start_task(start_task.id())
.build();
Best for interactive applications where you want control between each step:
let flow_runner = FlowRunner::new(graph, session_storage);

// Each iteration runs the workflow once; only a terminal status (completed
// or error) leaves the loop, every other status falls through to the next run.
loop {
    let step = flow_runner.run(&session_id).await?;
    match step.status {
        ExecutionStatus::Paused { next_task_id } => {
            println!("Paused at {}: {:?}", next_task_id, step.response);
            // Optionally do something before next step
        }
        ExecutionStatus::WaitingForInput => {
            println!("Waiting for input: {:?}", step.response);
            // Get user input and update context
            // context.set("user_input", user_input).await;
        }
        ExecutionStatus::Completed => {
            println!("Workflow completed: {:?}", step.response);
            break;
        }
        ExecutionStatus::Error(e) => {
            eprintln!("Error: {}", e);
            break;
        }
    }
}
For tasks that should run automatically until completion:
// Continuous execution: the task returns NextAction::ContinueAndExecute
struct AutoTask;

#[async_trait]
impl Task for AutoTask {
    fn id(&self) -> &str {
        "auto_task"
    }

    async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
        // Do work...

        // ContinueAndExecute: continue automatically
        let response = Some("Work done".to_string());
        Ok(TaskResult::new(response, NextAction::ContinueAndExecute))
    }
}

// Single call executes until completion or interruption
let result = flow_runner.run(&session_id).await?;
Combine both patterns in the same workflow:
/// Task that either stops for user input or keeps the workflow running,
/// depending on the `"needs_user_input"` flag in the context.
struct InteractiveTask;

#[async_trait]
impl Task for InteractiveTask {
    fn id(&self) -> &str {
        "interactive"
    }

    async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
        // A missing flag means no input is needed.
        let wait_for_user: bool = context.get("needs_user_input").await.unwrap_or(false);

        let (message, action) = if wait_for_user {
            // Stop and wait
            ("Please provide input", NextAction::WaitForInput)
        } else {
            // Continue automatically
            ("Processing automatically", NextAction::ContinueAndExecute)
        };

        Ok(TaskResult::new(Some(message.to_string()), action))
    }
}
use graph_flow::InMemorySessionStorage;
let storage = Arc::new(InMemorySessionStorage::new());

// Seed a session positioned at "start_task" and persist it.
let new_session = Session::new_from_task("session_1".to_string(), "start_task");
new_session.context.set("initial_data", "value").await;
storage.save(new_session).await?;

// Load the session back by id and read the stored value.
let session = storage.get("session_1").await?.unwrap();
let data: String = session.context.get("initial_data").await.unwrap();
use graph_flow::PostgresSessionStorage;
// Connect to database
// NOTE(review): `database_url` is not defined in this snippet; the caller must supply it.
let storage = Arc::new(
PostgresSessionStorage::connect(&database_url).await?
);
// Works the same as in-memory: create a session and save it through the storage handle
let session = Session::new_from_task("session_1".to_string(), "start_task");
storage.save(session).await?;
use graph_flow::*;
/// A conversational agent task, identified by its agent name.
struct AgentTask {
    /// Doubles as the task id and as the tag in generated responses.
    agent_name: String,
    /// Added as a system message when the message history is empty.
    system_prompt: String,
}

#[async_trait]
impl Task for AgentTask {
    fn id(&self) -> &str {
        self.agent_name.as_str()
    }

    /// Answers the pending `"user_input"`, or waits when there is none.
    async fn run(&self, context: Context) -> Result<TaskResult> {
        // Seed the system prompt while the message history is still empty.
        let history = context.get_all_messages().await;
        if history.is_empty() {
            context.add_system_message(self.system_prompt.clone()).await;
        }

        // Without a pending user input there is nothing to answer yet.
        let pending: Option<String> = context.get("user_input").await;
        let Some(input) = pending else {
            return Ok(TaskResult::new(
                Some("Waiting for user input".to_string()),
                NextAction::WaitForInput,
            ));
        };

        context.add_user_message(input).await;

        // Here you would integrate with your LLM
        let response = format!("[{}] Processed: {}", self.agent_name, "response");
        context.add_assistant_message(response.clone()).await;

        // Store for next agent or user
        context.set("last_agent_response", response.clone()).await;

        Ok(TaskResult::new(Some(response), NextAction::Continue))
    }
}
// Build multi-agent workflow: two AgentTask instances that differ only in name and prompt
let analyst = Arc::new(AgentTask {
agent_name: "analyst".to_string(),
system_prompt: "You are a data analyst.".to_string(),
});
let reviewer = Arc::new(AgentTask {
agent_name: "reviewer".to_string(),
system_prompt: "You review and critique analysis.".to_string(),
});
let graph = GraphBuilder::new("multi_agent_chat")
.add_task(analyst.clone())
.add_task(reviewer.clone())
// After the analyst: go to the reviewer when "needs_review" is true or missing
.add_conditional_edge(
analyst.id(),
|ctx| ctx.get_sync::<bool>("needs_review").unwrap_or(true),
reviewer.id(),
analyst.id(), // Loop back for more analysis
)
.build();
/// Task that retries itself a bounded number of times before giving up.
struct ResilientTask {
    /// Upper bound on the retry counter; once reached, the task routes to
    /// `"error_handler"` instead of retrying.
    max_retries: usize,
}

#[async_trait]
impl Task for ResilientTask {
    fn id(&self) -> &str {
        "resilient_task"
    }

    /// Tracks attempts under the `"retry_count"` context key.
    async fn run(&self, context: Context) -> Result<TaskResult> {
        let failures_so_far: usize = context.get("retry_count").await.unwrap_or(0);

        // Simulated work: it succeeds once more than two failures are on record.
        if failures_so_far > 2 {
            // Reset for next time
            context.set("retry_count", 0).await;
            return Ok(TaskResult::new(
                Some("Task completed successfully".to_string()),
                NextAction::Continue,
            ));
        }

        // Retry budget exhausted: hand over to the error handler.
        if failures_so_far >= self.max_retries {
            return Ok(TaskResult::new(
                Some("Task failed after maximum retries".to_string()),
                NextAction::GoTo("error_handler".to_string()),
            ));
        }

        // Record the failed attempt and route back to this task.
        let attempt = failures_so_far + 1;
        context.set("retry_count", attempt).await;
        Ok(TaskResult::new_with_status(
            Some(format!("Attempt {} failed, retrying...", attempt)),
            NextAction::GoTo("resilient_task".to_string()),
            Some(format!("Retry {}/{}", attempt, self.max_retries)),
        ))
    }
}
/// Task that picks the next handler from the user's tier and the urgency.
struct RouterTask;

#[async_trait]
impl Task for RouterTask {
    fn id(&self) -> &str {
        "router"
    }

    /// Reads `"user_type"` and `"urgency"` (missing keys become empty
    /// strings) and jumps to the matching handler task.
    async fn run(&self, context: Context) -> Result<TaskResult> {
        let user_type: String = context.get("user_type").await.unwrap_or_default();
        let urgency: String = context.get("urgency").await.unwrap_or_default();

        let is_premium = user_type == "premium";
        let is_urgent = urgency == "high";
        let next_task = match (is_premium, is_urgent) {
            (true, true) => "priority_handler",
            (true, false) => "premium_handler",
            (false, true) => "urgent_handler",
            (false, false) => "standard_handler",
        };

        let announcement = format!("Routing to {}", next_task);
        Ok(TaskResult::new(
            Some(announcement),
            NextAction::GoTo(next_task.to_string()),
        ))
    }
}
// ✅ Good: Batch context operations (keep related writes together)
context.set("key1", value1).await;
context.set("key2", value2).await;
context.set("key3", value3).await;
// ✅ Good: Use sync methods in edge conditions
// (fragment of a GraphBuilder chain; the closure reads the flag with get_sync)
.add_conditional_edge(
"task1",
|ctx| ctx.get_sync::<bool>("condition").unwrap_or(false),
"task2",
"task3"
)
// ✅ Good: Limit chat history size for long conversations
let context = Context::with_max_chat_messages(100);
// ✅ Good: Reuse Arc references
let shared_task = Arc::new(MyTask::new());
let graph = GraphBuilder::new("workflow")
.add_task(shared_task.clone()) // Clone the Arc, not the task
.build();
// ✅ Good: Clear unused context data (drop entries that are no longer needed)
context.remove("large_temporary_data").await;
// ✅ Good: Proper error propagation
/// Fails with `GraphError::TaskExecutionFailed` when `"required_data"` is
/// missing from the context; otherwise returns a success response with
/// `NextAction::Continue`.
async fn run(&self, context: Context) -> Result<TaskResult> {
    // `Context::get` is generic over the value it returns, so the binding
    // needs an explicit type or inference fails. `String` is used here as an
    // example; substitute the type your task actually stores.
    let data: String = context
        .get("required_data")
        .await
        .ok_or_else(|| GraphError::TaskExecutionFailed("Missing required data".to_string()))?;

    // Process data...
    Ok(TaskResult::new(Some("Success".to_string()), NextAction::Continue))
}
The crate works out of the box with basic workflow capabilities.
rig Feature
Enables LLM integration through the Rig crate:
[dependencies]
graph-flow = { version = "0.4", features = ["rig"] }
// Compiled only when the `rig` feature is enabled.
#[cfg(feature = "rig")]
{
// Get messages formatted for LLM consumption
let messages = context.get_rig_messages().await;
// Same conversion, limited to the last 10 messages
let recent = context.get_last_rig_messages(10).await;
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs a single-task graph against in-memory storage and checks both
    /// the response and the context the task left behind.
    #[tokio::test]
    async fn test_workflow() {
        const SESSION_ID: &str = "test_session";

        let task = Arc::new(MyTask::new());
        let builder = GraphBuilder::new("test").add_task(task.clone());
        let graph = Arc::new(builder.build());
        let storage = Arc::new(InMemorySessionStorage::new());
        let runner = FlowRunner::new(graph, storage.clone());

        // Create test session
        let seeded = Session::new_from_task(SESSION_ID.to_string(), task.id());
        seeded.context.set("test_input", "test_value").await;
        storage.save(seeded).await.unwrap();

        // Execute and verify
        let outcome = runner.run(SESSION_ID).await.unwrap();
        assert!(outcome.response.is_some());

        // Check context was updated
        let reloaded = storage.get(SESSION_ID).await.unwrap().unwrap();
        let output: String = reloaded.context.get("expected_output").await.unwrap();
        assert_eq!(output, "expected_value");
    }
}
- Context::get_rig_messages() replaces manual message conversion
- TaskResult::new_with_status() adds debugging support
- FlowRunner provides simplified session management

This section describes the purpose and contents of each file in the graph-flow crate:

Core library (src/)

lib.rs
The main library entry point that:
Public re-exports:
- Context, ChatHistory, MessageRole, SerializableMessage
- GraphError, Result
- ExecutionResult, ExecutionStatus, Graph, GraphBuilder
- FlowRunner
- GraphStorage, InMemoryGraphStorage, InMemorySessionStorage, Session, SessionStorage
- PostgresSessionStorage
- NextAction, Task, TaskResult

context.rs
Context and state management for workflows:
- Rig integration (behind the rig feature flag)
Public types:
- Context: Thread-safe state container using Arc<DashMap> for data storage
- ChatHistory: Specialized container for conversation management with automatic message pruning
- SerializableMessage: Unified message format with role-based typing (User/Assistant/System)
- MessageRole: Enum defining message sender types (User, Assistant, System)

error.rs
Centralized error handling:
- Uses thiserror for ergonomic error handling with descriptive messages
Public types:
GraphError: Comprehensive error enum with variants:
  - TaskExecutionFailed(String)
  - GraphNotFound(String)
  - InvalidEdge(String)
  - TaskNotFound(String)
  - ContextError(String)
  - StorageError(String)
  - SessionNotFound(String)
  - Other(anyhow::Error)
- Result<T>: Type alias for std::result::Result<T, GraphError>

graph.rs
Core graph execution engine:
Public types:
- Graph: Main workflow orchestrator with task execution and flow control
- GraphBuilder: Fluent API for constructing workflows with validation
- Edge: Represents connections between tasks with optional condition functions
- ExecutionResult: Contains response and execution status
- ExecutionStatus: Enum indicating workflow state:
  - Paused { next_task_id: String }
  - WaitingForInput
  - Completed
  - Error(String)
- EdgeCondition: Type alias for condition functions

runner.rs
High-level workflow execution wrapper:
Public types:
- FlowRunner: Convenience wrapper implementing the load → execute → save pattern

storage.rs
Session and graph persistence abstractions:
- Uses Arc<DashMap> for concurrent access
Public types:
- Session: Workflow state container with id, current task, and context
- SessionStorage trait: Abstract interface for session persistence
- GraphStorage trait: Abstract interface for graph persistence
- InMemorySessionStorage: Fast in-memory implementation for development/testing
- InMemoryGraphStorage: In-memory graph storage for development

storage_postgres.rs
Production-ready PostgreSQL storage backend:
Public types:
- PostgresSessionStorage: Robust PostgreSQL implementation of SessionStorage

task.rs
Task definition and execution control:
Public types:
- Task trait: Core interface that all workflow steps must implement
- TaskResult: Return type containing response and flow control information
- NextAction: Enum controlling workflow progression:
  - Continue - Step-by-step execution
  - ContinueAndExecute - Continuous execution
  - GoTo(String) - Jump to specific task
  - GoBack - Go to previous task
  - End - Terminate workflow
  - WaitForInput - Pause for user input

Cargo.toml
Package configuration defining:
- Optional features (rig for LLM integration)

README.md
Comprehensive documentation including:
Each file is designed with a single responsibility and clear interfaces, making the codebase maintainable and extensible. The modular architecture allows users to leverage only the components they need while providing full-featured workflow capabilities out of the box.
MIT
FanOutTask is a composite task that runs multiple child tasks concurrently, aggregates their outputs into the shared Context, and then continues the graph. It is the simplest way to add parallelism without changing the graph engine.
Key properties:
- Children share the same Context (concurrent reads/writes are supported). To avoid key collisions, FanOutTask stores each child’s outputs under a prefixed key by default: fanout.<child_id>.<field>.
- Each child’s NextAction is ignored (they act as units of work). The control flow is decided by the FanOutTask itself, which returns NextAction::Continue by default.
- If any child fails, the FanOutTask fails.

Basic example:
use graph_flow::{GraphBuilder, Task, TaskResult, NextAction, Context, FanOutTask};
use async_trait::async_trait;
use std::sync::Arc;
// Unit structs for the four tasks used in this example.
struct Prepare;
struct ChildA;
struct ChildB;
struct Consume;

/// First task: seeds the `"input"` value that the fan-out children read.
#[async_trait]
impl Task for Prepare {
    fn id(&self) -> &str {
        "prepare"
    }

    async fn run(&self, ctx: Context) -> graph_flow::Result<TaskResult> {
        let seed = "hello".to_string();
        ctx.set("input", seed).await;

        let response = Some("prepared".to_string());
        Ok(TaskResult::new(response, NextAction::Continue))
    }
}
/// Fan-out child: writes `"<input>-A"` under the `"a_out"` key.
#[async_trait]
impl Task for ChildA {
    fn id(&self) -> &str {
        "child_a"
    }

    async fn run(&self, ctx: Context) -> graph_flow::Result<TaskResult> {
        // A missing "input" key falls back to an empty string.
        let source: String = ctx.get("input").await.unwrap_or_default();
        let tagged = format!("{}-A", source);
        ctx.set("a_out", tagged).await;

        Ok(TaskResult::new(Some("A done".to_string()), NextAction::End))
    }
}
/// Fan-out child: writes `"<input>-B"` under the `"b_out"` key.
#[async_trait]
impl Task for ChildB {
    fn id(&self) -> &str {
        "child_b"
    }

    async fn run(&self, ctx: Context) -> graph_flow::Result<TaskResult> {
        // A missing "input" key falls back to an empty string.
        let source: String = ctx.get("input").await.unwrap_or_default();
        let tagged = format!("{}-B", source);
        ctx.set("b_out", tagged).await;

        Ok(TaskResult::new(Some("B done".to_string()), NextAction::End))
    }
}
/// Final task: reads both children's prefixed responses and reports them.
#[async_trait]
impl Task for Consume {
    fn id(&self) -> &str {
        "consume"
    }

    async fn run(&self, ctx: Context) -> graph_flow::Result<TaskResult> {
        // Each lookup yields None when the corresponding key is absent.
        let a_resp: Option<String> = ctx.get("fanout.child_a.response").await;
        let b_resp: Option<String> = ctx.get("fanout.child_b.response").await;

        let summary = format!("A={:?}, B={:?}", a_resp, b_resp);
        Ok(TaskResult::new(Some(summary), NextAction::End))
    }
}
// Wrap each task in an Arc typed as a trait object.
let prepare: Arc<dyn Task> = Arc::new(Prepare);
let child_a: Arc<dyn Task> = Arc::new(ChildA);
let child_b: Arc<dyn Task> = Arc::new(ChildB);
// The fan-out task "fan" is built from the two children.
let fanout = FanOutTask::new("fan", vec![child_a.clone(), child_b.clone()]);
let consume: Arc<dyn Task> = Arc::new(Consume);
// Only prepare, fanout and consume are added to the graph; the children are
// passed to the fan-out task instead. Flow: prepare -> fanout -> consume.
let graph = GraphBuilder::new("fanout").add_task(prepare.clone())
.add_task(fanout.clone())
.add_task(consume.clone())
.add_edge(prepare.id(), fanout.id())
.add_edge(fanout.id(), consume.id())
.build();
See a runnable example at graph-flow/examples/fanout_basic.rs.