| Crates.io | tower-llm |
| lib.rs | tower-llm |
| version | 0.0.13 |
| created_at | 2025-08-20 21:23:56.787553+00 |
| updated_at | 2025-09-08 21:38:20.487731+00 |
| description | A Tower-based framework for building LLM & agent workflows in Rust. |
| homepage | |
| repository | https://github.com/kofron/tower-llm |
| max_upload_size | |
| id | 1803972 |
| size | 915,265 |
This crate implements LLM agents as a set of composable primitives built on top of Tower services and layers. The core idea is that by modeling LLMs and the tools they call as Tower services, we can compose them together easily, and also modify their behavior via layers (middleware) to build up almost arbitrarily complex behavior.
The primitives:

- Step<S>: one non-streaming LLM call plus tool execution routed through S: Service<ToolInvocation>
- AgentLoop: runs steps until a policy decides to stop
- AgentBuilder: ergonomic builder for assembling tools, model, policy, sessions
- sessions::MemoryLayer with LoadSession/SaveSession services (e.g., InMemorySessionStore, SqliteSessionStore)
- observability::{TracingLayer, MetricsLayer}
- approvals::{ModelApprovalLayer, ToolApprovalLayer}
- resilience (retry, timeout, circuit-breaker)
- recording::{RecorderLayer, ReplayService}
- groups::{HandoffPolicy, HandoffCoordinator} and AgentBuilder::handoff_policy(...)

I have used this crate in some software systems that I've built, and it works and is reliable. That said, it's still a bit of an experiment, so YMMV!
Feel free to open PRs. In the future I'd like to make this more of an ecosystem, in the same way that Tower is an ecosystem, and leverage more third-party contributions. Before doing that, I'd like the core interfaces and design to settle down a bit more.
MIT
use std::sync::Arc;
use async_openai::{config::OpenAIConfig, Client};
use tower_llm::{Agent, policies, tool_typed};
use serde::Deserialize;
#[derive(Deserialize)]
struct EchoArgs { value: String }
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let client = Arc::new(Client::<OpenAIConfig>::new());

    let echo = tool_typed("echo", "Echo back the value", |args: EchoArgs| async move {
        Ok::<_, tower::BoxError>(serde_json::json!({"echo": args.value}))
    });

    let mut agent = Agent::builder(client)
        .model("gpt-4o")
        .tool(echo)
        .policy(policies::max_steps(1).into())
        .build();

    // Use the run(system, user) convenience, or call the service directly with your own request (see below)
    let _ = tower_llm::run(&mut agent, "You are helpful", "Say hi").await?;

    Ok(())
}
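The run helper is a thin convenience over a plain Tower call. If you prefer to drive the agent service directly, you can build the request yourself with simple_chat_request (the same pattern appears in the recording section below):

use tower::{Service, ServiceExt};

// Equivalent to run(&mut agent, "You are helpful", "Say hi")
let req = tower_llm::simple_chat_request("You are helpful", "Say hi");
let _out = ServiceExt::ready(&mut agent).await?.call(req).await?;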
At its core, tower-llm treats agents, tools, and coordinators as Tower services that you compose with layers. This design works well because it keeps business logic pure and testable while pushing side effects (I/O, retries, tracing, storage) to the edges. Start small and add power as you go:
- Hello world agent: see Quickstart. One model, one tool, a simple stop policy.
- Keep state between turns: add Sessions with MemoryLayer and an in-memory or SQLite store.
- See what's happening: add Observability via TracingLayer and MetricsLayer to emit spans and metrics.
- Bound cost and behavior: compose a Budget policy with your stop policies.
- Be resilient: wrap the agent with resilience layers (retry/timeout/circuit-breaker) from resilience; see the sketch after this list.
- Record and reproduce: tap runs with Recording/Replay to debug or build fixtures without model calls.
- Speed up tools: enable Parallel tool execution and pick a join policy when multiple tools can run concurrently.
- Stream in real time: use streaming::StepStreamService and AgentLoopStreamLayer for token-by-token UIs (see the streaming snippet below).
- Go multi-agent: coordinate specialists with Handoffs. Start with explicit or sequential policies; compose them as needed.
- Keep context tight: add auto_compaction::AutoCompactionLayer or groups::CompactingHandoffPolicy to trim history during long runs (see examples/handoff_compaction.rs).
- Validate conversations: use Conversation validation to assert invariants in tests and examples.
Throughout, you can swap providers or run entirely offline using Provider override.
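As a concrete example of the resilience item above, the layers compose like any other Tower middleware. A minimal sketch, assuming the resilience constructors follow the usual Tower conventions (the exact signatures of RetryLayer::new and TimeoutLayer::new here are assumptions; check the resilience module docs):

use std::time::Duration;
use tower::ServiceBuilder;
use tower_llm::resilience::{RetryLayer, TimeoutLayer};

// Assumed constructor signatures; consult the crate docs for the real ones.
let mut agent = ServiceBuilder::new()
    .layer(TimeoutLayer::new(Duration::from_secs(30))) // bound each agent call
    .layer(RetryLayer::new(3))                         // retry transient failures (assumed arg)
    .service(agent);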
The building blocks at a glance:

- Sessions: persist and reload history around each call. sessions::MemoryLayer, with InMemorySessionStore or SqliteSessionStore.
- Observability: trace spans and emit metrics for every step/agent call. observability::{TracingLayer, MetricsLayer} with your metrics sink.
- Approvals: gate model outputs and tool invocations. approvals::{ModelApprovalLayer, ToolApprovalLayer} plus an Approver service.
- Resilience: retry, time out, and break circuits around calls. resilience::{RetryLayer, TimeoutLayer, CircuitBreakerLayer}.
- Recording/Replay: capture runs, replay deterministically without network. recording::{RecorderLayer, ReplayService} with a trace store.
- Budgets: constrain tokens, tools, or steps. budgets::budget_policy(...), composed in CompositePolicy.
- Concurrency: execute multiple tools concurrently; preserve output order. concurrency::ParallelToolRouter; select a join policy.
- Streaming: emit tokens and tool events incrementally for UIs. streaming::{StepStreamService, AgentLoopStreamLayer, StreamTapLayer}.
- Handoffs (multi-agent): coordinate multiple agents at runtime. groups::{AgentPicker, HandoffPolicy, HandoffCoordinator, GroupBuilder}; see also CompactingHandoffPolicy.
- Auto compaction: trim conversation history safely. auto_compaction::{AutoCompactionLayer, CompactionPolicy}; or wrap handoffs with CompactingHandoffPolicy.
- Provider override: swap the model implementation. provider::{FixedProvider, SequenceProvider, OpenAIProvider} with the ModelService trait.

Attach the system prompt at the agent level so it applies consistently across steps and handoffs.
let mut agent = Agent::builder(client)
    .model("gpt-4o")
    .instructions("You are a helpful assistant.")
    .tool(echo)
    .policy(policies::max_steps(1).into())
    .build();
// Send only a user message; the agent injects its instructions as a system message
let _ = tower_llm::run_user(&mut agent, "Say hi").await?;
Attach MemoryLayer at build time using store services that implement Service<LoadSession> and Service<SaveSession>.
use std::sync::Arc;
use tower_llm::sessions::{InMemorySessionStore, SessionId};
let load = Arc::new(InMemorySessionStore::default());
let save = Arc::new(InMemorySessionStore::default());
let session = SessionId("my-session".into());
let mut agent = Agent::builder(client)
    .model("gpt-4o")
    .tool(echo)
    .policy(policies::max_steps(1).into())
    .build_with_session(load, save, session);
For persistence, use the SQLite-backed store:
use tower_llm::sqlite_session::SqliteSessionStore;
let load = Arc::new(SqliteSessionStore::new("sessions.db").await?);
let save = load.clone();
let mut agent = Agent::builder(client)
    .model("gpt-4o")
    .tool(echo)
    .policy(policies::max_steps(1).into())
    .build_with_session(load, save, SessionId("s1".into()));
Wrap the step/agent with layers:
use tower::{ServiceBuilder, Service};
use tower_llm::observability::{TracingLayer, MetricsLayer, MetricRecord};
let metrics_sink = tower::service_fn(|_m: MetricRecord| async move { Ok::<(), tower::BoxError>(()) });
let mut agent = ServiceBuilder::new()
    .layer(TracingLayer::default())
    .layer(MetricsLayer::new(metrics_sink))
    .service(agent);
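Approvals compose the same way. A hedged sketch, assuming ModelApprovalLayer::new accepts any Service used as the Approver and that answering true lets the output through (both the constructor signature and the approver's request/response types are assumptions; see the approvals module for the real ones):

use tower_llm::approvals::ModelApprovalLayer;

// Hypothetical approver that allows everything; the proposal type is assumed.
let approver = tower::service_fn(|_proposal| async move { Ok::<bool, tower::BoxError>(true) });

let mut agent = tower::ServiceBuilder::new()
    .layer(ModelApprovalLayer::new(approver))
    .service(agent);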
Budgeting is expressed as a policy and composed with others:
use tower_llm::budgets::{Budget, budget_policy};
use tower_llm::core::{CompositePolicy, policies};
let budget = Budget { max_prompt_tokens: Some(8000), max_tool_invocations: Some(5), ..Default::default() };
let policy = CompositePolicy::new(vec![
    policies::until_no_tool_calls(),
    budget_policy(budget),
]);
let mut agent = Agent::builder(client).policy(policy).build();
use tower_llm::recording::{InMemoryTraceStore, RecorderLayer, ReplayService, WriteTrace};
use tower::{Service, ServiceExt};
// Record
let writer = InMemoryTraceStore::default();
let mut recorded = RecorderLayer::new(writer.clone(), "trace-1").layer(agent);
let _ = ServiceExt::ready(&mut recorded).await?.call(tower_llm::simple_chat_request("sys","hi")).await?;
// Replay (no model calls)
let mut replay = ReplayService::new(writer, "trace-1", "gpt-4o");
let _out = ServiceExt::ready(&mut replay).await?.call(tower_llm::simple_chat_request("sys","ignored")).await?;
Advertise a handoff policy in the agent builder, or orchestrate multiple agents with HandoffCoordinator.
use tower_llm::groups::{explicit_handoff_to, GroupBuilder};

// Builder-level interception on tools
let agent = Agent::builder(client.clone())
    .model("gpt-4o")
    .handoff_policy(explicit_handoff_to("specialist").into())
    .build();

// Group-level coordination
let mut coordinator = GroupBuilder::new()
    .agent("triage", agent)
    .agent("specialist", Agent::builder(client.clone()).model("gpt-4o").build())
    .picker(tower::service_fn(|_req| async { Ok::<_, tower::BoxError>("triage".to_string()) }))
    .handoff_policy(explicit_handoff_to("specialist"))
    .build();
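The coordinator is itself a Tower service; assuming it accepts the same chat request type as a single agent, you can drive it directly:

use tower::{Service, ServiceExt};

// Assumes the coordinator takes the same request type as an agent.
let _out = ServiceExt::ready(&mut coordinator).await?
    .call(tower_llm::simple_chat_request("You route requests", "I need a specialist"))
    .await?;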
Development checks:

- cargo test
- cargo clippy -- -D warnings
- cargo test --doc

This crate includes a pure validation facility to assert that conversations are well-formed for testing and examples.
- tower_llm::validation::{validate_conversation, ValidationPolicy}
- Examples: examples/validate_conversation.rs, examples/generate_conversations.rs

Minimal usage:
use tower_llm::validation::{validate_conversation, ValidationPolicy};
use async_openai::types::*;
let sys = ChatCompletionRequestSystemMessageArgs::default().content("sys").build().unwrap();
let usr = ChatCompletionRequestUserMessageArgs::default().content("hi").build().unwrap();
let asst = ChatCompletionRequestAssistantMessageArgs::default().content("ok").build().unwrap();
let msgs = vec![sys.into(), usr.into(), asst.into()];
assert!(validate_conversation(&msgs, &ValidationPolicy::default()).is_none());
Generators and mutators are available under tower_llm::validation::gen and tower_llm::validation::mutate for property tests.
You can enable concurrent execution of tool calls within a single step. Order of outputs is preserved.
Non-streaming:
let mut agent = Agent::builder(client)
    .model("gpt-4o")
    .tools(vec![slow, fast])
    .parallel_tools(true)
    .tool_concurrency_limit(4) // optional
    .tool_join_policy(tower_llm::ToolJoinPolicy::JoinAll) // optional: wait for all tools even if one fails
    .policy(CompositePolicy::new(vec![policies::max_steps(1)]))
    .build();
Streaming (when constructing the service directly):
use tower_llm::streaming::StepStreamService;
let mut step_stream = StepStreamService::new(provider, tool_router)
    .parallel_tools(true)
    .tool_concurrency_limit(4)
    .tool_join_policy(tower_llm::ToolJoinPolicy::JoinAll);
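Consuming the stream (sketch only): this assumes the streaming service's response resolves to a futures Stream of step events; the concrete item type is defined in the streaming module, and Debug-printing it is purely illustrative:

use futures::StreamExt;
use tower::{Service, ServiceExt};

// Assumed shape: the call resolves to a stream of incremental events.
let mut events = ServiceExt::ready(&mut step_stream).await?
    .call(tower_llm::simple_chat_request("sys", "hi"))
    .await?;
while let Some(event) = events.next().await {
    // Forward tokens/tool events to your UI here.
    println!("{event:?}");
}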
You can inject a custom non-streaming provider into AgentBuilder for testing or to adapt other backends. Use with_provider(...) to supply any service that implements the ModelService trait.
use tower_llm::provider::{FixedProvider, ProviderResponse};
use async_openai::types::{ChatCompletionResponseMessage, Role as RespRole};
// Fixed assistant response from your provider
let assistant = ChatCompletionResponseMessage {
    content: Some("ok".into()),
    role: RespRole::Assistant,
    tool_calls: None,
    refusal: None,
    audio: None,
};
let provider = FixedProvider::new(ProviderResponse { assistant, prompt_tokens: 1, completion_tokens: 1 });
let mut agent = Agent::builder(client)
    .model("gpt-4o")
    .with_provider(provider)
    .policy(CompositePolicy::new(vec![policies::max_steps(1)]))
    .build();
You can wrap the final agent service with arbitrary Tower composition using map_agent_service(...):
use tower::ServiceBuilder;
use tower::timeout::TimeoutLayer;
use std::time::Duration;
let mut agent = Agent::builder(client)
    .model("gpt-4o")
    .map_agent_service(|svc| {
        ServiceBuilder::new()
            .layer(TimeoutLayer::new(Duration::from_secs(2)))
            .service(svc)
    })
    .build();
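Putting several pieces together: the sketch below combines only APIs shown above (sessions, observability, a budget policy) into one agent; treat it as a starting point rather than a canonical setup.

use std::sync::Arc;
use tower::{Service, ServiceBuilder, ServiceExt};
use tower_llm::{Agent, policies};
use tower_llm::budgets::{Budget, budget_policy};
use tower_llm::core::CompositePolicy;
use tower_llm::observability::{MetricRecord, MetricsLayer, TracingLayer};
use tower_llm::sessions::{InMemorySessionStore, SessionId};

// Session persistence (in-memory here; swap in SqliteSessionStore for durability)
let load = Arc::new(InMemorySessionStore::default());
let save = load.clone();

// Stop when the model stops calling tools, or when the budget runs out
let budget = Budget { max_tool_invocations: Some(5), ..Default::default() };
let policy = CompositePolicy::new(vec![policies::until_no_tool_calls(), budget_policy(budget)]);

let agent = Agent::builder(client)
    .model("gpt-4o")
    .tool(echo)
    .policy(policy)
    .build_with_session(load, save, SessionId("s1".into()));

// Observability around the whole agent
let metrics_sink = tower::service_fn(|_m: MetricRecord| async move { Ok::<(), tower::BoxError>(()) });
let mut agent = ServiceBuilder::new()
    .layer(TracingLayer::default())
    .layer(MetricsLayer::new(metrics_sink))
    .service(agent);

let _out = ServiceExt::ready(&mut agent).await?
    .call(tower_llm::simple_chat_request("You are helpful", "Say hi"))
    .await?;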