| Crates.io | xaeroflux-macros |
| lib.rs | xaeroflux-macros |
| version | 0.8.1-m5 |
| created_at | 2025-06-05 01:42:42.478942+00 |
| updated_at | 2025-06-23 17:56:32.414511+00 |
| description | EXPERIMENTAL: actor layer for xaeroflux |
| homepage | https://blockxaero.io |
| repository | https://github.com/block-xaero/xaeroflux.git |
| max_upload_size | |
| id | 1701008 |
| size | 111,207 |
⚠️ Work in progress – NOT READY FOR PRODUCTION USE ⚠️
Xaeroflux is an Rx-like distributed, decentralized, peer-to-peer event streaming engine with built-in CRDT support for conflict-free collaborative applications. It combines reactive programming patterns with automatic conflict resolution to enable truly decentralized, offline-first applications.
Xaeroflux enables you to build collaborative, decentralized applications where multiple users can edit shared data simultaneously without conflicts. Think Google Docs, but without Google's servers - everything runs peer-to-peer.
Rx-like operators: map, filter, scan
Xaeroflux is built around four main concepts that work together:
A Subject is a named event stream that multiple participants can write to and read from:
use xaeroflux_macros::subject;
let chat = subject!("workspace/team-alpha/object/general-chat");
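Most subject names in this README follow a `workspace/<name>/object/<name>` shape. The sketch below assumes that four-segment layout is the convention, which the README does not state outright. `parse_subject_name` is a hypothetical helper for illustration and is not part of xaeroflux.

```rust
/// Hypothetical helper (not part of xaeroflux): splits a subject name of the
/// form "workspace/<workspace>/object/<object>" into its two identifiers.
/// The four-segment layout is an assumption read off the examples here.
pub fn parse_subject_name(name: &str) -> Option<(&str, &str)> {
    let mut parts = name.split('/');
    match (
        parts.next(),
        parts.next(),
        parts.next(),
        parts.next(),
        parts.next(),
    ) {
        // Exactly four segments, with the fixed keywords in place.
        (Some("workspace"), Some(ws), Some("object"), Some(obj), None)
            if !ws.is_empty() && !obj.is_empty() =>
        {
            Some((ws, obj))
        }
        _ => None,
    }
}
```

Names that do not follow the layout simply yield `None`, so a caller can decide whether to reject them or treat them as opaque strings.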
Transform and filter events as they flow through the system in real-time:
let filtered_chat = chat
.filter(|msg| !msg.data().is_empty())
.map(|msg| add_timestamp(msg));
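The `add_timestamp` helper in the snippet above is not defined in this README. As a rough, std-only model of the same filter-then-map shape, the sketch below uses a stand-in `Msg` type and a hypothetical `add_timestamp` that takes the current time as a parameter; neither reflects the real `XaeroEvent` API.

```rust
/// Stand-in message type for this sketch only; the real pipeline carries
/// XaeroEvent values, which are not modelled here.
#[derive(Debug, Clone, PartialEq)]
pub struct Msg {
    pub data: Vec<u8>,
    pub ts: Option<u64>,
}

/// Hypothetical counterpart of the `add_timestamp` helper named above.
pub fn add_timestamp(mut msg: Msg, now: u64) -> Msg {
    msg.ts = Some(now);
    msg
}

/// Drops empty messages, then stamps the rest, mirroring filter -> map.
pub fn filter_and_stamp(input: Vec<Msg>, now: u64) -> Vec<Msg> {
    input
        .into_iter()
        .filter(|msg| !msg.data.is_empty())
        .map(|msg| add_timestamp(msg, now))
        .collect()
}
```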
Sophisticated event processing pipelines with parallel streaming and batch loops:
let collaborative_doc = subject
.buffer(
Duration::from_millis(50),
Some(20),
vec![
Operator::Sort(Sort::VectorClock.to_operator()),
Operator::Fold(Fold::ORSet.to_operator()),
Operator::Reduce(Reduce::SetContents.to_operator()),
Operator::TransitionTo(SubjectExecutionMode::Buffer, SubjectExecutionMode::Streaming),
],
Arc::new(is_crdt_operation),
);
Events are automatically routed between streaming and batch processing based on predicates, with efficient backpressure management and signal control.
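To make the routing idea concrete, here is a minimal std-only model: events whose type matches a predicate are held for batch processing and released once a count threshold is reached, while everything else passes straight through. `Router` and `Routed` are names invented for this sketch, the time window is left out, and none of this is the actual xaeroflux implementation.

```rust
/// Minimal model of predicate-based routing: events matching the predicate
/// are buffered for batch processing, the rest pass straight through.
pub struct Router<F: Fn(u8) -> bool> {
    route_to_batch: F,
    max_batch: usize,
    pending: Vec<u8>,
}

/// Where a single event ended up.
#[derive(Debug, PartialEq)]
pub enum Routed {
    Streamed(u8),
    Buffered,
    BatchReady(Vec<u8>),
}

impl<F: Fn(u8) -> bool> Router<F> {
    pub fn new(route_to_batch: F, max_batch: usize) -> Self {
        Router { route_to_batch, max_batch, pending: Vec::new() }
    }

    /// Routes one event type; returns the full batch once the count
    /// threshold is reached (the time-based flush is not modelled).
    pub fn push(&mut self, event_type: u8) -> Routed {
        if !(self.route_to_batch)(event_type) {
            return Routed::Streamed(event_type);
        }
        self.pending.push(event_type);
        if self.pending.len() >= self.max_batch {
            Routed::BatchReady(std::mem::take(&mut self.pending))
        } else {
            Routed::Buffered
        }
    }
}
```

A real pipeline would also flush on a timer; the count threshold alone is enough to show how the predicate splits the two paths.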
NEW in v0.7.0-m5: Xaeroflux now features a sophisticated zero-copy memory architecture built on ring buffer pools:
Arc<XaeroEvent> provides safe sharing across threads
pub struct XaeroEvent {
pub evt: PooledEventPtr, // Ring buffer pointer (zero-copy)
pub author_id: Option<RingPtr<XaeroID>>, // Stack-allocated author
pub merkle_proof: Option<RingPtr<FixedMerkleProof>>, // Stack-allocated proof
pub vector_clock: Option<RingPtr<FixedVectorClock>>, // Stack-allocated clock
pub latest_ts: u64, // Timestamp
}
impl XaeroEvent {
pub fn data(&self) -> &[u8] { // Zero-copy data access
self.evt.data()
}
pub fn event_type(&self) -> u8 { // Zero-copy type access
self.evt.event_type()
}
}
// Initialize ring buffer pools
XaeroPoolManager::init();
// Create events from pool
let event = XaeroPoolManager::create_xaero_event(
data_slice, // &[u8] - zero-copy data
event_type, // u8 - event type
None, // author_id (optional)
None, // merkle_proof (optional)
None, // vector_clock (optional)
timestamp, // u64 - timestamp
)?;
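For readers new to pooled allocation, the following toy shows the general idea behind a preallocated pool: storage is reserved once, slots are handed out and reused, and reads borrow the bytes in place. `SlotPool` is a made-up type with an assumed 64-byte slot size; it is only an illustration and says nothing about how `XaeroPoolManager` works internally.

```rust
/// Toy fixed-capacity pool: slots are preallocated once and reused.
/// Illustration only; this is not how XaeroPoolManager is implemented.
pub struct SlotPool {
    slots: Vec<[u8; 64]>, // preallocated storage, 64 bytes per slot
    lens: Vec<usize>,     // number of valid bytes in each slot
    free: Vec<usize>,     // indices of slots that are currently unused
}

impl SlotPool {
    pub fn new(capacity: usize) -> Self {
        SlotPool {
            slots: vec![[0u8; 64]; capacity],
            lens: vec![0; capacity],
            free: (0..capacity).rev().collect(),
        }
    }

    /// Copies `data` into a free slot and returns its index, or None when
    /// the pool is exhausted or the payload does not fit.
    pub fn alloc(&mut self, data: &[u8]) -> Option<usize> {
        if data.len() > 64 {
            return None;
        }
        let idx = self.free.pop()?;
        self.slots[idx][..data.len()].copy_from_slice(data);
        self.lens[idx] = data.len();
        Some(idx)
    }

    /// Borrows the payload in place; no copy is made on read.
    pub fn data(&self, idx: usize) -> &[u8] {
        &self.slots[idx][..self.lens[idx]]
    }

    /// Returns a slot to the pool so a later `alloc` can reuse it.
    /// The caller must not release the same index twice.
    pub fn release(&mut self, idx: usize) {
        self.lens[idx] = 0;
        self.free.push(idx);
    }
}
```

Exhaustion surfaces as `None` here, which loosely parallels the fallible `create_xaero_event` call shown above.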
use xaeroflux_macros::subject;
use xaeroflux_core::{XaeroPoolManager, event::*};
use xaeroid::XaeroID;
use std::sync::Arc;
// Initialize ring buffer pools
XaeroPoolManager::init();
// 1. Create a subject for your data
let likes = subject!("workspace/blog/object/post-123-likes");
// 2. Set up a simple streaming pipeline
let likes_stream = likes
.filter(|event| {
matches!(event.event_type(), CRDT_COUNTER_INCREMENT)
})
.subscribe(|event| {
println!("Someone liked the post!");
event
});
// 3. Publish events using ring buffer pools
let user_id = create_test_xaeroid("user123");
let like_event = XaeroPoolManager::create_xaero_event(
&1i64.to_le_bytes(),
CRDT_COUNTER_INCREMENT,
Some(user_id),
None,
None,
current_timestamp(),
).expect("Pool allocation failed");
likes.data.sink.tx.send(like_event).unwrap();
Create sophisticated pipelines that handle both real-time and batch processing:
use xaeroflux_crdt::{Sort, Fold, Reduce};
use std::time::Duration;
let collaborative_likes = likes
.buffer(
Duration::from_millis(100), // Collect events for 100ms
Some(25), // Or until 25 events
vec![
// Batch processing pipeline
Operator::Sort(Sort::VectorClock.to_operator()),
Operator::Fold(Fold::GCounter.to_operator()),
Operator::Reduce(Reduce::CounterValue.to_operator()),
// Transition back to streaming for real-time updates
Operator::TransitionTo(
SubjectExecutionMode::Buffer,
SubjectExecutionMode::Streaming
),
],
Arc::new(|event| {
// Route CRDT operations to batch processing
matches!(event.event_type(),
CRDT_COUNTER_INCREMENT | CRDT_COUNTER_DECREMENT
)
}),
)
.map(|resolved_event| {
// This runs in streaming mode after batch processing
add_ui_metadata(resolved_event)
})
.subscribe(|final_event| {
// Handle both batch results and streaming events
match final_event.event_type() {
CRDT_COUNTER_STATE => {
// Batch-processed counter state
let data = final_event.data();
if data.len() >= 8 {
let bytes: [u8; 8] = data[0..8].try_into().unwrap();
let total_likes = i64::from_le_bytes(bytes);
update_ui_likes(total_likes);
}
},
_ => {
// Regular streaming events
handle_streaming_event(final_event);
}
}
final_event
});
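The subscriber above decodes the counter state with a length check followed by `try_into().unwrap()`. The same decoding can be written as a small helper that returns `None` for short payloads; `decode_counter` is a hypothetical name, and the 8-byte little-endian layout is taken from the example.

```rust
/// Decodes a little-endian i64 from the first 8 bytes of a payload.
/// Returns None instead of panicking when the payload is too short.
pub fn decode_counter(data: &[u8]) -> Option<i64> {
    let mut bytes = [0u8; 8];
    bytes.copy_from_slice(data.get(..8)?);
    Some(i64::from_le_bytes(bytes))
}
```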
Every event in Xaeroflux uses the new ring buffer architecture:
pub struct XaeroEvent {
pub evt: PooledEventPtr, // Ring buffer pointer (zero-copy)
pub author_id: Option<RingPtr<XaeroID>>, // Stack-allocated
pub merkle_proof: Option<RingPtr<FixedMerkleProof>>, // Stack-allocated
pub vector_clock: Option<RingPtr<FixedVectorClock>>, // Stack-allocated
pub latest_ts: u64, // Timestamp
}
// Zero-copy access methods
impl XaeroEvent {
pub fn data(&self) -> &[u8]; // Access event data
pub fn event_type(&self) -> u8; // Access event type
pub fn author_id(&self) -> Option<&XaeroID>; // Access author
pub fn merkle_proof(&self) -> Option<&[u8]>; // Access proof
}
Transform events in real-time with zero-copy access:
let processed = subject
.map(|xe| {
// Transform each event (zero-copy)
add_metadata(xe)
})
.filter(|xe| {
// Keep only events matching criteria (zero-copy)
!xe.data().is_empty()
})
.filter_merkle_proofs() // Keep only cryptographically verified events
.scan(scan_window); // Replay historical events
Handle concurrent operations with sophisticated pipeline processing:
let pipeline = subject
.buffer(
duration, // Time window for collecting events
event_count, // Optional count threshold
vec![
Operator::Sort(sort_function), // Order by causality
Operator::Fold(merge_function), // Resolve conflicts
Operator::Reduce(extract_state), // Get final state
Operator::TransitionTo(from, to), // Switch processing modes
],
route_predicate, // Which events go to batch processing
);
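The README does not show how the sort step orders events by causality. As background, this is the textbook vector clock comparison, written as a std-only sketch; `compare_clocks` is a hypothetical function, and whether `Sort::VectorClock` behaves this way is an assumption.

```rust
use std::cmp::Ordering;

/// Compares two vector clocks of equal length.
/// Some(Less): `a` happened before `b`; Some(Greater): `a` happened after;
/// Some(Equal): identical; None: concurrent (neither dominates).
/// Clocks of different lengths also return None in this sketch.
pub fn compare_clocks(a: &[u64], b: &[u64]) -> Option<Ordering> {
    if a.len() != b.len() {
        return None;
    }
    let mut a_ahead = false;
    let mut b_ahead = false;
    for (x, y) in a.iter().zip(b) {
        match x.cmp(y) {
            Ordering::Greater => a_ahead = true,
            Ordering::Less => b_ahead = true,
            Ordering::Equal => {}
        }
    }
    match (a_ahead, b_ahead) {
        (false, false) => Some(Ordering::Equal),
        (true, false) => Some(Ordering::Greater),
        (false, true) => Some(Ordering::Less),
        (true, true) => None,
    }
}
```

Because concurrent events have no causal order, a sort step needs some deterministic tie-breaker for the `None` case, for example the timestamp or author id.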
Xaeroflux features a sophisticated dual-loop architecture that processes events in parallel with zero-copy semantics:
use xaeroflux_crdt::{Sort, Fold, Reduce};
let doc_subject = subject!("workspace/docs/object/shared-document")
.buffer(
Duration::from_millis(200), // Batch text operations
Some(10), // Or flush as soon as 10 ops accumulate
vec![
// Sort operations by vector clock for causal ordering
Operator::Sort(Sort::VectorClock.to_operator()),
// Merge concurrent edits using CRDT rules
Operator::Fold(Fold::LWWRegister.to_operator()),
// Extract final document state
Operator::Reduce(Reduce::RegisterValue.to_operator()),
// Transition back to streaming for real-time cursor updates
Operator::TransitionTo(
SubjectExecutionMode::Buffer,
SubjectExecutionMode::Streaming
),
],
Arc::new(|event| {
// Route text operations to batch, cursor moves to streaming
matches!(event.event_type(),
DOC_TEXT_INSERT | DOC_TEXT_DELETE | DOC_FORMAT_CHANGE
)
}),
)
.filter(|event| {
// Filter out system events in streaming mode
!matches!(event.event_type(), 200..=255) // System event range
})
.subscribe(|event| {
match event.event_type() {
DOC_COMMIT_STATE => {
// Handle batch-processed document commits
apply_document_changes(event);
},
DOC_CURSOR_MOVE => {
// Handle real-time cursor updates
update_cursor_position(event);
},
_ => {}
}
event
});
// The system automatically handles:
// - Text edits → batch processing → conflict resolution → commit
// - Cursor moves → streaming → immediate UI updates
// - Parallel processing without blocking
// - Zero-copy data access throughout
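The example above folds text operations with `Fold::LWWRegister`. For reference, a last-writer-wins register in its textbook form looks roughly like the sketch below. `LwwRegister` and its fields are invented for illustration, and the tie-break on writer id is an assumption rather than documented xaeroflux behavior.

```rust
/// Textbook last-writer-wins register. Ties on the timestamp are broken by
/// comparing writer ids so every replica picks the same winner.
#[derive(Debug, Clone, PartialEq)]
pub struct LwwRegister {
    pub value: Vec<u8>,
    pub ts: u64,
    pub writer: u64,
}

impl LwwRegister {
    /// Keeps whichever write has the larger (timestamp, writer) pair.
    pub fn merge(&mut self, other: &LwwRegister) {
        if (other.ts, other.writer) > (self.ts, self.writer) {
            *self = other.clone();
        }
    }
}
```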
Advanced flow control with signals:
// Emergency stop - drops all future events
subject.data_signal_pipe.sink.tx.send(Signal::Blackhole).unwrap();
// Graceful shutdown
subject.data_signal_pipe.sink.tx.send(Signal::Kill).unwrap();
// Control-specific signals
subject.control_signal_pipe.sink.tx.send(Signal::ControlBlackhole).unwrap();
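A simplified model of these signal semantics, using only `std::sync::mpsc`, might look like the following. It assumes Blackhole means "drop every later event" and Kill means "stop the loop", as the comments above suggest. For brevity it sends events and signals over one channel, whereas the API above uses separate signal pipes; `Message` and `run_loop` are names made up for this sketch.

```rust
use std::sync::mpsc::Receiver;

/// Events and signals share one channel in this sketch.
#[derive(Debug, Clone, PartialEq)]
pub enum Message {
    Event(u8),
    Blackhole,
    Kill,
}

/// Drains the channel until Kill arrives or all senders are dropped,
/// returning the events that were delivered before any Blackhole.
pub fn run_loop(rx: Receiver<Message>) -> Vec<u8> {
    let mut delivered = Vec::new();
    let mut blackholed = false;
    for msg in rx {
        match msg {
            Message::Kill => break,
            Message::Blackhole => blackholed = true,
            Message::Event(e) if !blackholed => delivered.push(e),
            Message::Event(_) => {} // dropped after Blackhole
        }
    }
    delivered
}
```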
CRDTs (Conflict-free Replicated Data Types) automatically resolve conflicts when multiple users edit the same data simultaneously. All CRDT operations use the ring buffer architecture for optimal performance.
| CRDT Type | Use Case | Operations | Example |
|---|---|---|---|
| OR-Set | Collections that can grow/shrink | ADD, REMOVE | User reactions, tags, participants |
| G-Counter | Counters that only increase | INCREMENT | View counts, downloads |
| PN-Counter | Counters that can increase/decrease | INCREMENT, DECREMENT | Likes/dislikes, voting |
| LWW-Register | Single values with last-writer-wins | WRITE | User status, settings |
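To illustrate why counters of this kind merge without conflicts, here is a textbook PN-Counter in plain Rust. Each replica tracks its own increments and decrements, and merging takes the per-replica maximum. `PnCounter` is a sketch written for this README, not the type used by `xaeroflux-crdt`.

```rust
use std::collections::HashMap;

/// Textbook PN-Counter: one grow-only map of increments and one of
/// decrements, keyed by replica id. Not the xaeroflux-crdt implementation.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct PnCounter {
    incs: HashMap<String, u64>,
    decs: HashMap<String, u64>,
}

impl PnCounter {
    pub fn increment(&mut self, replica: &str, by: u64) {
        *self.incs.entry(replica.to_string()).or_insert(0) += by;
    }

    pub fn decrement(&mut self, replica: &str, by: u64) {
        *self.decs.entry(replica.to_string()).or_insert(0) += by;
    }

    /// Merges another replica's state by taking the per-replica maximum,
    /// which makes merge commutative, associative, and idempotent.
    pub fn merge(&mut self, other: &PnCounter) {
        for (k, v) in &other.incs {
            let e = self.incs.entry(k.clone()).or_insert(0);
            *e = (*e).max(*v);
        }
        for (k, v) in &other.decs {
            let e = self.decs.entry(k.clone()).or_insert(0);
            *e = (*e).max(*v);
        }
    }

    /// Current value: total increments minus total decrements.
    pub fn value(&self) -> i64 {
        let up: u64 = self.incs.values().sum();
        let down: u64 = self.decs.values().sum();
        up as i64 - down as i64
    }
}
```

A G-Counter is the same structure with only the increment map.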
// OR-Set operations
pub const CRDT_SET_ADD: u8 = 30;
pub const CRDT_SET_REMOVE: u8 = 31;
pub const CRDT_SET_STATE: u8 = 32;
// Counter operations
pub const CRDT_COUNTER_INCREMENT: u8 = 33;
pub const CRDT_COUNTER_DECREMENT: u8 = 34;
pub const CRDT_COUNTER_STATE: u8 = 35;
// Register operations
pub const CRDT_REGISTER_WRITE: u8 = 43;
pub const CRDT_REGISTER_STATE: u8 = 44;
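Since event types are plain `u8` codes, a routing predicate or subscriber may want to group them by CRDT family. The sketch below repeats the constants listed above so that it compiles on its own; `CrdtFamily` and `crdt_family` are hypothetical names, not part of the crate.

```rust
// Constants copied from the list above so the sketch is self-contained.
pub const CRDT_SET_ADD: u8 = 30;
pub const CRDT_SET_REMOVE: u8 = 31;
pub const CRDT_SET_STATE: u8 = 32;
pub const CRDT_COUNTER_INCREMENT: u8 = 33;
pub const CRDT_COUNTER_DECREMENT: u8 = 34;
pub const CRDT_COUNTER_STATE: u8 = 35;
pub const CRDT_REGISTER_WRITE: u8 = 43;
pub const CRDT_REGISTER_STATE: u8 = 44;

/// Hypothetical grouping of event type codes by CRDT family.
#[derive(Debug, PartialEq)]
pub enum CrdtFamily {
    Set,
    Counter,
    Register,
}

/// Maps an event type code to its CRDT family, or None for other events.
pub fn crdt_family(event_type: u8) -> Option<CrdtFamily> {
    match event_type {
        CRDT_SET_ADD | CRDT_SET_REMOVE | CRDT_SET_STATE => Some(CrdtFamily::Set),
        CRDT_COUNTER_INCREMENT | CRDT_COUNTER_DECREMENT | CRDT_COUNTER_STATE => {
            Some(CrdtFamily::Counter)
        }
        CRDT_REGISTER_WRITE | CRDT_REGISTER_STATE => Some(CrdtFamily::Register),
        _ => None,
    }
}
```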
use xaeroflux_macros::subject;
use xaeroflux_crdt::{Sort, Fold, Reduce};
use xaeroflux_core::XaeroPoolManager;
// Initialize ring buffer pools
XaeroPoolManager::init();
// Create leaderboard with mixed processing
let leaderboard = subject!("workspace/game/object/arena-leaderboard")
.buffer(
Duration::from_millis(500), // Batch score updates every 500ms
Some(100), // Or when 100 score changes accumulate
vec![
Operator::Sort(Sort::VectorClock.to_operator()),
Operator::Fold(Fold::PNCounter.to_operator()),
Operator::Reduce(Reduce::CounterValue.to_operator()),
// Transition back to streaming for real-time player actions
Operator::TransitionTo(
SubjectExecutionMode::Buffer,
SubjectExecutionMode::Streaming
),
],
Arc::new(|event| {
// Route score changes to batch, player actions to streaming
matches!(event.event_type(),
SCORE_CHANGE | CRDT_COUNTER_INCREMENT | CRDT_COUNTER_DECREMENT
)
}),
)
.subscribe(|event| {
match event.event_type() {
CRDT_COUNTER_STATE => {
// Batch-processed leaderboard update
let data = event.data();
if data.len() >= 8 {
let bytes: [u8; 8] = data[0..8].try_into().unwrap();
let final_score = i64::from_le_bytes(bytes);
update_leaderboard_display(final_score);
broadcast_leaderboard_update(final_score);
}
},
PLAYER_MOVE => {
// Real-time player movement (streaming)
update_player_position(event);
},
CHAT_MESSAGE => {
// Real-time chat (streaming)
display_chat_message(event);
},
_ => {}
}
event
});
// Usage: Multiple players affect scores simultaneously
fn player_scores(player_id: XaeroID, points: i64) -> Result<(), PoolError> {
let event = XaeroPoolManager::create_xaero_event(
&points.to_le_bytes(),
CRDT_COUNTER_INCREMENT,
Some(player_id),
None,
None,
current_timestamp(),
)?;
leaderboard.data.sink.tx.send(event).unwrap();
Ok(())
}
fn player_moves(player_id: XaeroID, position: (f32, f32)) -> Result<(), PoolError> {
let mut data = Vec::new();
data.extend_from_slice(&position.0.to_le_bytes());
data.extend_from_slice(&position.1.to_le_bytes());
let event = XaeroPoolManager::create_xaero_event(
&data,
PLAYER_MOVE,
Some(player_id),
None,
None,
current_timestamp(),
)?;
leaderboard.data.sink.tx.send(event).unwrap();
Ok(())
}
// Concurrent operations work efficiently:
// - Score changes are batched and resolved via CRDT
// - Player movements are streamed in real-time
// - Chat messages flow through streaming pipeline
// - All using zero-copy ring buffer architecture
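`player_moves` above builds its payload in a `Vec`. Because the payload is always two `f32` values, a fixed 8-byte array works as well and avoids the heap allocation. The helpers below are hypothetical and only mirror the little-endian layout used in that example.

```rust
/// Packs an (x, y) position into 8 little-endian bytes on the stack.
pub fn encode_position(pos: (f32, f32)) -> [u8; 8] {
    let mut out = [0u8; 8];
    out[..4].copy_from_slice(&pos.0.to_le_bytes());
    out[4..].copy_from_slice(&pos.1.to_le_bytes());
    out
}

/// Reads the position back; returns None if fewer than 8 bytes are given.
pub fn decode_position(data: &[u8]) -> Option<(f32, f32)> {
    let mut x = [0u8; 4];
    let mut y = [0u8; 4];
    x.copy_from_slice(data.get(..4)?);
    y.copy_from_slice(data.get(4..8)?);
    Some((f32::from_le_bytes(x), f32::from_le_bytes(y)))
}
```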
Every Subject automatically connects to a sophisticated storage and processing system with ring buffer integration:
NEW: Optimized binary format for ring buffer events:
use xaeroflux_macros::subject;
use xaeroflux_core::XaeroPoolManager;
// Initialize ring buffer pools
XaeroPoolManager::init();
// Create subject with automatic system integration
let my_subject = subject!("workspace/myapp/object/data");
// All storage actors and processing loops start automatically
// Ring buffer pools are shared across all system components
Access historical events using the scan operator with zero-copy reconstruction:
use xaeroflux_core::event::ScanWindow;
let historical = subject
.scan(ScanWindow {
start: yesterday_timestamp,
end: now_timestamp,
})
.subscribe(|historical_event| {
// Events are reconstructed into ring buffers from storage
println!("Replaying: type={}, data_len={}",
historical_event.event_type(),
historical_event.data().len());
historical_event
});
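Conceptually, a scan selects stored events whose timestamps fall inside the window. The std-only sketch below models that selection over an in-memory log of `(timestamp, event_type)` pairs. The `ScanWindow` here is a stand-in with the same field names as above, and treating `start` as inclusive and `end` as exclusive is an assumption.

```rust
/// Stand-in for the ScanWindow shown above; field names follow the README.
#[derive(Debug, Clone, Copy)]
pub struct ScanWindow {
    pub start: u64,
    pub end: u64,
}

/// Returns the stored (timestamp, event_type) pairs inside the window.
/// Inclusive start and exclusive end are assumptions of this sketch.
pub fn scan(stored: &[(u64, u8)], w: ScanWindow) -> Vec<(u64, u8)> {
    stored
        .iter()
        .copied()
        .filter(|(ts, _)| *ts >= w.start && *ts < w.end)
        .collect()
}
```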
// Graceful shutdown of all processing
subject.data_signal_pipe.sink.tx.send(Signal::Kill).unwrap();
subject.control_signal_pipe.sink.tx.send(Signal::ControlKill).unwrap();
// Emergency stop (blackhole)
subject.data_signal_pipe.sink.tx.send(Signal::Blackhole).unwrap();
# All tests with ring buffer stack size
RUST_MIN_STACK=32000000 cargo test
# CRDT-specific tests
cargo test -p xaeroflux-crdt
# Ring buffer pool tests
cargo test -p xaeroflux-core pool::
# Pipeline processing tests
cargo test test_pipeline_
# Concurrent operation tests
cargo test test_concurrent_
#[test]
fn test_ring_buffer_pipeline() {
XaeroPoolManager::init();
let subject = subject!("test/ring-buffer-processing");
let results = Arc::new(Mutex::new(Vec::new()));
let pipeline = subject
.buffer(
Duration::from_millis(50),
Some(5),
vec![
Operator::Sort(Sort::VectorClock.to_operator()),
Operator::Fold(Fold::ORSet.to_operator()),
Operator::Reduce(Reduce::SetContents.to_operator()),
Operator::TransitionTo(
SubjectExecutionMode::Buffer,
SubjectExecutionMode::Streaming
),
],
Arc::new(|event| {
matches!(event.event_type(), CRDT_SET_ADD)
}),
)
.subscribe({
let results = results.clone();
move |event| {
// Verify zero-copy access
assert!(!event.data().is_empty());
assert!(event.event_type() != 0);
results.lock().unwrap().push(event.clone());
event
}
});
// Create events using ring buffer pools
for i in 0..3 {
let event = XaeroPoolManager::create_xaero_event(
format!("item{}", i).as_bytes(),
CRDT_SET_ADD,
None,
None,
None,
current_timestamp(),
).expect("Pool allocation failed");
subject.data.sink.tx.send(event).unwrap();
}
std::thread::sleep(Duration::from_millis(100));
// Verify pipeline processed events correctly
let final_results = results.lock().unwrap();
assert!(!final_results.is_empty());
// Verify all events used ring buffer allocation
for event in final_results.iter() {
assert!(event.is_pure_zero_copy());
}
}
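The test above folds `CRDT_SET_ADD` events with `Fold::ORSet`. As background, an observed-remove set in its textbook form tags every add with a unique id and lets a remove cancel only the tags it has seen, so a concurrent add survives. The `OrSet` below is a sketch under that definition, with caller-supplied tags assumed to be globally unique; it is not the crate's implementation.

```rust
use std::collections::{HashMap, HashSet};

/// Textbook observed-remove set. Tags must be globally unique per add.
#[derive(Debug, Clone, Default)]
pub struct OrSet {
    adds: HashMap<String, HashSet<u64>>, // element -> tags that added it
    removed: HashSet<u64>,               // tombstoned tags
}

impl OrSet {
    pub fn add(&mut self, elem: &str, tag: u64) {
        self.adds.entry(elem.to_string()).or_default().insert(tag);
    }

    /// Tombstones only the add-tags this replica has observed so far.
    pub fn remove(&mut self, elem: &str) {
        if let Some(tags) = self.adds.get(elem) {
            self.removed.extend(tags.iter().copied());
        }
    }

    /// An element is present if at least one of its tags is not tombstoned.
    pub fn contains(&self, elem: &str) -> bool {
        self.adds
            .get(elem)
            .map_or(false, |tags| tags.iter().any(|t| !self.removed.contains(t)))
    }

    /// Union of add-tags and tombstones from both replicas.
    pub fn merge(&mut self, other: &OrSet) {
        for (elem, tags) in &other.adds {
            self.adds
                .entry(elem.clone())
                .or_default()
                .extend(tags.iter().copied());
        }
        self.removed.extend(other.removed.iter().copied());
    }
}
```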
We welcome contributions! Here's how to get involved:
Clone the repository: git clone https://github.com/yourusername/xaeroflux.git
Create a feature branch: git checkout -b feature/amazing-feature
Build: cargo build
Run the tests: RUST_MIN_STACK=32000000 cargo test
Run cargo fmt and cargo clippy
When working with the ring buffer architecture:
Call XaeroPoolManager::init() in tests
Access event contents through the .data() and .event_type() methods
Push your branch: git push origin feature/amazing-feature
This project is licensed under the Mozilla Public License 2.0.
What this means:
See LICENSE for the complete license text.
Ready to build the future of collaborative applications? Start with our Getting Started guide and join the decentralized revolution! 🚀