| Crates.io | oxirs-federate |
| lib.rs | oxirs-federate |
| version | 0.1.0 |
| created_at | 2025-09-30 09:46:44.916319+00 |
| updated_at | 2026-01-20 21:50:39.01312+00 |
| description | SERVICE planner and GraphQL stitching for federated SPARQL queries |
| homepage | https://github.com/cool-japan/oxirs |
| repository | https://github.com/cool-japan/oxirs |
| max_upload_size | |
| id | 1860898 |
| size | 3,404,039 |
Status: Production Release (v0.1.0) - Released January 7, 2026
✨ Features Complete! All Release Targets implemented. APIs stable. Ready for promotion.
Federated SPARQL query processing across multiple RDF endpoints. Execute queries spanning distributed knowledge graphs with intelligent optimization and result integration.
Add to your Cargo.toml:
# Features complete - APIs stable
[dependencies]
oxirs-federate = "0.1.0"
use oxirs_federate::{FederatedEngine, Endpoint};
/// Runs one federated SPARQL query against the DBpedia and Wikidata endpoints
/// and prints every result with its `Debug` representation.
///
/// # Errors
///
/// Propagates any error returned while creating the engine or executing the query.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure endpoints
    let endpoints = vec![
        Endpoint::new("DBpedia", "https://dbpedia.org/sparql"),
        Endpoint::new("Wikidata", "https://query.wikidata.org/sparql"),
    ];

    // Create federated engine
    let engine = FederatedEngine::new(endpoints)?;

    // Execute federated query.
    // Every prefix used in the query body (dbo, rdfs, wd, wdt) is declared up front:
    // SPARQL rejects a prefixed name whose prefix has no PREFIX declaration.
    let query = r#"
        PREFIX dbo: <http://dbpedia.org/ontology/>
        PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
        PREFIX wd: <http://www.wikidata.org/entity/>
        PREFIX wdt: <http://www.wikidata.org/prop/direct/>

        SELECT ?person ?dbpName ?wikidataId WHERE {
            # From DBpedia
            SERVICE <https://dbpedia.org/sparql> {
                ?person a dbo:Person .
                ?person dbo:name ?dbpName .
            }
            # From Wikidata
            SERVICE <https://query.wikidata.org/sparql> {
                ?wikidataId wdt:P31 wd:Q5 .
                ?wikidataId rdfs:label ?dbpName .
            }
        }
        LIMIT 10
    "#;

    let results = engine.execute(query).await?;
    for result in results {
        println!("{:?}", result);
    }
    Ok(())
}
Let the engine automatically determine which endpoints to query:
use oxirs_federate::{FederatedEngine, EndpointRegistry};
/// Runs a query that contains no explicit SERVICE clauses on an engine built
/// from a discovered endpoint registry, then prints every result.
///
/// # Errors
///
/// Propagates any error returned by endpoint discovery, engine construction,
/// or query execution.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Discover available endpoints
    let registry = EndpointRegistry::discover().await?;
    let engine = FederatedEngine::builder()
        .registry(registry)
        .enable_auto_discovery(true)
        .build()?;

    // Query without explicit SERVICE clauses.
    // The foaf prefix must be declared: SPARQL rejects a prefixed name whose
    // prefix has no PREFIX declaration.
    let query = r#"
        PREFIX foaf: <http://xmlns.com/foaf/0.1/>

        SELECT ?person ?name WHERE {
            ?person a foaf:Person .
            ?person foaf:name ?name .
        }
        LIMIT 100
    "#;

    // Engine automatically selects relevant endpoints
    let results = engine.execute(query).await?;
    // Consume the results the same way as the explicit-SERVICE example, so the
    // binding is not left unused.
    for result in results {
        println!("{:?}", result);
    }
    Ok(())
}
use std::time::Duration;

use oxirs_federate::Endpoint;

// Build an endpoint through the builder, passing a 30-second `Duration` as its timeout.
let endpoint = Endpoint::builder()
    .name("DBpedia")
    .url("https://dbpedia.org/sparql")
    .timeout(Duration::from_secs(30))
    .build()?;
use oxirs_federate::{Endpoint, Authentication};
let endpoint = Endpoint::builder()
.name("Private SPARQL")
.url("https://private.example.org/sparql")
.authentication(Authentication::Basic {
username: "user".to_string(),
password: "pass".to_string(),
})
.build()?;
use oxirs_federate::{Endpoint, EndpointCapabilities};
let capabilities = EndpointCapabilities {
supports_aggregation: true,
supports_property_paths: true,
supports_update: false,
max_results: Some(10000),
estimated_triples: 1_000_000_000,
};
let endpoint = Endpoint::builder()
.name("DBpedia")
.url("https://dbpedia.org/sparql")
.capabilities(capabilities)
.build()?;
// NOTE(review): assumes `SelectionStrategy` is exported from the crate root like
// the other types used in these examples — confirm against the crate docs.
use oxirs_federate::{FederatedEngine, SelectionStrategy, SourceSelector};

// Configure how sources are selected for a query.
let selector = SourceSelector::builder()
    .strategy(SelectionStrategy::CostBased)
    .prefer_local(true)
    .max_endpoints_per_query(5)
    .build();

