| Field | Value |
|---|---|
| Crates.io | oxirs-stream |
| lib.rs | oxirs-stream |
| version | 0.1.0 |
| created_at | 2025-09-30 08:54:49.259574+00 |
| updated_at | 2026-01-20 21:35:07.710695+00 |
| description | Real-time streaming support with Kafka/NATS/MQTT/OPC-UA I/O, RDF Patch, and SPARQL Update delta |
| homepage | https://github.com/cool-japan/oxirs |
| repository | https://github.com/cool-japan/oxirs |
| id | 1860833 |
| size | 4,376,413 |
Status: Production Release (v0.1.0), released January 7, 2026

✨ Production-ready, with API stability guarantees and comprehensive testing.
Real-time RDF data streaming with support for Kafka, NATS, and other message brokers. Process RDF streams with windowing, aggregation, and pattern matching.
## Installation

Add to your `Cargo.toml`:

```toml
[dependencies]
oxirs-stream = "0.1.0"
```

To enable specific brokers:

```toml
[dependencies]
oxirs-stream = { version = "0.1.0", features = ["kafka", "nats"] }
```
## Quick Start

```rust
use futures::StreamExt; // provides `next()` on the stream
use oxirs_stream::{StreamSource, KafkaConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure Kafka source
    let config = KafkaConfig {
        bootstrap_servers: vec!["localhost:9092".to_string()],
        topic: "rdf-triples".to_string(),
        group_id: "oxirs-consumer".to_string(),
        ..Default::default()
    };

    // Create stream
    let mut stream = StreamSource::kafka(config).await?;

    // Process triples as they arrive
    while let Some(triple) = stream.next().await {
        let triple = triple?;
        println!("{} {} {}", triple.subject, triple.predicate, triple.object);
        // Process triple...
    }

    Ok(())
}
```
## Stream Processing

```rust
use futures::StreamExt; // provides `next()` on the window stream
use oxirs_stream::{StreamProcessor, WindowConfig};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // `kafka_source` is created as in the quick-start example above
    let processor = StreamProcessor::builder()
        .source(kafka_source)
        .window(WindowConfig::tumbling(Duration::from_secs(60)))
        .build()?;

    // Process windowed batches
    let mut windows = processor.process().await?;
    while let Some(window) = windows.next().await {
        let triples = window?;
        println!("Window received {} triples", triples.len());
        // Aggregate, validate, or process the batch (user-defined handler)
        process_window(triples)?;
    }

    Ok(())
}
```
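For completeness, a minimal `process_window` could just summarize the batch. The helper below is illustrative only; it relies solely on the `Vec` of triples and the `subject`/`predicate`/`object` accessors already used in the quick-start example:

```rust
use oxirs_core::Triple;
use std::collections::HashSet;

// Illustrative handler for the windowed example above: counts distinct
// subjects in the batch and prints a summary.
fn process_window(triples: Vec<Triple>) -> Result<(), Box<dyn std::error::Error>> {
    let subjects: HashSet<String> = triples
        .iter()
        .map(|t| t.subject.to_string())
        .collect();

    println!(
        "batch: {} triples, {} distinct subjects",
        triples.len(),
        subjects.len()
    );
    Ok(())
}
```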
## Broker Configuration

### Kafka

```rust
use oxirs_stream::KafkaConfig;

let config = KafkaConfig {
    bootstrap_servers: vec!["kafka1:9092".to_string(), "kafka2:9092".to_string()],
    topic: "rdf-events".to_string(),
    group_id: "my-consumer-group".to_string(),
    // Performance tuning
    fetch_min_bytes: 1024,
    fetch_max_wait_ms: 500,
    max_partition_fetch_bytes: 1_048_576, // 1 MiB
    // Reliability
    enable_auto_commit: false,
    auto_commit_interval_ms: 5000,
    // Security
    security_protocol: Some("SASL_SSL".to_string()),
    sasl_mechanism: Some("PLAIN".to_string()),
    sasl_username: Some(std::env::var("KAFKA_USERNAME")?),
    sasl_password: Some(std::env::var("KAFKA_PASSWORD")?),
    ..Default::default()
};
```
### NATS

```rust
use oxirs_stream::NatsConfig;

let config = NatsConfig {
    servers: vec!["nats://localhost:4222".to_string()],
    subject: "rdf.>".to_string(), // wildcard subscription
    queue_group: Some("oxirs-processors".to_string()),
    // Credentials
    credentials_path: Some("./nats.creds".into()),
    // JetStream (persistent)
    use_jetstream: true,
    stream_name: Some("RDF_STREAM".to_string()),
    durable_name: Some("oxirs-consumer".to_string()),
    ..Default::default()
};
```
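The NATS config then plugs into the same consumer flow as Kafka. Note that `StreamSource::nats` below is an assumed counterpart to the `StreamSource::kafka` constructor shown in the quick start, not confirmed API:

```rust
// Assumed constructor, mirroring StreamSource::kafka from the quick start
let mut stream = StreamSource::nats(config).await?;
```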
## Window Types

### Tumbling Windows

Fixed-size, non-overlapping windows:

```rust
use oxirs_stream::{WindowConfig, WindowType};
use std::time::Duration;

// Process 60-second windows
let config = WindowConfig {
    window_type: WindowType::Tumbling,
    size: Duration::from_secs(60),
    ..Default::default()
};
```
### Sliding Windows

Overlapping windows:

```rust
// Windows: [0-60s], [30-90s], [60-120s], ...
let config = WindowConfig {
    window_type: WindowType::Sliding,
    size: Duration::from_secs(60),
    slide: Duration::from_secs(30), // 30-second slide
    ..Default::default()
};
```
### Session Windows

Dynamic windows based on inactivity gaps:

```rust
let config = WindowConfig {
    window_type: WindowType::Session,
    gap: Duration::from_secs(300), // 5-minute inactivity closes the window
    ..Default::default()
};
```
## Filtering and Transformation

### SPARQL Filters

```rust
use oxirs_stream::filters::SparqlFilter;

let filter = SparqlFilter::new(r#"
    PREFIX foaf: <http://xmlns.com/foaf/0.1/>
    FILTER EXISTS {
        ?s a foaf:Person .
        ?s foaf:age ?age .
        FILTER (?age >= 18)
    }
"#)?;

let filtered_stream = stream.filter(filter);
```
### Transformation

```rust
let transformed_stream = stream.map(|triple| {
    // Transform each triple with a user-defined function
    transform_triple(triple)
});
```
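`transform_triple` above is user-defined. A minimal sketch follows; it is purely illustrative, relies only on the `predicate` accessor already used in the quick-start example, and deliberately avoids constructing new triples since `oxirs_core`'s constructor API is not shown in this README:

```rust
use oxirs_core::Triple;

// Illustrative pass-through transform: flag triples whose predicate is in
// a (hypothetical) deprecated vocabulary, then forward them unchanged.
fn transform_triple(triple: Triple) -> Triple {
    if triple.predicate.to_string().starts_with("http://example.org/deprecated/") {
        eprintln!("deprecated predicate: {}", triple.predicate);
    }
    triple
}
```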
## Aggregation

```rust
use oxirs_stream::aggregation::{Count, Sum, Average};

let processor = StreamProcessor::builder()
    .source(source)
    .window(WindowConfig::tumbling(Duration::from_secs(60)))
    .aggregate(Count::new("?person", "foaf:Person"))
    .aggregate(Average::new("?age", "foaf:age"))
    .build()?;

let results = processor.process().await?;
```
## Pattern Matching

### Temporal Patterns

```rust
use oxirs_stream::patterns::TemporalPattern;
use std::time::Duration;

let pattern = TemporalPattern::builder()
    .event("A", "?person foaf:login ?time")
    .followed_by("B", "?person foaf:logout ?time2", Duration::from_secs(3600))
    .within(Duration::from_secs(24 * 60 * 60)) // within 24 hours
    .build()?;

let matches = stream.detect_pattern(pattern).await?;
```
### Graph Patterns

```rust
use oxirs_stream::patterns::GraphPattern;

let pattern = GraphPattern::parse(r#"
    PREFIX foaf: <http://xmlns.com/foaf/0.1/>
    {
        ?person a foaf:Person .
        ?person foaf:knows ?friend .
        ?friend foaf:age ?age .
        FILTER (?age > 18)
    }
"#)?;

let matches = stream.match_pattern(pattern).await?;
```
## Fault Tolerance

### Checkpointing

```rust
use oxirs_stream::checkpoint::{CheckpointConfig, CheckpointStorage};
use std::time::Duration;

let checkpoint_config = CheckpointConfig {
    interval: Duration::from_secs(60),
    storage: CheckpointStorage::File("./checkpoints".into()),
    max_failures: 3,
};

// Automatically recovers from the last checkpoint on failure
let processor = StreamProcessor::builder()
    .source(source)
    .checkpoint(checkpoint_config)
    .build()?;
```
### Error Handling

```rust
use oxirs_stream::error_handling::{ErrorPolicy, RetryPolicy};

let error_policy = ErrorPolicy {
    retry: RetryPolicy::exponential_backoff(3),
    dead_letter_topic: Some("rdf-errors".to_string()),
    log_errors: true,
};

let processor = StreamProcessor::builder()
    .source(source)
    .error_policy(error_policy)
    .build()?;
```
## Integration

### SHACL Validation (oxirs-shacl)

```rust
use futures::StreamExt;
use oxirs_stream::{StreamProcessor, WindowConfig};
use oxirs_shacl::ValidationEngine;
use std::time::Duration;

// `shapes` and `config` are loaded/constructed elsewhere
let validator = ValidationEngine::new(&shapes, config);

let processor = StreamProcessor::builder()
    .source(kafka_source)
    .window(WindowConfig::tumbling(Duration::from_secs(10)))
    .validate_with(validator)
    .build()?;

let mut results = processor.process().await?;
while let Some(window_result) = results.next().await {
    let (triples, validation_report) = window_result?;
    if !validation_report.conforms {
        eprintln!(
            "Validation failed: {} violations",
            validation_report.violations.len()
        );
    }
}
```
### Streaming SPARQL Queries (oxirs-arq)

```rust
use oxirs_stream::{StreamProcessor, WindowConfig};
use oxirs_arq::StreamingQueryEngine;
use std::time::Duration;

let query_engine = StreamingQueryEngine::new();

let query = r#"
    PREFIX foaf: <http://xmlns.com/foaf/0.1/>
    SELECT ?person (COUNT(?friend) AS ?friendCount)
    WHERE {
        ?person a foaf:Person .
        ?person foaf:knows ?friend .
    }
    GROUP BY ?person
    HAVING (COUNT(?friend) > 10)
"#;

let processor = StreamProcessor::builder()
    .source(source)
    .window(WindowConfig::tumbling(Duration::from_secs(60)))
    .query(query_engine, query)
    .build()?;
```
## Performance

| Message Broker | Throughput | Latency (p99) |
|---|---|---|
| Kafka | 100K triples/s | 15ms |
| NATS | 80K triples/s | 8ms |
| RabbitMQ | 50K triples/s | 20ms |

*Benchmarked on an M1 Mac with local brokers.*
### Tuning

```rust
use oxirs_stream::BackpressureStrategy;

// Batch processing
let processor = StreamProcessor::builder()
    .source(source)
    .batch_size(1000) // process in batches of 1000
    .parallelism(4)   // 4 parallel workers
    .build()?;

// Backpressure control
let processor = StreamProcessor::builder()
    .source(source)
    .buffer_size(10_000)
    .backpressure_strategy(BackpressureStrategy::Block)
    .build()?;
```
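Putting it together, here is a sketch that combines the reliability and throughput options shown above into a single builder chain. It only reuses methods already presented in this README; actual defaults and interactions between options may differ:

```rust
use oxirs_stream::{BackpressureStrategy, StreamProcessor, WindowConfig};
use std::time::Duration;

// `source`, `checkpoint_config`, and `error_policy` come from the
// corresponding examples above.
let processor = StreamProcessor::builder()
    .source(source)
    .window(WindowConfig::tumbling(Duration::from_secs(60)))
    .checkpoint(checkpoint_config)  // fault tolerance
    .error_policy(error_policy)     // retries + dead-letter topic
    .batch_size(1000)
    .parallelism(4)
    .buffer_size(10_000)
    .backpressure_strategy(BackpressureStrategy::Block)
    .build()?;
```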
Planned: SERVICE bridging to remote endpoints. Feedback welcome!
## License

MIT OR Apache-2.0