| Crates.io | whale |
| lib.rs | whale |
| version | 0.5.0 |
| created_at | 2025-02-19 15:07:53.158839+00 |
| updated_at | 2026-01-03 10:36:24.452179+00 |
| description | A lock-free, dependency-tracking primitive for incremental computation. |
| homepage | https://github.com/ryo33/query-flow |
| repository | https://github.com/ryo33/query-flow |
| max_upload_size | |
| id | 1561509 |
| size | 76,546 |
A high-level query framework for incremental computation in Rust.
[!WARNING] This is WIP
LoadingState without coloring functionsuse query_flow::{query, QueryContext, QueryError, QueryRuntime};
#[query]
fn add(ctx: &mut QueryContext, a: i32, b: i32) -> Result<i32, QueryError> {
Ok(a + b)
}
let runtime = QueryRuntime::new();
let result = runtime.query(Add::new(1, 2)).unwrap();
assert_eq!(*result, 3);
#[query] MacroThe #[query] macro transforms a function into a query struct implementing the Query trait:
use query_flow::{query, QueryContext, QueryError};
// Basic query - generates `Add` struct
#[query]
fn add(ctx: &mut QueryContext, a: i32, b: i32) -> Result<i32, QueryError> {
Ok(a + b)
}
// Query with dependencies
#[query]
fn double_sum(ctx: &mut QueryContext, a: i32, b: i32) -> Result<i32, QueryError> {
let sum = ctx.query(Add::new(*a, *b))?;
Ok(*sum * 2)
}
// Selective cache keys - only `id` is part of the key
#[query(keys(id))]
fn fetch_user(db: &impl Db, id: u64, include_deleted: bool) -> Result<User, QueryError> {
// Queries with same `id` share cache, regardless of `include_deleted`
}
// Custom struct name
#[query(name = "FetchUserById")]
fn fetch_user(db: &impl Db, id: u64) -> Result<User, QueryError> { ... }
// Custom output equality (for types without PartialEq)
#[query(output_eq = my_custom_eq)]
fn complex_query(db: &impl Db) -> Result<ComplexType, QueryError> { ... }
For full control, implement the Query trait directly:
use query_flow::{Query, QueryContext, QueryError, Key};
struct Add { a: i32, b: i32 }
impl Query for Add {
type CacheKey = (i32, i32);
type Output = i32;
fn cache_key(&self) -> Self::CacheKey {
(self.a, self.b)
}
fn query(&self, _ctx: &mut QueryContext) -> Result<Self::Output, QueryError> {
Ok(self.a + self.b)
}
fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
old == new
}
}
query-flow supports both system errors and user errors through QueryError:
Suspend, Cycle, Cancelled, MissingDependencyUserError(Arc<anyhow::Error>) - cached like successful results// User errors with ? operator - errors are automatically converted
#[query]
fn parse_int(ctx: &mut QueryContext, input: String) -> Result<i32, QueryError> {
let num: i32 = input.parse()?; // ParseIntError -> QueryError::UserError
Ok(num)
}
// System errors propagate automatically
#[query]
fn process(ctx: &mut QueryContext, id: u64) -> Result<Output, QueryError> {
let data = ctx.query(FetchData::new(id))?; // Propagates Suspend, Cycle, UserError, etc.
Ok(transform(*data))
}
By default, all UserError values are considered different (conservative). Use QueryRuntimeBuilder to customize:
let runtime = QueryRuntime::builder()
.error_comparator(|a, b| {
// Treat errors as equal if they have the same message
a.to_string() == b.to_string()
})
.build();
Assets represent external resources (files, network data) that queries can depend on:
use query_flow::{asset_key, AssetKey};
use std::path::PathBuf;
// Using the macro
#[asset_key(asset = String)]
pub struct ConfigFile(pub PathBuf);
#[asset_key(asset = Vec<u8>)]
pub struct BinaryAsset(pub PathBuf);
// Manual implementation
pub struct TextureId(pub u32);
impl AssetKey for TextureId {
type Asset = ImageData;
fn asset_eq(old: &Self::Asset, new: &Self::Asset) -> bool {
old.bytes == new.bytes
}
}
#[query]
fn process_config(ctx: &mut QueryContext, path: PathBuf) -> Result<Config, QueryError> {
// Get asset - returns LoadingState<Arc<String>>
let content = ctx.asset(&ConfigFile(path.clone()))?;
// Suspend if not ready (propagates to caller)
let content = content.suspend()?;
// Parse and return
Ok(parse_config(&content))
}
use query_flow::DurabilityLevel;
let runtime = QueryRuntime::new();
// Optional: Register a locator for immediate resolution
runtime.register_asset_locator(MyFileLocator::new());
// Execute query - may return Err(Suspend) if assets are loading
match runtime.query(ProcessConfig::new(path)) {
Ok(result) => println!("Done: {:?}", result),
Err(QueryError::Suspend) => {
// Handle pending assets
for pending in runtime.pending_assets() {
if let Some(path) = pending.key::<ConfigFile>() {
let content = std::fs::read_to_string(&path.0)?;
// Durability is specified at resolve time
runtime.resolve_asset(path.clone(), content, DurabilityLevel::Volatile);
}
}
// Retry query
let result = runtime.query(ProcessConfig::new(path))?;
}
Err(e) => return Err(e),
}
// File was modified externally
runtime.invalidate_asset(&ConfigFile(path));
// Dependent queries will now suspend until resolved
// Remove asset entirely
runtime.remove_asset(&ConfigFile(path));
The suspense pattern allows sync query code to handle async operations:
/// LoadingState<T> represents async loading state
pub enum LoadingState<T> {
Loading,
Ready(T),
}
impl<T> LoadingState<T> {
/// Convert to Result - Loading becomes Err(Suspend)
pub fn suspend(self) -> Result<T, QueryError>;
pub fn is_loading(&self) -> bool;
pub fn is_ready(&self) -> bool;
pub fn get(&self) -> Option<&T>;
pub fn map<U>(self, f: impl FnOnce(T) -> U) -> LoadingState<U>;
}
Durability is specified when resolving assets and helps optimize invalidation propagation:
| Level | Value | Description |
|---|---|---|
Volatile |
0 | Changes frequently (user input, live feeds) |
Transient |
1 | Changes occasionally (configuration, session data) |
Stable |
2 | Changes rarely (external dependencies) |
Static |
3 | Fixed for this session (bundled assets, constants) |
// Durability is specified at resolve_asset time
runtime.resolve_asset(ConfigFile(path), content, DurabilityLevel::Volatile);
runtime.resolve_asset(BundledAsset(name), data, DurabilityLevel::Static);
let runtime = QueryRuntime::new();
// Execute queries
let result = runtime.query(MyQuery::new(...))?;
// Invalidation
runtime.invalidate::<MyQuery>(&cache_key);
runtime.clear_cache();
// Asset management
runtime.register_asset_locator(locator);
runtime.resolve_asset(key, value, DurabilityLevel::Volatile);
runtime.invalidate_asset(&key);
runtime.remove_asset(&key);
// Pending assets
runtime.pending_assets(); // All pending
runtime.pending_assets_of::<K>(); // Filtered by type
runtime.has_pending_assets();
| Crate | Description |
|---|---|
query-flow |
High-level query framework with automatic caching and dependency tracking |
query-flow-macros |
Procedural macros for defining queries |
query-flow-inspector |
Debugging and inspection tools |
whale |
Low-level lock-free dependency-tracking primitive |
Whale is the low-level primitive that powers query-flow. It provides lock-free dependency tracking without opinions about what queries are or how to store their results.
Use query-flow if you want a batteries-included incremental computation framework. Use whale directly if you need:
Whale is designed to be a minimal primitive for building high-level incremental computing systems. It does not provide:
Whale is built around a lock-free dependency graph where nodes represent computations and edges represent their dependencies.
Core Components:
Lock-free Design:
The system uses atomic operations and immutable data structures:
This allows multiple threads to concurrently query states, propagate invalidations, and modify the dependency graph.
Consistency Guarantees:
Licensed under either of
at your option.