| Crates.io | oxirs-arq |
| lib.rs | oxirs-arq |
| version | 0.1.0 |
| created_at | 2025-09-30 09:03:47.79848+00 |
| updated_at | 2026-01-20 21:25:08.929145+00 |
| description | Jena-style SPARQL algebra with extension points and query optimization |
| homepage | https://github.com/cool-japan/oxirs |
| repository | https://github.com/cool-japan/oxirs |
| max_upload_size | |
| id | 1860846 |
| size | 4,239,068 |
SPARQL query engine with algebra and optimization
Status: Production Release (v0.1.0) - Released January 7, 2026
✨ Production Release: Production-ready with API stability guarantees. Semantic versioning enforced.
oxirs-arq is a SPARQL 1.1/1.2 query engine inspired by Apache Jena's ARQ engine. It provides query optimization, extensible algebra operations, and built-in function support.
Add to your Cargo.toml:
[dependencies]
oxirs-arq = "0.1.0"
use oxirs_arq::{QueryEngine, Query};
use oxirs_core::Dataset;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create dataset
let dataset = Dataset::from_file("data.ttl")?;
// Create query engine
let engine = QueryEngine::new();
// Parse and execute query
let query = Query::parse(r#"
SELECT ?person ?name ?age WHERE {
?person a foaf:Person ;
foaf:name ?name ;
foaf:age ?age .
FILTER (?age > 18)
}
ORDER BY ?age
"#)?;
let results = engine.execute(&query, &dataset).await?;
// Process results
for binding in results {
println!("Person: {}, Name: {}, Age: {}",
binding.get("person")?,
binding.get("name")?,
binding.get("age")?);
}
Ok(())
}
use oxirs_arq::{QueryEngine, Function, FunctionRegistry, Value};
// Define custom function
struct DistanceFunction;
impl Function for DistanceFunction {
fn name(&self) -> &str { "distance" }
fn arity(&self) -> usize { 4 } // lat1, lon1, lat2, lon2
async fn call(&self, args: &[Value]) -> Result<Value, String> {
let lat1 = args[0].as_float()?;
let lon1 = args[1].as_float()?;
let lat2 = args[2].as_float()?;
let lon2 = args[3].as_float()?;
let distance = calculate_haversine_distance(lat1, lon1, lat2, lon2);
Ok(Value::Float(distance))
}
}
// Register function
let mut engine = QueryEngine::new();
engine.register_function(DistanceFunction);
// Use in query
let query = Query::parse(r#"
SELECT ?place ?dist WHERE {
?place geo:lat ?lat ; geo:lon ?lon .
BIND(distance(?lat, ?lon, 40.7128, -74.0060) AS ?dist)
FILTER(?dist < 100)
}
"#)?;
use oxirs_arq::{QueryEngine, Optimizer, OptimizationLevel};
let engine = QueryEngine::builder()
.optimization_level(OptimizationLevel::Aggressive)
.enable_statistics(true)
.enable_caching(true)
.parallel_execution(true)
.build();
// Enable specific optimizations
let optimizer = Optimizer::new()
.enable_join_reordering(true)
.enable_filter_pushdown(true)
.enable_projection_pushdown(true)
.enable_constant_folding(true)
.enable_dead_code_elimination(true);
engine.set_optimizer(optimizer);
The engine supports a rich set of algebraic operations:
use oxirs_arq::algebra::{Algebra, BGP, Filter, Join, LeftJoin, Union, Extend};
// Build query algebra programmatically
let bgp1 = BGP::new(vec![
triple_pattern!(?person, rdf:type, foaf:Person),
triple_pattern!(?person, foaf:name, ?name),
]);
let bgp2 = BGP::new(vec![
triple_pattern!(?person, foaf:age, ?age),
]);
let join = Join::new(bgp1, bgp2);
let filter = Filter::new(join, expression!(?age > 18));
let algebra = Algebra::from(filter);
use oxirs_arq::{Operator, ExecutionContext, BindingSet};
struct SampleOperator {
input: Box<dyn Operator>,
sample_rate: f64,
}
#[async_trait]
impl Operator for SampleOperator {
async fn execute(&self, context: &ExecutionContext) -> Result<BindingSet, Error> {
let input_results = self.input.execute(context).await?;
let mut rng = rand::thread_rng();
input_results
.filter(|_| rng.gen::<f64>() < self.sample_rate)
.collect()
}
fn cardinality_estimate(&self) -> usize {
(self.input.cardinality_estimate() as f64 * self.sample_rate) as usize
}
}
use oxirs_arq::{QueryEngine, ParallelConfig};
let engine = QueryEngine::builder()
.parallel_config(ParallelConfig {
max_threads: 8,
work_stealing: true,
chunk_size: 1000,
parallel_threshold: 10000,
})
.build();
// Queries are automatically parallelized based on algebra structure
let query = Query::parse(r#"
SELECT ?s1 ?s2 WHERE {
{
SELECT ?s1 WHERE { ?s1 ?p1 ?o1 }
}
UNION
{
SELECT ?s2 WHERE { ?s2 ?p2 ?o2 }
}
}
"#)?;
use oxirs_arq::{QueryEngine, StreamingConfig};
use futures::stream::StreamExt;
let engine = QueryEngine::builder()
.streaming_config(StreamingConfig {
buffer_size: 1000,
batch_size: 100,
})
.build();
let query = Query::parse("SELECT * WHERE { ?s ?p ?o }")?;
let mut stream = engine.execute_streaming(&query, &dataset).await?;
while let Some(batch) = stream.next().await {
let bindings = batch?;
// Process batch of results
for binding in bindings {
println!("{:?}", binding);
}
}
use oxirs_arq::{QueryEngine, FederationConfig, RemoteEndpoint};
let engine = QueryEngine::builder()
.federation_config(FederationConfig {
timeout: Duration::from_secs(30),
max_concurrent_services: 5,
cache_remote_results: true,
})
.remote_endpoint("dbpedia", RemoteEndpoint::new("https://dbpedia.org/sparql"))
.remote_endpoint("wikidata", RemoteEndpoint::new("https://query.wikidata.org/sparql"))
.build();
let query = Query::parse(r#"
SELECT ?person ?name ?birthPlace WHERE {
?person foaf:name ?name .
SERVICE <https://dbpedia.org/sparql> {
?person dbo:birthPlace ?birthPlace .
}
}
"#)?;
SELECT ?person ?upperName WHERE {
?person foaf:name ?name .
BIND(UCASE(?name) AS ?upperName)
FILTER(CONTAINS(?name, "John"))
}
SELECT ?value ?rounded ?sqrt WHERE {
?item ex:value ?value .
BIND(ROUND(?value) AS ?rounded)
BIND(SQRT(?value) AS ?sqrt)
FILTER(?value > 0)
}
SELECT ?event ?year ?monthName WHERE {
?event ex:date ?date .
BIND(YEAR(?date) AS ?year)
BIND(MONTH(?date) AS ?month)
BIND(IF(?month = 1, "January",
IF(?month = 2, "February", ...)) AS ?monthName)
}
SELECT ?category (COUNT(?item) AS ?count) (AVG(?price) AS ?avgPrice) WHERE {
?item ex:category ?category ;
ex:price ?price .
}
GROUP BY ?category
HAVING (?count > 5)
ORDER BY DESC(?avgPrice)
| Query Type | QPS | Latency (p95) | Memory |
|---|---|---|---|
| Simple BGP | 25,000 | 5ms | 15MB |
| Complex Join | 5,000 | 25ms | 45MB |
| Aggregation | 3,000 | 35ms | 35MB |
| Federation | 1,200 | 150ms | 25MB |
use oxirs_arq::{QueryEngine, Statistics, StatisticsCollector};
let mut engine = QueryEngine::new();
engine.enable_statistics(true);
// Execute queries to collect statistics
for query in queries {
engine.execute(&query, &dataset).await?;
}
// View optimization statistics
let stats = engine.statistics();
println!("Query plans optimized: {}", stats.optimized_plans);
println!("Average speedup: {:.2}x", stats.average_speedup);
println!("Cache hit rate: {:.1}%", stats.cache_hit_rate * 100.0);
query_engine:
optimization:
level: "aggressive"
enable_statistics: true
enable_caching: true
cache_size: 1000
execution:
parallel: true
max_threads: 8
streaming: true
batch_size: 1000
federation:
timeout: "30s"
max_concurrent: 5
cache_results: true
functions:
custom_namespaces:
- "http://example.org/functions/"
enable_extensions: true
use oxirs_arq::{QueryEngine, RuntimeConfig};
let config = RuntimeConfig::builder()
.query_timeout(Duration::from_secs(300))
.memory_limit(1024 * 1024 * 1024) // 1GB
.result_limit(1_000_000)
.parallel_threshold(10_000)
.build();
let engine = QueryEngine::with_config(config);
use oxirs_arq::{QueryError, ExecutionError};
match engine.execute(&query, &dataset).await {
Ok(results) => {
// Handle successful execution
}
Err(QueryError::ParseError(msg)) => {
eprintln!("Query parse error: {}", msg);
}
Err(QueryError::ExecutionError(ExecutionError::Timeout)) => {
eprintln!("Query execution timed out");
}
Err(QueryError::ExecutionError(ExecutionError::MemoryLimit)) => {
eprintln!("Query exceeded memory limit");
}
Err(e) => {
eprintln!("Unexpected error: {}", e);
}
}
oxirs-core: RDF data model and storageoxirs-fuseki: SPARQL HTTP serveroxirs-rule: Rule-based reasoningoxirs-shacl: Shape validationcd engine/oxirs-arq
cargo test
cargo bench
cargo run --example visualize-plan -- query.sparql
Licensed under either of:
at your option.
🚀 Production Release (v0.1.0) - January 7, 2026
Current implementation status:
SERVICE) with retries, SERVICE SILENT, and result mergingAPIs follow semantic versioning. See CHANGELOG.md for details.