oxirs-federate

Crates.iooxirs-federate
lib.rsoxirs-federate
version0.1.0
created_at2025-09-30 09:46:44.916319+00
updated_at2026-01-20 21:50:39.01312+00
descriptionSERVICE planner and GraphQL stitching for federated SPARQL queries
homepagehttps://github.com/cool-japan/oxirs
repositoryhttps://github.com/cool-japan/oxirs
max_upload_size
id1860898
size3,404,039
KitaSan (cool-japan)

documentation

README

OxiRS Federate - Federated Query Processing

Version

Status: Production Release (v0.1.0) - Released January 7, 2026

Features Complete! All Release Targets implemented. APIs stable. Ready for promotion.

Federated SPARQL query processing across multiple RDF endpoints. Execute queries spanning distributed knowledge graphs with intelligent optimization and result integration.

Features

Federated Query Execution

  • Multi-endpoint Queries - Query across multiple SPARQL endpoints
  • Intelligent Source Selection - Automatically choose relevant endpoints
  • Query Decomposition - Split queries for parallel execution
  • Result Integration - Efficiently merge results from multiple sources

Optimization

  • Cost-based Planning - Optimize query execution plans
  • Join Ordering - Minimize data transfer between endpoints
  • Parallel Execution - Execute independent sub-queries concurrently
  • Result Caching - Cache frequent sub-query results

Reliability

  • Failure Handling - Graceful degradation when endpoints fail
  • Retry Logic - Automatic retry with backoff
  • Timeout Management - Configure timeouts per endpoint
  • Health Monitoring - Track endpoint availability

Installation

Add to your Cargo.toml:

# Features complete - APIs stable
[dependencies]
oxirs-federate = "0.1.0"

Quick Start

Basic Federated Query

use oxirs_federate::{FederatedEngine, Endpoint};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure endpoints
    let endpoints = vec![
        Endpoint::new("DBpedia", "https://dbpedia.org/sparql"),
        Endpoint::new("Wikidata", "https://query.wikidata.org/sparql"),
    ];

    // Create federated engine
    let engine = FederatedEngine::new(endpoints)?;

    // Execute federated query
    let query = r#"
        PREFIX dbo: <http://dbpedia.org/ontology/>
        PREFIX wd: <http://www.wikidata.org/entity/>
        PREFIX wdt: <http://www.wikidata.org/prop/direct/>

        SELECT ?person ?dbpName ?wikidataId WHERE {
            # From DBpedia
            SERVICE <https://dbpedia.org/sparql> {
                ?person a dbo:Person .
                ?person dbo:name ?dbpName .
            }

            # From Wikidata
            SERVICE <https://query.wikidata.org/sparql> {
                ?wikidataId wdt:P31 wd:Q5 .
                ?wikidataId rdfs:label ?dbpName .
            }
        }
        LIMIT 10
    "#;

    let results = engine.execute(query).await?;

    for result in results {
        println!("{:?}", result);
    }

    Ok(())
}

Automatic Federation

Let the engine automatically determine which endpoints to query:

use oxirs_federate::{FederatedEngine, EndpointRegistry};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Discover available endpoints
    let registry = EndpointRegistry::discover().await?;

    let engine = FederatedEngine::builder()
        .registry(registry)
        .enable_auto_discovery(true)
        .build()?;

    // Query without explicit SERVICE clauses
    let query = r#"
        SELECT ?person ?name WHERE {
            ?person a foaf:Person .
            ?person foaf:name ?name .
        }
        LIMIT 100
    "#;

    // Engine automatically selects relevant endpoints
    let results = engine.execute(query).await?;

    Ok(())
}

Endpoint Configuration

Basic Endpoint

use oxirs_federate::Endpoint;

let endpoint = Endpoint::builder()
    .name("DBpedia")
    .url("https://dbpedia.org/sparql")
    .timeout(Duration::from_secs(30))
    .build()?;

Authenticated Endpoint

use oxirs_federate::{Endpoint, Authentication};

let endpoint = Endpoint::builder()
    .name("Private SPARQL")
    .url("https://private.example.org/sparql")
    .authentication(Authentication::Basic {
        username: "user".to_string(),
        password: "pass".to_string(),
    })
    .build()?;

Endpoint with Capabilities

use oxirs_federate::{Endpoint, EndpointCapabilities};

let capabilities = EndpointCapabilities {
    supports_aggregation: true,
    supports_property_paths: true,
    supports_update: false,
    max_results: Some(10000),
    estimated_triples: 1_000_000_000,
};

let endpoint = Endpoint::builder()
    .name("DBpedia")
    .url("https://dbpedia.org/sparql")
    .capabilities(capabilities)
    .build()?;

Query Optimization

Source Selection

use oxirs_federate::{FederatedEngine, SourceSelector};

let selector = SourceSelector::builder()
    .strategy(SelectionStrategy::CostBased)
    .prefer_local(true)
    .max_endpoints_per_query(5)
    .build();

let engine = FederatedEngine::builder()
    .source_selector(selector)
    .build()?;

Query Decomposition

use oxirs_federate::QueryDecomposer;

let decomposer = QueryDecomposer::new();

// Automatically decompose query
let subqueries = decomposer.decompose(query)?;

for (endpoint, subquery) in subqueries {
    println!("Send to {}: {}", endpoint, subquery);
}

Join Optimization