// Hand the selector to the engine builder.
let engine = FederatedEngine::builder()
    .source_selector(selector)
    .build()?;
use oxirs_federate::QueryDecomposer;
let decomposer = QueryDecomposer::new();
// Automatically decompose query
let subqueries = decomposer.decompose(query)?;
for (endpoint, subquery) in subqueries {
println!("Send to {}: {}", endpoint, subquery);
}
// NOTE(review): assumes `JoinStrategy` is exported from the crate root like
// the other types used in these examples — confirm against the crate docs.
use oxirs_federate::{JoinOptimizer, JoinStrategy};

// Configure the join optimizer.
let optimizer = JoinOptimizer::builder()
    .strategy(JoinStrategy::BindJoin) // or HashJoin, NestedLoop
    .max_bind_size(1000)
    .enable_selectivity_estimation(true)
    .build();
use std::time::Duration;

use oxirs_federate::{CacheConfig, FederatedEngine};

// Cache settings: enabled, with a one-hour (3600 s) `ttl`, a `max_size` of 1000,
// and a cache directory of `./federation_cache`.
let cache_config = CacheConfig {
    enabled: true,
    ttl: Duration::from_secs(3600),
    max_size: 1000,
    cache_dir: Some("./federation_cache".into()),
};

// Hand the cache settings to the engine builder.
let engine = FederatedEngine::builder()
    .cache_config(cache_config)
    .build()?;
let engine = FederatedEngine::builder()
.max_parallel_requests(10)
.connection_pool_size(20)
.build()?;
// Executes sub-queries in parallel
let results = engine.execute_parallel(query).await?;
use std::time::Duration;

use oxirs_federate::{FailurePolicy, FederatedEngine};

// Failure-handling settings passed to the engine builder below.
let policy = FailurePolicy {
    retry_attempts: 3,
    retry_delay: Duration::from_secs(1),
    retry_backoff: 2.0, // Exponential backoff
    continue_on_endpoint_failure: true,
    partial_results: true,
};

let engine = FederatedEngine::builder()
    .failure_policy(policy)
    .build()?;
let results = engine.execute_with_stats(query).await?;
println!("Endpoints queried: {}", results.stats.endpoints_used);
println!("Total execution time: {:?}", results.stats.total_time);
println!("Data transferred: {} bytes", results.stats.bytes_transferred);
println!("Results returned: {}", results.data.len());
for endpoint_stat in results.stats.endpoint_stats {
println!("{}: {:?}", endpoint_stat.name, endpoint_stat.duration);
}
// NOTE(review): assumes `EndpointStatus` is exported from the crate root like
// the other types used in these examples — confirm against the crate docs.
use oxirs_federate::{EndpointStatus, HealthMonitor};

let monitor = HealthMonitor::new(&engine);

// Check endpoint health and print one line per endpoint.
let health = monitor.check_health().await?;
for (endpoint, status) in health {
    match status {
        EndpointStatus::Healthy => println!("{}: OK", endpoint),
        EndpointStatus::Degraded => println!("{}: Degraded", endpoint),
        EndpointStatus::Unavailable => println!("{}: Down", endpoint),
    }
}
use oxirs_arq::QueryEngine;
use oxirs_federate::FederatedExtension;
// Extend query engine with federation
let mut engine = QueryEngine::new();
engine.add_extension(FederatedExtension::new(endpoints));
// Use SERVICE clauses in queries
let results = engine.execute(federated_query).await?;
use oxirs_gql::GraphQLServer;
use oxirs_federate::FederatedEngine;
let graphql_server = GraphQLServer::builder()
.federated_engine(federated_engine)
.enable_federation(true)
.build()?;
// GraphQL queries can span multiple RDF sources
| Endpoints | Query Complexity | Execution Time | Data Transfer |
|---|---|---|---|
| 2 | Simple | 150ms | 50KB |
| 5 | Moderate | 800ms | 500KB |
| 10 | Complex | 2.5s | 2MB |
The figures above were measured with caching and parallel execution enabled.
// Use bind joins for large intermediate results
let engine = FederatedEngine::builder()
.join_strategy(JoinStrategy::BindJoin)
.max_bind_size(1000)
.build()?;
// Enable aggressive caching
let cache_config = CacheConfig {
enabled: true,
ttl: Duration::from_secs(7200),
max_size: 10000,
..Default::default()
};
// Limit data transfer
let engine = FederatedEngine::builder()
.max_result_size(100_000)
.compression(true)
.build()?;
use oxirs_federate::ServiceDiscovery;
let discovery = ServiceDiscovery::new();
// Discover SPARQL endpoints
let endpoints = discovery.discover().await?;
for endpoint in endpoints {
println!("Found: {} at {}", endpoint.name, endpoint.url);
println!(" Description: {}", endpoint.description);
println!(" Capabilities: {:?}", endpoint.capabilities);
}
use oxirs_federate::VoidParser;
// Parse VoID (Vocabulary of Interlinked Datasets) descriptions
let void_url = "https://dbpedia.org/void";
let description = VoidParser::parse(void_url).await?;
println!("Dataset: {}", description.title);
println!("Triples: {}", description.triples);
println!("SPARQL endpoint: {}", description.sparql_endpoint);
This is an experimental module. Feedback welcome!
MIT OR Apache-2.0