| Crates.io | rsp-rs |
| lib.rs | rsp-rs |
| version | 0.3.5 |
| created_at | 2025-11-04 14:59:42.226125+00 |
| updated_at | 2025-11-28 17:23:46.591557+00 |
| description | An RDF Stream Processing engine in Rust, supporting RSP-QL queries with sliding windows and real-time analytics |
| homepage | https://github.com/argahsuknesib/rsp-rs |
| repository | https://github.com/argahsuknesib/rsp-rs |
| max_upload_size | |
| id | 1916362 |
| size | 301,824 |
A high-performance RDF Stream Processing engine in Rust built on Oxigraph.
Add the crate to Cargo.toml:
[dependencies]
rsp-rs = "0.3.5"
Or run:
cargo add rsp-rs
use rsp_rs::RSPEngine;
use oxigraph::model::*;
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Define RSP-QL query
    let query = r#"
        PREFIX ex: <https://rsp.rs/>
        REGISTER RStream <output> AS
        SELECT *
        FROM NAMED WINDOW ex:w1 ON STREAM ex:stream1 [RANGE 10000 STEP 2000]
        WHERE {
            WINDOW ex:w1 { ?s ?p ?o }
        }
    "#;
    // Initialize engine
    let mut engine = RSPEngine::new(query.to_string());
    engine.initialize()?;
    // Get stream and start processing
    let stream = engine.get_stream("https://rsp.rs/stream1").unwrap();
    let results = engine.start_processing();
    // Add data with timestamps
    let quad = Quad::new(
        NamedNode::new("https://rsp.rs/subject")?,
        NamedNode::new("https://rsp.rs/predicate")?,
        NamedNode::new("https://rsp.rs/object")?,
        GraphName::DefaultGraph,
    );
    stream.add_quads(vec![quad], 1000)?;
    // Close stream to get final results
    engine.close_stream("https://rsp.rs/stream1", 10000)?;
    Ok(())
}
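The example above obtains `results` but never reads from it. The following is a minimal sketch of draining a result receiver, assuming it behaves like a `std::sync::mpsc::Receiver`. That is an assumption, not something this README confirms, and `ResultRow` and `collect_results` are hypothetical names used only for illustration.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for one emitted query result (not an rsp-rs type).
#[derive(Debug, Clone, PartialEq)]
struct ResultRow {
    bindings: String,
    window_close: i64,
}

// Collects every row until all senders have been dropped.
fn collect_results(rx: mpsc::Receiver<ResultRow>) -> Vec<ResultRow> {
    rx.iter().collect()
}

fn main() {
    let (tx, rx) = mpsc::channel();

    // Simulated engine thread: emits one row per closed window.
    let producer = thread::spawn(move || {
        for close in [2_000_i64, 4_000] {
            let row = ResultRow {
                bindings: format!("?s ?p ?o @ {}", close),
                window_close: close,
            };
            tx.send(row).unwrap();
        }
        // `tx` is dropped here, which ends the iteration in `collect_results`.
    });

    let rows = collect_results(rx);
    producer.join().unwrap();
    for row in &rows {
        println!("{:?}", row);
    }
}
```

If the sending side stays alive after the last event, iterating would block indefinitely; in that case `recv_timeout` or `try_recv` on the receiver is the safer choice.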
Results emit when windows close, triggered by event timestamps (not wall-clock time):
stream.add_quads(vec![quad1], 0)?; // Added to window
stream.add_quads(vec![quad2], 1000)?; // Added to window
stream.add_quads(vec![quad3], 2000)?; // Closes window - results emitted!
Important: Always call close_stream() after your last event to trigger final window closures.
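To make the closing rule concrete, here is a minimal sketch of a generic sliding-window model for `[RANGE 10000 STEP 2000]`. It assumes windows open at every multiple of STEP (including negative offsets), cover the half-open interval [open, open + RANGE), and close when an event arrives with a timestamp at or past the window's end. These are assumptions for illustration and may not match the engine's internals; `windows_containing` and `closed_by` are hypothetical helpers.

```rust
/// Returns the [open, close) bounds of every window containing timestamp `t`,
/// assuming windows open at each multiple of `step` and span `range`.
fn windows_containing(t: i64, range: i64, step: i64) -> Vec<(i64, i64)> {
    let mut out = Vec::new();
    // Latest window that opens at or before `t`.
    let mut open = t.div_euclid(step) * step;
    while open > t - range {
        out.push((open, open + range));
        open -= step;
    }
    out.reverse(); // oldest window first
    out
}

/// Returns the active windows whose end is at or before the event time `now`.
fn closed_by(active: &[(i64, i64)], now: i64) -> Vec<(i64, i64)> {
    active
        .iter()
        .copied()
        .filter(|&(_, close)| close <= now)
        .collect()
}

fn main() {
    let (range, step) = (10_000, 2_000);
    // Windows opened by the first event at timestamp 0.
    let active = windows_containing(0, range, step);
    for ts in [0, 1_000, 2_000] {
        println!("event at {} closes {:?}", ts, closed_by(&active, ts));
    }
}
```

Under this model the events at 0 and 1000 close nothing, while the event at 2000 closes the window ending at 2000. No later event arrives to close the remaining windows, which is why a final `close_stream()` call is needed.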
The system is timestamp-driven: the timestamp parameter matters, not wall-clock time.
rsp-rs v0.3.5 fixes a critical precision issue with large timestamps (e.g., Unix milliseconds).
i64::MAX
See LARGE_TIMESTAMP_FIX.md for details and migration guide.
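This README does not say what caused the precision issue. As general background only, one common way large timestamps lose precision is a round trip through a floating-point type. The sketch below illustrates that effect with plain casts; it is not a description of the actual rsp-rs bug or its fix, and the helper names are hypothetical.

```rust
// Hypothetical helpers: convert a timestamp to a float and back.
fn roundtrip_f32(ts: i64) -> i64 {
    ts as f32 as i64
}

fn roundtrip_f64(ts: i64) -> i64 {
    ts as f64 as i64
}

fn main() {
    // A Unix timestamp in milliseconds.
    let unix_ms: i64 = 1_700_000_000_123;
    // The first integer that f64 cannot represent exactly.
    let beyond_f64: i64 = (1_i64 << 53) + 1;

    println!("f32 keeps unix_ms exact: {}", roundtrip_f32(unix_ms) == unix_ms);
    println!("f64 keeps unix_ms exact: {}", roundtrip_f64(unix_ms) == unix_ms);
    println!("f64 keeps 2^53 + 1 exact: {}", roundtrip_f64(beyond_f64) == beyond_f64);
}
```

Keeping timestamps as `i64` end to end avoids this class of problem entirely, since integer comparison and subtraction are exact across the whole range.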
new(query) - Create engine with RSP-QL query
initialize() - Initialize windows and streams
start_processing() - Start processing, returns result receiver
get_stream(name) - Get stream for adding data
close_stream(uri, timestamp) - Trigger final window closures
add_static_data(quad) - Add static background data
add_quads(quads, timestamp) - Add quads with event timestamp
let window = engine.get_window("window_name").unwrap();
let mut w = window.lock().unwrap();
println!("Active windows: {}", w.get_active_window_count());
w.set_debug_mode(true); // Enable verbose logging
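The snippet above takes a lock on the window before inspecting it. The following sketch shows the same pattern with the standard library only, assuming the window handle is an `Arc<Mutex<...>>` (an assumption; the README only shows the `lock()` call). `WindowState` and `enable_debug` are hypothetical stand-ins, not rsp-rs types.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for the engine's window state.
struct WindowState {
    active_windows: usize,
    debug: bool,
}

// Enables debug mode and returns the active window count.
// The lock guard is dropped when the function returns.
fn enable_debug(window: &Arc<Mutex<WindowState>>) -> usize {
    let mut w = window.lock().unwrap();
    w.debug = true;
    w.active_windows
}

fn main() {
    let window = Arc::new(Mutex::new(WindowState {
        active_windows: 3,
        debug: false,
    }));

    let count = enable_debug(&window);
    println!("Active windows: {}", count);

    // The guard is gone, so other holders of the Arc can lock again.
    println!("Lock is free: {}", window.try_lock().is_ok());
}
```

Keeping the guard inside a small function or block limits how long other threads sharing the same handle have to wait for the lock.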
Run benchmarks:
cargo bench
See examples/streaming_lifecycle.rs and tests/integration/ for more examples.
MIT License - Copyright Ghent University - imec
Rust port of RSP-JS. Thanks to the original authors for their excellent work.
Contact Kush Bisen or create an issue on GitHub.