use oxirs_federate::JoinOptimizer;

let optimizer = JoinOptimizer::builder()
    .strategy(JoinStrategy::BindJoin)  // or HashJoin, NestedLoop
    .max_bind_size(1000)
    .enable_selectivity_estimation(true)
    .build();

Advanced Features

Result Caching

use oxirs_federate::{FederatedEngine, CacheConfig};

let cache_config = CacheConfig {
    enabled: true,
    ttl: Duration::from_secs(3600),
    max_size: 1000,
    cache_dir: Some("./federation_cache".into()),
};

let engine = FederatedEngine::builder()
    .cache_config(cache_config)
    .build()?;

Parallel Execution

let engine = FederatedEngine::builder()
    .max_parallel_requests(10)
    .connection_pool_size(20)
    .build()?;

// Executes sub-queries in parallel
let results = engine.execute_parallel(query).await?;

Failure Handling

use oxirs_federate::{FederatedEngine, FailurePolicy};

let policy = FailurePolicy {
    retry_attempts: 3,
    retry_delay: Duration::from_secs(1),
    retry_backoff: 2.0,  // Exponential backoff
    continue_on_endpoint_failure: true,
    partial_results: true,
};

let engine = FederatedEngine::builder()
    .failure_policy(policy)
    .build()?;

Monitoring

Query Statistics

let results = engine.execute_with_stats(query).await?;

println!("Endpoints queried: {}", results.stats.endpoints_used);
println!("Total execution time: {:?}", results.stats.total_time);
println!("Data transferred: {} bytes", results.stats.bytes_transferred);
println!("Results returned: {}", results.data.len());

for endpoint_stat in results.stats.endpoint_stats {
    println!("{}: {:?}", endpoint_stat.name, endpoint_stat.duration);
}

Health Monitoring

use oxirs_federate::HealthMonitor;

let monitor = HealthMonitor::new(&engine);

// Check endpoint health
let health = monitor.check_health().await?;

for (endpoint, status) in health {
    match status {
        EndpointStatus::Healthy => println!("{}: OK", endpoint),
        EndpointStatus::Degraded => println!("{}: Degraded", endpoint),
        EndpointStatus::Unavailable => println!("{}: Down", endpoint),
    }
}

Integration

With oxirs-arq

use oxirs_arq::QueryEngine;
use oxirs_federate::FederatedExtension;

// Extend query engine with federation
let mut engine = QueryEngine::new();
engine.add_extension(FederatedExtension::new(endpoints));

// Use SERVICE clauses in queries
let results = engine.execute(federated_query).await?;

With oxirs-gql (GraphQL Federation)

use oxirs_gql::GraphQLServer;
use oxirs_federate::FederatedEngine;

let graphql_server = GraphQLServer::builder()
    .federated_engine(federated_engine)
    .enable_federation(true)
    .build()?;

// GraphQL queries can span multiple RDF sources

Performance

Benchmarks

Endpoints Query Complexity Execution Time Data Transfer
2 Simple 150ms 50KB
5 Moderate 800ms 500KB
10 Complex 2.5s 2MB

With caching and parallel execution enabled

Optimization Tips

// Use bind joins for large intermediate results
let engine = FederatedEngine::builder()
    .join_strategy(JoinStrategy::BindJoin)
    .max_bind_size(1000)
    .build()?;

// Enable aggressive caching
let cache_config = CacheConfig {
    enabled: true,
    ttl: Duration::from_secs(7200),
    max_size: 10000,
    ..Default::default()
};

// Limit data transfer
let engine = FederatedEngine::builder()
    .max_result_size(100_000)
    .compression(true)
    .build()?;

Service Discovery

Automatic Discovery

use oxirs_federate::ServiceDiscovery;

let discovery = ServiceDiscovery::new();

// Discover SPARQL endpoints
let endpoints = discovery.discover().await?;

for endpoint in endpoints {
    println!("Found: {} at {}", endpoint.name, endpoint.url);
    println!("  Description: {}", endpoint.description);
    println!("  Capabilities: {:?}", endpoint.capabilities);
}

VoID Descriptions

use oxirs_federate::VoidParser;

// Parse VoID (Vocabulary of Interlinked Datasets) descriptions
let void_url = "https://dbpedia.org/void";
let description = VoidParser::parse(void_url).await?;

println!("Dataset: {}", description.title);
println!("Triples: {}", description.triples);
println!("SPARQL endpoint: {}", description.sparql_endpoint);

Status

Production Release (v0.1.0) - Features Complete!

  • Distributed Transactions - 2PC and Saga patterns with automatic compensation
  • Advanced Authentication - OAuth2, SAML, JWT, API keys, Basic, Service-to-Service
  • ML-Driven Optimization - Intelligent source selection and query planning
  • Adaptive Join Strategies - Bind join, hash join, nested loop with cost-based selection
  • GraphQL Federation - Schema stitching, entity resolution, query translation
  • Production Monitoring - OpenTelemetry, circuit breakers, auto-healing
  • Streaming Support - Real-time processing with NATS/Kafka and backpressure handling
  • Load Balancing - Adaptive algorithms with health-aware routing
  • 285 Passing Tests - Comprehensive test coverage with zero warnings

Contributing

This is an experimental module. Feedback welcome!

License

MIT OR Apache-2.0

See Also

Commit count: 1

cargo fmt