Crates.io | rusteron-rb |
lib.rs | rusteron-rb |
version | 0.1.36 |
source | src |
created_at | 2024-11-04 09:51:01.917805 |
updated_at | 2024-11-12 08:16:02.672194 |
description | Provides ring buffer and broadcast functionalities via aeron c bindings, allowing efficient, low-latency message passing between different threads or processes. This module implements Single Producer, Single Consumer (SPSC) ring buffers, Multi-Producer, Single Consumer (MPSC) ring buffers, and broadcast channels. |
homepage | https://github.com/mimran1980/rusteron |
repository | https://github.com/mimran1980/rusteron |
max_upload_size | |
id | 1434769 |
size | 18,468,786 |
rusteron-rb is a core component of the rusteron project, providing ring buffer and broadcast functionalities to interact with the Aeron messaging system in a Rust environment. It enables Rust developers to leverage Aeron's high-performance, low-latency communication protocols.
The rusteron-rb module acts as a Rust wrapper around the Aeron C ring buffer API. It offers functions for establishing connections, transmitting and receiving messages, and broadcasting data streams, allowing seamless communication between distributed applications. Since it is built on top of Aeron's C bindings, this library operates in an unsafe context, requiring extra care from developers to ensure correctness.
Note: Since this module leverages Aeron C bindings, it is inherently unsafe and should be used with caution. Incorrect usage can lead to undefined behavior, such as segmentation faults.
AeronBroadcastReceiver
AeronBroadcastTransmitter
AeronMpscRb
AeronSpscRb
This example demonstrates how to use the AeronSpscRb to create a simple single producer, single consumer ring buffer.
use rusteron_rb::*;
use std::error::Error;

fn main() -> Result<(), Box<dyn Error>> {
    let rb = AeronSpscRb::new_with_capacity(1024 * 1024, 1024)?;

    // Producer writes data to the ring buffer
    for i in 0..50 {
        let idx = rb.try_claim(i + 1, 4);
        assert!(idx >= 0);
        let slot = rb.buffer_at_mut(idx as usize, 4);
        slot[0] = i as u8;
        rb.commit(idx)?;
    }

    // Another way to commit the data
    for i in 0..50 {
        let mut slice = rb.try_claim_slice(i + 1, 4).unwrap();
        slice[0] = i as u8;
        // Optional: if neither commit nor abort is called, the slice is committed automatically
        slice.commit()?;
    }

    // Consumer reads data from the ring buffer
    struct Reader;
    impl AeronRingBufferHandlerCallback for Reader {
        fn handle_aeron_rb_handler(&mut self, msg_type_id: i32, buffer: &[u8]) {
            println!("msg_type_id: {}, buffer: {:?}", msg_type_id, buffer);
            assert_eq!(buffer[0], (msg_type_id - 1) as u8);
        }
    }

    let handler = AeronRingBufferHandlerWrapper::new(Reader);
    for _ in 0..10 {
        let read = rb.read_msgs(&handler, 10);
        assert_eq!(10, read);
    }
    Ok(())
}
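The claim, write, commit sequence in the example above can be pictured with a toy model. The sketch below is a single-threaded, std-only illustration; `ToyRing`, `Slot`, and `demo` are hypothetical names, and nothing here reflects Aeron's actual memory layout or the rusteron-rb implementation. It only shows why a claim can fail (the assumed meaning of a negative index) and why only committed messages reach the reader.

```rust
/// One message slot in the toy model (hypothetical, not a rusteron-rb type).
struct Slot {
    msg_type_id: i32,
    payload: Vec<u8>,
    committed: bool,
}

/// Toy single-threaded model of a claim/commit buffer.
struct ToyRing {
    capacity: usize,  // maximum payload bytes held at once
    used: usize,      // payload bytes currently claimed and not yet read
    slots: Vec<Slot>, // grows without bound; acceptable for a sketch only
    next_read: usize, // index of the next slot the reader will consume
}

impl ToyRing {
    fn new(capacity: usize) -> Self {
        ToyRing { capacity, used: 0, slots: Vec::new(), next_read: 0 }
    }

    /// Reserve `len` bytes for a message. Returns a slot index, or -1 when full.
    fn try_claim(&mut self, msg_type_id: i32, len: usize) -> i32 {
        if self.used + len > self.capacity {
            return -1;
        }
        self.used += len;
        self.slots.push(Slot { msg_type_id, payload: vec![0; len], committed: false });
        (self.slots.len() - 1) as i32
    }

    /// Writable view of a claimed slot.
    fn buffer_at_mut(&mut self, idx: usize) -> &mut [u8] {
        &mut self.slots[idx].payload
    }

    /// Mark a claimed slot as visible to the reader.
    fn commit(&mut self, idx: usize) {
        self.slots[idx].committed = true;
    }

    /// Deliver up to `limit` committed messages in order; returns the count delivered.
    fn read_msgs<F: FnMut(i32, &[u8])>(&mut self, mut handler: F, limit: usize) -> usize {
        let mut read = 0;
        while read < limit && self.next_read < self.slots.len() {
            let slot = &self.slots[self.next_read];
            if !slot.committed {
                break; // an uncommitted claim blocks everything behind it
            }
            handler(slot.msg_type_id, &slot.payload);
            self.used -= slot.payload.len();
            self.next_read += 1;
            read += 1;
        }
        read
    }
}

/// Fills a 16-byte toy buffer with four 4-byte messages, then drains it.
fn demo() -> (usize, Vec<u8>) {
    let mut rb = ToyRing::new(16);
    for i in 0..4 {
        let idx = rb.try_claim(i + 1, 4);
        assert!(idx >= 0);
        rb.buffer_at_mut(idx as usize)[0] = i as u8;
        rb.commit(idx as usize);
    }
    // The buffer is full, so a fifth claim is rejected.
    assert_eq!(rb.try_claim(5, 4), -1);

    let mut seen = Vec::new();
    let read = rb.read_msgs(|_msg_type_id, buf| seen.push(buf[0]), 10);
    (read, seen)
}
```

Calling `demo()` drains all four messages in the order they were committed, even though the read limit is 10.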
The following example demonstrates how to use the AeronMpscRb for a multi-producer, single consumer scenario, enabling multiple producers to write to the same ring buffer while a single consumer reads from it.
use rusteron_rb::*;
use std::error::Error;

fn main() -> Result<(), Box<dyn Error>> {
    let rb = AeronMpscRb::new_with_capacity(1024 * 1024, 1024)?;

    // Producers write data to the ring buffer
    for i in 0..100 {
        let idx = rb.try_claim(i + 1, 4);
        assert!(idx >= 0);
        let slot = rb.buffer_at_mut(idx as usize, 4);
        slot[0] = i as u8;
        rb.commit(idx)?;
    }

    // Consumer reads data from the ring buffer
    struct Reader;
    impl AeronRingBufferHandlerCallback for Reader {
        fn handle_aeron_rb_handler(&mut self, msg_type_id: i32, buffer: &[u8]) {
            println!("msg_type_id: {}, buffer: {:?}", msg_type_id, buffer);
            assert_eq!(buffer[0], (msg_type_id - 1) as u8);
        }
    }

    let handler = AeronRingBufferHandlerWrapper::new(Reader);
    for _ in 0..10 {
        let read = rb.read_msgs(&handler, 10);
        assert_eq!(10, read);
    }
    Ok(())
}
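The example above writes from a single thread. To illustrate what "multi-producer, single consumer" means with real threads, the sketch below uses the standard library's `std::sync::mpsc` channel as a stand-in. It does not use rusteron-rb, and whether `AeronMpscRb` can be shared across threads in the same way is not stated here, so treat this purely as an analogy. `run_producers` is a hypothetical helper name.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns `producers` threads that each send `per_producer` messages, then
/// collects everything on the calling (consumer) thread.
/// Returns (msg_type_id, first payload byte) pairs sorted by msg_type_id.
fn run_producers(producers: i32, per_producer: i32) -> Vec<(i32, u8)> {
    let (tx, rx) = mpsc::channel::<(i32, [u8; 4])>();
    let mut handles = Vec::new();

    for p in 0..producers {
        let tx = tx.clone(); // each producer thread owns its own sender
        handles.push(thread::spawn(move || {
            for i in 0..per_producer {
                // Same convention as the examples: payload[0] == msg_type_id - 1
                let msg_type_id = p * per_producer + i + 1;
                let mut msg = [0u8; 4];
                msg[0] = (msg_type_id - 1) as u8;
                tx.send((msg_type_id, msg)).expect("consumer dropped");
            }
        }));
    }
    drop(tx); // drop the original sender so the channel closes once producers finish

    for handle in handles {
        handle.join().expect("producer panicked");
    }

    // Single consumer: arrival order across producers is not deterministic,
    // so sort before returning.
    let mut received: Vec<(i32, u8)> = rx.iter().map(|(id, msg)| (id, msg[0])).collect();
    received.sort();
    received
}
```

With `run_producers(4, 25)`, the consumer sees 100 messages in total, and each one still satisfies the `payload[0] == msg_type_id - 1` check used by the `Reader` callbacks.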
This example demonstrates how to set up a broadcast transmitter and receiver. The transmitter sends messages that are then received by the receiver, illustrating a simple broadcast communication scenario.
use rusteron_rb::*;
use std::error::Error;

fn main() -> Result<(), Box<dyn Error>> {
    // Set up broadcast transmitter and receiver
    let mut vec = vec![0u8; 1024 * 1024 + AERON_BROADCAST_BUFFER_TRAILER_LENGTH];
    let transmitter = AeronBroadcastTransmitter::from_slice(vec.as_mut_slice(), 1024)?;
    let receiver = AeronBroadcastReceiver::from_slice(vec.as_mut_slice())?;

    // Transmit messages
    for i in 0..100 {
        let mut msg = [0u8; 4];
        msg[0] = i as u8;
        let idx = transmitter.transmit_msg(i + 1, &msg).unwrap();
        println!("sent {}", idx);
        assert!(idx >= 0);
    }

    // Receive messages
    struct Reader;
    impl AeronBroadcastReceiverHandlerCallback for Reader {
        fn handle_aeron_broadcast_receiver_handler(&mut self, msg_type_id: i32, buffer: &mut [u8]) {
            println!("msg_type_id: {}, buffer: {:?}", msg_type_id, buffer);
            assert_eq!(buffer[0], (msg_type_id - 1) as u8);
        }
    }

    let handler = Handler::leak(Reader {});
    for _ in 0..100 {
        let read = receiver.receive(Some(&handler)).unwrap();
        println!("read {}", read);
        assert!(read > 0);
    }
    Ok(())
}
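The broadcast example sizes its backing buffer as a data capacity plus `AERON_BROADCAST_BUFFER_TRAILER_LENGTH`. A small helper can make that arithmetic explicit. The sketch below is std-only and takes the trailer length as a parameter rather than assuming its value. The power-of-two check reflects how Aeron buffers are generally documented and is an assumption here, not something this README states. `broadcast_buffer_len` is a hypothetical name.

```rust
/// Computes the total backing-buffer length for a broadcast buffer:
/// data capacity plus trailer.
/// Returns None if `capacity` is not a power of two (assumed requirement)
/// or if the sum would overflow `usize`.
fn broadcast_buffer_len(capacity: usize, trailer_len: usize) -> Option<usize> {
    if !capacity.is_power_of_two() {
        return None;
    }
    capacity.checked_add(trailer_len)
}
```

In real code the second argument would be `AERON_BROADCAST_BUFFER_TRAILER_LENGTH` from the crate, for example `vec![0u8; broadcast_buffer_len(1024 * 1024, AERON_BROADCAST_BUFFER_TRAILER_LENGTH).unwrap()]`.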
Add the following to your Cargo.toml file to include rusteron-rb:
Dynamic lib:
[dependencies]
rusteron-rb = "0.1"
Static lib:
[dependencies]
rusteron-rb = { version = "0.1", features = ["static"] }
Ensure you have also set up the necessary Aeron C libraries required by rusteron-rb.
Since rusteron-rb relies on Aeron C bindings, it involves unsafe Rust code, and users are responsible for upholding the safety requirements of the underlying API. Failing to do so can lead to crashes or undefined behavior.
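One small defensive habit follows from the examples above, which assert `idx >= 0` before casting the claim index to `usize`. The sketch below wraps that check in a helper so a failed claim becomes an error instead of a wrapped-around index. `ClaimFailed` and `checked_claim_index` are hypothetical names, not part of rusteron-rb, and the meaning of specific negative values is not covered here.

```rust
use std::fmt;

/// Error carrying the raw negative value returned by a failed claim.
#[derive(Debug, PartialEq, Eq)]
struct ClaimFailed(i32);

impl fmt::Display for ClaimFailed {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "ring buffer claim failed with code {}", self.0)
    }
}

impl std::error::Error for ClaimFailed {}

/// Converts a raw claim index into a usable offset.
/// Negative values are treated as failure, mirroring `assert!(idx >= 0)`.
fn checked_claim_index(raw: i32) -> Result<usize, ClaimFailed> {
    if raw < 0 {
        Err(ClaimFailed(raw))
    } else {
        Ok(raw as usize)
    }
}
```

Because `ClaimFailed` implements `std::error::Error`, a call such as `checked_claim_index(rb.try_claim(1, 4))?` would fit inside a `main` that returns `Result<(), Box<dyn Error>>`, as the examples do.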
Contributions are welcome! Please feel free to open issues, submit pull requests, or suggest new features.
If you wish to contribute, refer to our contributing guidelines.
This project is dual-licensed under either the MIT License or the Apache License 2.0. You may choose which one to use.
Feel free to reach out with any questions or suggestions via GitHub Issues!