| Crates.io | gigadex-events |
| lib.rs | gigadex-events |
| version | 0.1.0 |
| created_at | 2026-01-06 22:58:49.11757+00 |
| updated_at | 2026-01-06 22:58:49.11757+00 |
| description | Low-latency TCP communication protocol with zero-copy serialization for exchange systems |
| homepage | |
| repository | https://github.com/gigadex/gigadex-events |
| max_upload_size | |
| id | 2027120 |
| size | 131,426 |
Low-latency TCP communication protocol with zero-copy serialization for exchange systems.
gigadex-events provides a high-performance messaging layer designed for orderbook exchanges where latency matters. It uses rkyv for zero-copy serialization and tokio for async I/O, achieving sub-20µs round-trip times on localhost.
EventChannel abstraction for bidirectional communicationuse gigadex_events::{EventChannel, Message, Side};
use tokio::net::{TcpListener, TcpStream};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Server
let listener = TcpListener::bind("127.0.0.1:9000").await?;
tokio::spawn(async move {
let (stream, _) = listener.accept().await.unwrap();
let mut server = EventChannel::new(stream);
while let Ok(Some(msg)) = server.recv().await {
server.send(&msg).await.unwrap(); // Echo back
}
});
// Client
let mut client = EventChannel::connect("127.0.0.1:9000").await?;
let order = Message::Order {
id: 1,
price: 50000_00000000, // Price in smallest unit
quantity: 1_00000000, // Quantity in smallest unit
side: Side::Buy,
};
client.send(&order).await?;
let response = client.recv().await?;
Ok(())
}
┌─────────────────────────────────────────────────────────────────┐
│ EventChannel │
│ High-level API with timeouts, ping, and connection management │
├─────────────────────────────────────────────────────────────────┤
│ Connection │
│ TCP stream wrapper with send/recv buffering │
├─────────────────────────────────────────────────────────────────┤
│ Codec │
│ Length-prefixed framing + rkyv zero-copy serialization │
├─────────────────────────────────────────────────────────────────┤
│ Messages │
│ Heartbeat, Order, OrderAck, Trade, CancelOrder, CancelAck │
└─────────────────────────────────────────────────────────────────┘
gigadex-events/
├── src/
│ ├── lib.rs # Crate root, re-exports
│ ├── channel.rs # EventChannel + split_channel
│ ├── protocol/
│ │ ├── mod.rs
│ │ └── messages.rs # Message enum, Side enum
│ ├── transport/
│ │ ├── mod.rs
│ │ ├── codec.rs # Encode/decode + Decoder state machine
│ │ ├── connection.rs # TCP Connection wrapper
│ │ └── error.rs # TransportError
│ └── test_utils.rs # EchoServer, message factories (test only)
├── tests/
│ ├── integration_tests.rs # End-to-end network tests
│ └── protocol_tests.rs # Serialization tests
├── benches/
│ └── latency.rs # Criterion benchmarks
├── Cargo.toml
└── README.md
src/protocol/messages.rs)| Type | Description |
|---|---|
Message::Heartbeat |
Connection health check with nanosecond timestamp |
Message::Order |
New order: id, price, quantity, side |
Message::OrderAck |
Order accepted confirmation |
Message::Trade |
Trade execution notification |
Message::CancelOrder |
Cancel request |
Message::CancelAck |
Cancel confirmation |
Side |
Buy or Sell enum |
src/channel.rs)| Function | Description |
|---|---|
EventChannel::new(stream) |
Create from TcpStream |
EventChannel::connect(addr) |
Connect to remote address |
EventChannel::with_config(stream, config) |
Create with custom timeouts |
channel.send(&msg) |
Send a message |
channel.send_flush(&msg) |
Send and flush (lower latency) |
channel.recv() |
Receive a message (returns Option<Message>) |
channel.ping() |
Round-trip latency measurement (returns nanoseconds) |
split_channel(channel, buffer_size) |
Split into sender/receiver for concurrent use |
src/transport/codec.rs)| Function | Description |
|---|---|
encode(&msg, &mut buffer) |
Serialize message with length prefix |
decode(buffer) |
Deserialize message (allocates aligned buffer) |
decode_with_aligned(buffer, &mut aligned) |
Deserialize with reusable aligned buffer |
frame_size(buffer) |
Get total frame size if complete |
Decoder::new() |
Incremental decoder state machine |
| Constant | Value | Description |
|---|---|---|
LENGTH_PREFIX_SIZE |
4 bytes | Frame header size |
MAX_MESSAGE_SIZE |
16 MB | Maximum payload size |
DEFAULT_SEND_BUFFER_CAPACITY |
4096 | Default send buffer |
DEFAULT_RECV_BUFFER_CAPACITY |
4096 | Default receive buffer |
Messages are framed with a 4-byte little-endian length prefix:
+------------------+--------------------+
| Length (4 bytes) | Payload (N bytes) |
+------------------+--------------------+
The payload is an rkyv-serialized Message enum. rkyv archives data in a format that can be accessed directly without deserialization, enabling true zero-copy reads.
Benchmarks on Apple M1 (localhost loopback):
| Metric | Value |
|---|---|
| Ping-pong RTT | ~19 µs |
| Throughput | ~430,000 msgs/sec |
| Serialization (Order) | ~45 ns |
| Deserialization (Order) | ~11 ns |
Run benchmarks:
cargo bench
Run detailed latency distribution:
cargo test --release --bench latency -- --nocapture measure_latency_distribution
cargo test
cargo clippy --all-targets
cargo build --release
| Crate | Purpose |
|---|---|
tokio |
Async runtime and networking |
rkyv |
Zero-copy serialization |
bytes |
Buffer management |
thiserror |
Error derive macros |
criterion |
Benchmarking (dev) |
MIT