| Crates.io | query-flow-macros |
| lib.rs | query-flow-macros |
| version | 0.16.0 |
| created_at | 2025-12-26 15:48:08.838701+00 |
| updated_at | 2026-01-19 13:05:35.928486+00 |
| description | Procedural macros for query-flow. |
| homepage | https://github.com/ryo33/query-flow |
| repository | https://github.com/ryo33/query-flow |
| max_upload_size | |
| id | 2005880 |
| size | 85,966 |
An ergonomic, runtime-agnostic framework for incremental computation.
> [!WARNING]
> Currently in dogfooding phase with the Eure project's CLI, LSP, and Web Playground.
use query_flow::{query, Db, QueryError, QueryRuntime};
#[query]
fn add(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
Ok(a + b)
}
let runtime = QueryRuntime::new();
let result = runtime.query(Add::new(1, 2)).unwrap();
assert_eq!(*result, 3);
QueryRuntime manages query execution, caching, dependency tracking, and asset resolution.

#[query] Macro

The #[query] macro transforms a function into a query struct implementing the Query trait:
use query_flow::{query, Db, QueryError};
// Basic query - generates `Add` struct
#[query]
fn add(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
Ok(a + b)
}
// Query with dependencies
#[query]
fn double_sum(db: &impl Db, a: i32, b: i32) -> Result<i32, QueryError> {
let sum = db.query(Add::new(a, b))?;
Ok(*sum * 2)
}
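Running these through a `QueryRuntime` works the same way as the first example. A brief sketch, assuming the macro generates a `DoubleSum` struct following the naming pattern shown above:

```rust
use query_flow::QueryRuntime;

let runtime = QueryRuntime::new();

// First call computes Add(1, 2) and then DoubleSum(1, 2).
let first = runtime.query(DoubleSum::new(1, 2)).unwrap();
assert_eq!(*first, 6);

// A repeat call with the same arguments is answered from the cache;
// neither query body runs again.
let second = runtime.query(DoubleSum::new(1, 2)).unwrap();
assert_eq!(*second, 6);
```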
| Option | Example | Description |
|---|---|---|
| `keys(...)` | `#[query(keys(id))]` | Only specified fields used as cache key |
| `name = "..."` | `#[query(name = "FetchUserById")]` | Custom struct name |
| `output_eq = fn` | `#[query(output_eq = my_eq)]` | Custom equality for early cutoff |
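A sketch of these options in use. The `User` type, field names, and `approx_eq` helper are illustrative, and each attribute is shown in isolation exactly as in the table:

```rust
use query_flow::{query, Db, QueryError};

// Illustrative output type.
#[derive(Clone, PartialEq)]
pub struct User { pub id: u64, pub name: String }

// keys(id): only `id` forms the cache key; `label` is available in the body
// but does not affect caching.
#[query(keys(id))]
fn load_user(db: &impl Db, id: u64, label: String) -> Result<User, QueryError> {
    let _ = label;
    Ok(User { id, name: format!("user-{id}") })
}

// name = "...": the generated struct is called `FetchUserById`.
#[query(name = "FetchUserById")]
fn fetch_user(db: &impl Db, id: u64) -> Result<User, QueryError> {
    db.query(LoadUser::new(id, String::new())).map(|u| (*u).clone())
}

// output_eq: custom comparison used for early cutoff, so tiny floating-point
// wobbles do not invalidate dependents.
fn approx_eq(old: &f64, new: &f64) -> bool {
    (old - new).abs() < 1e-9
}

#[query(output_eq = approx_eq)]
fn ratio(db: &impl Db, a: f64, b: f64) -> Result<f64, QueryError> {
    Ok(a / b)
}
```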
For full control, implement the Query trait directly:
use std::sync::Arc;
use query_flow::{Query, Db, QueryError, Key};
#[derive(Clone)]
struct Add { a: i32, b: i32 }
impl Query for Add {
type CacheKey = (i32, i32);
type Output = i32;
fn cache_key(&self) -> Self::CacheKey {
(self.a, self.b)
}
fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
Ok(Arc::new(self.a + self.b))
}
fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
old == new
}
}
Assets represent external resources (files, network data) that queries can depend on:
use query_flow::{asset_key, AssetKey};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicU32;
// Using the macro
#[asset_key(asset = String)]
pub struct ConfigFile(pub PathBuf);
#[asset_key(asset = Vec<u8>)]
pub struct BinaryAsset(pub PathBuf);
// With selective key fields (only `path` used for Hash/Eq)
#[asset_key(asset = String, key(path))]
pub struct CountedAsset {
path: String,
call_count: Arc<AtomicU32>, // Not part of key
}
// Manual implementation (`ImageData` is an example asset payload type)
pub struct ImageData { pub bytes: Vec<u8> }
pub struct TextureId(pub u32);
impl AssetKey for TextureId {
type Asset = ImageData;
fn asset_eq(old: &Self::Asset, new: &Self::Asset) -> bool {
old.bytes == new.bytes
}
}
#[query]
fn process_config(db: &impl Db, path: PathBuf) -> Result<Config, QueryError> {
// Get asset - suspends automatically if not ready
let content = db.asset(ConfigFile(path.clone()))?;
// Parse and return
Ok(parse_config(&content))
}
Locators are optional. Without a locator, assets always return Pending and must be resolved externally via resolve_asset() or resolve_asset_error().
Register a locator when you need to:

- Return `Ready` for assets available synchronously
- Use `db.query()` to determine loading behavior dynamically

use query_flow::{asset_locator, Db, LocateResult, QueryError, DurabilityLevel};
#[asset_locator]
fn config_locator(db: &impl Db, key: &ConfigFile) -> Result<LocateResult<String>, QueryError> {
// Validation: reject disallowed paths
let config = db.query(GetConfig)?;
if !config.allowed_paths.contains(&key.0) {
return Err(anyhow::anyhow!("Path not allowed").into());
}
// Immediate resolution for bundled files
if let Some(content) = BUNDLED_FILES.get(&key.0) {
return Ok(LocateResult::Ready {
value: content.clone(),
durability: DurabilityLevel::Static,
});
}
// Otherwise, defer to external loading
Ok(LocateResult::Pending)
}
runtime.register_asset_locator(ConfigLocator);
The #[asset_locator] macro generates a struct (PascalCase of function name) implementing AssetLocator.
Durability is specified when resolving assets and helps optimize invalidation propagation:
| Level | Description |
|---|---|
| `Volatile` | Changes frequently (user input, live feeds) |
| `Transient` | Changes occasionally (configuration, session data) |
| `Stable` | Changes rarely (external dependencies) |
| `Static` | Fixed for this session (bundled assets, constants) |
runtime.resolve_asset(ConfigFile(path), content, DurabilityLevel::Volatile);
runtime.resolve_asset(BundledAsset(name), data, DurabilityLevel::Static);
// File was modified externally
runtime.invalidate_asset(&ConfigFile(path));
// Dependent queries will now suspend until resolved
// Remove asset entirely
runtime.remove_asset(&ConfigFile(path));
The suspense pattern allows sync query code to handle async operations. db.asset() returns the asset value directly, suspending automatically if not ready.
Use db.asset() to get an asset. It automatically returns Err(QueryError::Suspend) if loading.
#[query]
fn process_config(db: &impl Db, path: ConfigFile) -> Result<Config, QueryError> {
let content = db.asset(path)?; // Returns Err(Suspend) if loading
Ok(parse_config(&content))
}
When a query suspends, the runtime tracks which assets are pending. In your event loop, resolve assets when they become available:
// You can check what's pending:
for pending in runtime.pending_assets_of::<ConfigFile>() {
start_loading(&pending.0);
}
// In your event loop, when file content is loaded:
runtime.resolve_asset(ConfigFile(path), content, DurabilityLevel::Volatile);
// Or if loading failed:
runtime.resolve_asset_error(ConfigFile(path), io_error, DurabilityLevel::Volatile);
// Then retry the query
let result = runtime.query(ProcessConfig::new(path))?;
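Putting these pieces together, a driving loop might look like the following sketch. `Config`, `ProcessConfig`, and `ConfigFile` are the types from the examples above; `load_file` stands in for whatever loader the host application provides, and checking `has_pending_assets()` is just one way to detect that the query suspended:

```rust
use std::sync::Arc;
use query_flow::{DurabilityLevel, QueryError, QueryRuntime};

fn drive(runtime: &QueryRuntime, path: std::path::PathBuf) -> Result<Arc<Config>, QueryError> {
    loop {
        match runtime.query(ProcessConfig::new(path.clone())) {
            Ok(config) => return Ok(config),
            Err(err) => {
                if !runtime.has_pending_assets() {
                    // A real failure (user error, cycle, ...): propagate it.
                    return Err(err);
                }
                // The query suspended: load the assets it asked for, resolve
                // them, and retry on the next iteration. A real application
                // would wait for I/O here instead of spinning.
                for pending in runtime.pending_assets_of::<ConfigFile>() {
                    let content = load_file(&pending.0);
                    runtime.resolve_asset(
                        ConfigFile(pending.0.clone()),
                        content,
                        DurabilityLevel::Transient,
                    );
                }
            }
        }
    }
}
```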
Use db.asset_state() to get an AssetLoadingState for explicit loading state handling:
#[query]
fn eval_expr(db: &impl Db, name: String) -> Result<i64, QueryError> {
// Return 0 while loading, use actual value when ready
let value = db.asset_state(Variable(name))?
.into_inner()
.map(|v| *v)
.unwrap_or(0);
Ok(value)
}
Queries return Result<Arc<Output>, QueryError>. The error variants are:
System errors (not cached):
- `Suspend` - An asset is not yet available. See Suspense Pattern.
- `Cycle` - A dependency cycle was detected in the query graph.
- `Cancelled` - Query explicitly returned cancellation (not cached, unlike `UserError`).
- `DependenciesRemoved` - Dependencies were removed by another thread during execution.
- `InconsistentAssetResolution` - An asset was resolved during query execution, possibly causing inconsistent state.

User errors (cached like successful results):

- `UserError(Arc<anyhow::Error>)` - Domain errors from your query logic, automatically converted via the `?` operator.

// User errors with ? operator - errors are automatically converted
#[query]
fn parse_int(db: &impl Db, input: String) -> Result<i32, QueryError> {
let num: i32 = input.parse()?; // ParseIntError -> QueryError::UserError
Ok(num)
}
// System errors propagate automatically
#[query]
fn process(db: &impl Db, id: u64) -> Result<Output, QueryError> {
let data = db.query(FetchData::new(id))?; // Propagates Suspend, Cycle, UserError, etc.
Ok(transform(*data))
}
Use downcast_err() to handle specific user error types while propagating others:
use query_flow::QueryResultExt;
let result = db.query(MyQuery::new()).downcast_err::<MyError>()?;
match result {
Ok(value) => { /* success */ }
Err(my_err) => {
// my_err derefs to &MyError
println!("Error code: {}", my_err.code);
}
}
By default, all UserError values are considered different (conservative). Use QueryRuntimeBuilder to customize:
let runtime = QueryRuntime::builder()
.error_comparator(|a, b| {
// Treat errors as equal if they have the same message
a.to_string() == b.to_string()
})
.build();
Use runtime.poll() to track query changes with revision numbers. This is useful for push-based notifications (e.g., LSP diagnostics).
struct Subscription<Q: Query> {
query: Q,
last_revision: RevisionCounter,
}
// Poll and return only when changed
fn poll_subscription<Q: Query>(
db: &impl Db,
sub: &mut Subscription<Q>,
) -> Result<Option<Arc<Q::Output>>, QueryError> {
let polled = db.poll(sub.query.clone())?;
if polled.revision != sub.last_revision {
sub.last_revision = polled.revision;
Ok(Some(polled.value?))
} else {
Ok(None)
}
}
let runtime = QueryRuntime::new();
// Execute queries
let result = runtime.query(MyQuery::new(...))?;
// Invalidation
runtime.invalidate::<MyQuery>(&cache_key);
runtime.clear_cache();
// Asset management
runtime.register_asset_locator(locator);
runtime.resolve_asset(key, value, DurabilityLevel::Volatile);
runtime.resolve_asset_error(key, error, DurabilityLevel::Volatile);
runtime.invalidate_asset(&key);
runtime.remove_asset(&key);
// Pending assets
runtime.pending_assets(); // All pending
runtime.pending_assets_of::<K>(); // Filtered by type
runtime.has_pending_assets();
| Crate | Description |
|---|---|
| `query-flow` | High-level query framework with automatic caching and dependency tracking |
| `query-flow-macros` | Procedural macros for defining queries |
| `query-flow-inspector` | Debugging and inspection tools |
| `whale` | Low-level lock-free dependency-tracking primitive |
Whale is the low-level primitive that powers query-flow. It provides lock-free dependency tracking without opinions about what queries are or how to store their results.
Use query-flow if you want a batteries-included incremental computation framework; use whale directly if you need a lower-level building block. Whale is designed to be a minimal primitive for building high-level incremental computing systems: it does not dictate what queries are or how their results are stored.
Whale is built around a lock-free dependency graph where nodes represent computations and edges represent their dependencies.
Core Components:
Lock-free Design:
The system uses atomic operations and immutable data structures. This allows multiple threads to concurrently query states, propagate invalidations, and modify the dependency graph.
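To make the idea concrete without reproducing whale's actual API, here is a small conceptual sketch; every name below is invented for illustration. A global revision counter and a per-node "verified at" marker, both plain atomics, let writers bump revisions and readers check staleness without locking:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Conceptual illustration only; this is not whale's API.
struct GlobalRevision(AtomicU64);

struct Node {
    // Revision at which this node's value was last known to be up to date.
    verified_at: AtomicU64,
}

impl GlobalRevision {
    // Invalidation is a single atomic read-modify-write; it never blocks readers.
    fn bump(&self) -> u64 {
        self.0.fetch_add(1, Ordering::Release) + 1
    }
}

impl Node {
    // A node may be stale if the world has moved past the revision at which
    // it was last verified.
    fn maybe_stale(&self, global: &GlobalRevision) -> bool {
        self.verified_at.load(Ordering::Acquire) < global.0.load(Ordering::Acquire)
    }

    fn mark_verified(&self, revision: u64) {
        self.verified_at.store(revision, Ordering::Release);
    }
}
```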
Consistency Guarantees:
Licensed under either of
at your option.