| Crates.io | rusted-ring |
| lib.rs | rusted-ring |
| version | 0.5.6 |
| created_at | 2025-06-19 01:41:47.149429+00 |
| updated_at | 2025-07-23 15:02:45.157938+00 |
| description | LMAX inspired Lock-free ring buffers with cache-aligned memory pools for high-performance event systems |
| homepage | |
| repository | https://github.com/blockxaero/rusted-ring |
| max_upload_size | |
| id | 1717733 |
| size | 142,970 |
A high-performance, LMAX Disruptor-inspired ring buffer library for Rust, designed for nanosecond-level event processing. The performance figures quoted throughout were measured in production-style benchmarks.
## Benchmark Results (Production Validated)

Headline figures from the benchmarks cited throughout this README: ~175M events/sec write throughput, ~700ns read latency, ~705ns per pipeline stage, and ~1.43µs per event across an FFI boundary. A full comparison table appears at the end of this document.

## Basic Usage

This library implements the classic LMAX Disruptor pattern with static ring buffers for maximum performance:
```rust
use rusted_ring::{EventPoolFactory, EventUtils};

// Get a writer for a specific pool size.
let mut writer = EventPoolFactory::get_xs_writer(); // 64-byte events

// Create and emit events (nanosecond-level allocation).
// `data` and `event_type` are your payload bytes and type tag;
// the `?` assumes a surrounding fn that returns Result.
let event = EventUtils::create_pooled_event::<64>(data, event_type)?;
writer.add(event); // ~1.43µs including FFI overhead

// Multiple independent readers (fan-out pattern).
let mut storage_reader = EventPoolFactory::get_xs_reader();
let mut network_reader = EventPoolFactory::get_xs_reader();

// Each reader processes independently, at its own speed.
// `store_to_database` and `sync_to_peers` are application-provided.
std::thread::spawn(move || {
    while let Some(event) = storage_reader.next() {
        // Process for database storage.
        store_to_database(&event.data[..event.len as usize]);
    }
});

std::thread::spawn(move || {
    while let Some(event) = network_reader.next() {
        // Process for network synchronization.
        sync_to_peers(&event.data[..event.len as usize]);
    }
});
```
## Pipeline Processing

```rust
use rusted_ring::{EventPoolFactory, EventUtils};

// Create pipeline stages.
let mut input_writer = EventPoolFactory::get_s_writer(); // 256B events in
let mut input_reader = EventPoolFactory::get_s_reader();
let mut output_writer = EventPoolFactory::get_m_writer(); // 1KB events out

// Producer thread (e.g., fed from an FFI boundary).
// `incoming_stream`, `transform_data`, `INPUT_TYPE`, and
// `OUTPUT_TYPE` are application-provided.
std::thread::spawn(move || {
    for raw_data in incoming_stream {
        let event = EventUtils::create_pooled_event::<256>(&raw_data, INPUT_TYPE)
            .expect("payload fits in 256B");
        input_writer.add(event); // 175M events/sec capability
    }
});

// Processing stage (71M ops/sec per stage).
std::thread::spawn(move || {
    while let Some(input_event) = input_reader.next() {
        // Transform data (e.g., parse, validate, enrich).
        let processed_data = transform_data(&input_event.data);
        let output_event = EventUtils::create_pooled_event::<1024>(&processed_data, OUTPUT_TYPE)
            .expect("payload fits in 1KB");
        output_writer.add(output_event);
    }
});
```
## Core Types

### PooledEvent

A fixed-size, cache-aligned event structure optimized for zero-copy operations:
```rust
#[repr(C, align(64))]
#[derive(Debug, Copy, Clone)]
pub struct PooledEvent<const TSHIRT_SIZE: usize> {
    pub len: u32,
    pub event_type: u32,
    pub data: [u8; TSHIRT_SIZE],
}

// Zero-copy conversion example; `MyCustomEvent` must be a
// bytemuck::Pod type so `from_bytes` can reinterpret the slice.
impl<const SIZE: usize> From<PooledEvent<SIZE>> for MyCustomEvent {
    fn from(value: PooledEvent<SIZE>) -> Self {
        *bytemuck::from_bytes::<MyCustomEvent>(&value.data[..std::mem::size_of::<MyCustomEvent>()])
    }
}
```
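For concreteness, a payload type compatible with the `From` conversion above might look like this (`CursorMoved` and `CURSOR_MOVED_TYPE` are hypothetical, and deriving `Pod`/`Zeroable` requires bytemuck's `derive` feature):

```rust
use bytemuck::{Pod, Zeroable};

// Hypothetical payload type: #[repr(C)] with no padding bytes,
// so it qualifies as Pod and can be reinterpreted from raw bytes.
#[repr(C)]
#[derive(Debug, Copy, Clone, Pod, Zeroable)]
struct CursorMoved {
    x: f32,       // 4 bytes
    y: f32,       // 4 bytes
    user_id: u64, // 8 bytes -> 16 bytes total, no padding
}

// Writing side: serialize with bytemuck, then wrap in a PooledEvent.
// let bytes = bytemuck::bytes_of(&cursor);
// let event = EventUtils::create_pooled_event::<64>(bytes, CURSOR_MOVED_TYPE)?;
```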
### RingBuffer

```rust
use std::cell::UnsafeCell;
use std::sync::OnceLock;

#[repr(C, align(64))]
pub struct RingBuffer<const TSHIRT_SIZE: usize, const RING_CAPACITY: usize> {
    pub published_sequence: UnsafeCell<usize>,                       // Writer cursor
    pub data: UnsafeCell<[PooledEvent<TSHIRT_SIZE>; RING_CAPACITY]>, // Event storage
}

// Example ring buffer sizes (total ~1.2MB memory footprint):
static XS_RING: OnceLock<RingBuffer<64, 2000>> = OnceLock::new(); // 128KB + metadata
static S_RING: OnceLock<RingBuffer<256, 1000>> = OnceLock::new(); // 256KB + metadata
static M_RING: OnceLock<RingBuffer<1024, 300>> = OnceLock::new(); // 307KB + metadata
```
### EventSize

Pre-defined event sizes, based on real-world usage patterns:
```rust
pub enum EventSize {
    XS, // 64 bytes  - heartbeats, cursors, simple state changes, control signals
    S,  // 256 bytes - chat messages, CRDT operations, user actions, small data packets
    M,  // 1KB  - document edits, API payloads, structured data, drawing strokes
    L,  // 4KB  - images, complex objects, batch operations, file chunks
    XL, // 16KB - large files, documents, multimedia data, complex diagrams
}
```
```rust
// Automatic size selection based on payload length.
let size = EventPoolFactory::estimate_size(payload.len());
match size {
    EventSize::XS => {
        let mut writer = EventPoolFactory::get_xs_writer();
        let event = EventUtils::create_pooled_event::<64>(payload, event_type)?;
        writer.add(event); // 175M events/sec capability
    }
    EventSize::S => {
        let mut writer = EventPoolFactory::get_s_writer();
        let event = EventUtils::create_pooled_event::<256>(payload, event_type)?;
        writer.add(event); // Sub-microsecond allocation
    }
    // ... other sizes
}
```
## Backpressure Monitoring

```rust
let reader = EventPoolFactory::get_xs_reader();
let backpressure = reader.backpressure_ratio(); // 0.0 = no pressure, 1.0 = full

if reader.is_under_pressure() {
    // Reader falling behind (80% threshold).
    apply_throttling();
}
if reader.should_throttle() {
    // Critical backpressure (90% threshold).
    emergency_throttling();
}
```
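A minimal sketch of how a producer might act on these signals; `payloads`, `EVENT_TYPE`, and the back-off durations are illustrative placeholders, not part of the crate:

```rust
use std::time::Duration;
use rusted_ring::{EventPoolFactory, EventUtils};

let mut writer = EventPoolFactory::get_xs_writer();
let reader = EventPoolFactory::get_xs_reader();

for payload in payloads {
    // Back off before writing when the pool reports pressure,
    // mirroring the 80%/90% thresholds described above.
    if reader.should_throttle() {
        std::thread::sleep(Duration::from_millis(1)); // critical: let readers catch up
    } else if reader.is_under_pressure() {
        std::thread::yield_now(); // elevated: give up the time slice
    }
    let event = EventUtils::create_pooled_event::<64>(payload, EVENT_TYPE)
        .expect("payload fits in 64B");
    writer.add(event);
}
```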
## Example: FFI Event Ingestion

```rust
use rusted_ring::{EventPoolFactory, EventUtils, AutoSizedEvent};

// FFI boundary - high-frequency events from Dart/Flutter.
#[no_mangle]
pub extern "C" fn emit_drawing_stroke(
    stroke_data: *const u8,
    len: usize,
    event_type: u32,
) -> i32 {
    // Safety: the caller must pass a valid pointer to `len` readable bytes.
    let data = unsafe { std::slice::from_raw_parts(stroke_data, len) };

    // Auto-size the event (validated: 1.43µs per event including FFI).
    match EventUtils::create_auto_sized_event(data, event_type) {
        Ok(auto_event) => {
            if auto_event.emit_to_ring().is_ok() {
                0 // Success
            } else {
                -1 // Ring buffer full
            }
        }
        Err(_) => -2, // Data too large
    }
}
```
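Since the export is also plain Rust, it can be exercised from a unit test before wiring up the Dart side (the event type `7` is an arbitrary placeholder):

```rust
#[test]
fn emit_drawing_stroke_accepts_small_payloads() {
    let stroke = [0u8; 32]; // placeholder stroke bytes
    let rc = emit_drawing_stroke(stroke.as_ptr(), stroke.len(), 7);
    assert_eq!(rc, 0, "expected the success return code");
}
```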
```rust
use std::time::Duration;

// Multi-actor processing (fan-out pattern). The persist/sync/metrics
// helpers are application-provided.
fn start_processing_actors() {
    // Storage actor - persist to database.
    std::thread::spawn(|| {
        let mut reader = EventPoolFactory::get_m_reader();
        while let Some(event) = reader.next() {
            if event.event_type == DRAWING_STROKE_TYPE {
                persist_stroke_to_db(&event.data[..event.len as usize]);
            }
        }
    });

    // Network actor - sync to peers (tested: 33M events/sec under backpressure).
    std::thread::spawn(|| {
        let mut reader = EventPoolFactory::get_m_reader();
        while let Some(event) = reader.next() {
            if event.event_type == DRAWING_STROKE_TYPE {
                sync_stroke_to_peers(&event.data[..event.len as usize]);
                // Handle backpressure.
                if reader.should_throttle() {
                    std::thread::sleep(Duration::from_millis(1));
                }
            }
        }
    });

    // Analytics actor - real-time metrics.
    std::thread::spawn(|| {
        let mut reader = EventPoolFactory::get_m_reader();
        while let Some(event) = reader.next() {
            update_analytics_metrics(&event);
        }
    });
}
```
## Example: Multi-Stage Pipeline

```rust
use rusted_ring::{EventPoolFactory, EventUtils};

// `data_stream`, `transform_data`, `final_processing`, and
// `emit_result` are application-provided.
fn create_processing_pipeline() {
    // Stage 1: raw input processing.
    let input_processor = std::thread::spawn(|| {
        let mut writer = EventPoolFactory::get_s_writer();
        // Process the incoming data stream.
        for raw_data in data_stream {
            let event = EventUtils::create_pooled_event::<256>(&raw_data, RAW_DATA_TYPE)
                .expect("payload fits in 256B");
            writer.add(event); // Sub-microsecond allocation
        }
    });

    // Stage 2: data transformation (validated: 705ns per stage).
    let transformer = std::thread::spawn(|| {
        let mut input_reader = EventPoolFactory::get_s_reader();
        let mut output_writer = EventPoolFactory::get_m_writer();
        while let Some(input_event) = input_reader.next() {
            // Transform 256B → 1KB (parse, validate, enrich).
            let transformed = transform_data(&input_event.data);
            let output_event = EventUtils::create_pooled_event::<1024>(&transformed, TRANSFORMED_TYPE)
                .expect("payload fits in 1KB");
            output_writer.add(output_event);
        }
    });

    // Stage 3: final processing and output.
    let output_processor = std::thread::spawn(|| {
        let mut reader = EventPoolFactory::get_m_reader();
        while let Some(event) = reader.next() {
            let final_result = final_processing(&event.data);
            emit_result(final_result);
        }
    });

    // Wait for completion.
    input_processor.join().unwrap();
    transformer.join().unwrap();
    output_processor.join().unwrap();
}
```
## Monitoring and Health Checks

```rust
use rusted_ring::{PoolId, PoolStats};

// Monitor ring buffer health. `warn!`/`error!` come from the `log`
// crate; the throttling helpers are application-provided.
fn monitor_system_health() {
    let stats = PoolStats::collect_all();
    for stat in stats {
        println!("{}", stat); // e.g. "Pool XS: capacity=2000, backpressure=15.3%"
        match stat.pool_id {
            PoolId::XS if stat.current_backpressure > 0.8 => {
                warn!("XS pool under pressure: {:.1}%", stat.current_backpressure * 100.0);
                apply_throttling_to_xs_producers();
            }
            PoolId::M if stat.current_backpressure > 0.9 => {
                error!("M pool critical: {:.1}%", stat.current_backpressure * 100.0);
                emergency_throttling();
            }
            _ => {} // Normal operation
        }
    }
}

// Production health check.
fn health_check() -> SystemHealth {
    let all_stats = PoolStats::collect_all();
    let max_backpressure = all_stats
        .iter()
        .map(|s| s.current_backpressure)
        .fold(0.0f32, f32::max);

    match max_backpressure {
        bp if bp < 0.5 => SystemHealth::Excellent,
        bp if bp < 0.8 => SystemHealth::Good,
        bp if bp < 0.9 => SystemHealth::Warning,
        _ => SystemHealth::Critical,
    }
}
```
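`SystemHealth` is not defined in the snippet above; a minimal definition matching its thresholds could be:

```rust
/// Illustrative health grading derived from the worst pool backpressure.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SystemHealth {
    Excellent, // max backpressure < 50%
    Good,      // < 80%
    Warning,   // < 90%
    Critical,  // >= 90%
}
```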
## Memory Footprint

Default configuration (~1.2MB total):

```text
XS: 64B  × 2000 = 128KB  // High-frequency events (cursors, heartbeats)
S:  256B × 1000 = 256KB  // Regular events (messages, actions)
M:  1KB  × 300  = 307KB  // Medium events (document edits, API calls)
L:  4KB  × 60   = 245KB  // Large events (images, files)
XL: 16KB × 15   = 245KB  // Extra large events (documents, multimedia)
```

Mobile configuration (~356KB total):

```text
XS: 64B  × 500 = 32KB   // Reduced capacity for mobile
S:  256B × 250 = 64KB   // Mobile-appropriate sizing
M:  1KB  × 100 = 100KB  // Limited medium events
L:  4KB  × 20  = 80KB   // Minimal large events
XL: 16KB × 5   = 80KB   // Very limited XL events
```

High-load configuration (~2.4MB total):

```text
XS: 64B  × 4000 = 256KB  // Double capacity for high load
S:  256B × 2000 = 512KB  // Increased regular event capacity
M:  1KB  × 600  = 614KB  // Enhanced medium event processing
L:  4KB  × 120  = 491KB  // Increased large event handling
XL: 16KB × 30   = 491KB  // Enhanced multimedia processing
```
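As a sanity check, the default configuration's payload bytes can be verified with const arithmetic (per-event headers and cache-line padding add a small amount on top):

```rust
// Payload bytes only, per the default table above.
const DEFAULT_TOTAL_BYTES: usize =
    64 * 2000 + 256 * 1000 + 1024 * 300 + 4096 * 60 + 16384 * 15;

// 128_000 + 256_000 + 307_200 + 245_760 + 245_760 = 1_182_720 ≈ 1.2MB
const _: () = assert!(DEFAULT_TOTAL_BYTES == 1_182_720);
```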
## Stack Safety

Built-in guards prevent stack overflow from oversized ring buffers:
```rust
const MAX_STACK_BYTES: usize = 1_048_576; // 1MB stack limit

// Compile-time size validation, enforced when the guard constant
// is evaluated.
impl<const TSHIRT_SIZE: usize, const RING_CAPACITY: usize> RingBuffer<TSHIRT_SIZE, RING_CAPACITY> {
    const _STACK_GUARD: () = {
        let total_size = TSHIRT_SIZE * RING_CAPACITY;
        assert!(
            total_size <= MAX_STACK_BYTES,
            "Ring buffer too large for stack! Reduce RING_CAPACITY or TSHIRT_SIZE"
        );
    };
}
```
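The same pattern in miniature, outside the crate (`Demo` is a hypothetical type): referencing the associated const is what forces the compile-time check to run.

```rust
const MAX_BYTES: usize = 1_048_576;

struct Demo<const SIZE: usize, const CAP: usize>;

impl<const SIZE: usize, const CAP: usize> Demo<SIZE, CAP> {
    const GUARD: () = assert!(SIZE * CAP <= MAX_BYTES, "configuration too large");

    fn new() -> Self {
        let _ = Self::GUARD; // forces evaluation: oversized configs fail to compile
        Demo
    }
}

// Demo::<64, 2000>::new() compiles; Demo::<16384, 100>::new() would not.
```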
## Memory Ordering

Carefully designed memory ordering ensures lock-free safety with maximum performance:

- Release ordering when publishing events (ensures visibility)
- Acquire ordering when reading cursors (ensures consistency)
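As a minimal illustration (not the crate's actual internals), the publish/consume handshake looks like this with standard atomics:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Writer cursor, shared between one writer and many readers.
static PUBLISHED: AtomicUsize = AtomicUsize::new(0);

/// Writer side: the slot must be fully written *before* this store.
/// Release publishes those writes to any reader that acquires the cursor.
fn publish(sequence: usize) {
    PUBLISHED.store(sequence, Ordering::Release);
}

/// Reader side: Acquire guarantees that once the new sequence is
/// observed, the slot contents written before the Release are visible.
fn next_available(last_seen: usize) -> Option<usize> {
    let published = PUBLISHED.load(Ordering::Acquire);
    (published > last_seen).then_some(last_seen + 1)
}
```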
## Performance Comparison

| Operation | Traditional Channels | Heap Allocation | rusted-ring |
|---|---|---|---|
| Write Latency | 100-500ns | 100-1000ns | Sub-microsecond |
| Read Latency | 50-200ns | N/A | 700ns |
| Throughput | 1-10M/sec | 0.1-1M/sec | 175M/sec |
| Memory | Heap + overhead | Heap + fragmentation | Static + aligned |
| Cache Efficiency | Poor | Poor | Excellent |
| Backpressure | Complex | N/A | Built-in |
## License

MPL-2.0
Benchmarked and validated for production use in high-performance collaborative applications.