theta-sync

Crates.iotheta-sync
lib.rstheta-sync
version0.1.0-alpha.1
created_at2025-08-03 04:38:06.240165+00
updated_at2025-08-04 13:24:02.282079+00
descriptionA high-performance no_std MPSC channel with full Tokio compatibility
homepage
repositoryhttps://github.com/cwahn/theta-sync
max_upload_size
id1779354
size140,828
lighthouse (cwahn)

documentation

README

theta-sync

Crates.io Documentation License Build Status

A high-performance, no_std compatible MPSC channel with full Tokio API compatibility.

Overview

theta-sync provides a multi-producer, single-consumer (MPSC) channel that serves as a drop-in replacement for tokio::sync::mpsc::unbounded_channel. It offers improved performance through lock-free atomic operations while maintaining complete API compatibility with Tokio.

Features

  • Tokio Compatible: Complete API compatibility with tokio::sync::mpsc::unbounded_channel
  • no_std Support: Works in embedded environments (requires alloc)
  • High Performance: Optimized lock-free implementation with superior throughput
  • Memory Safe: Built with Rust's safety guarantees and comprehensive testing
  • FIFO Ordering: Guaranteed message ordering even under high concurrency
  • Weak References: Support for non-owning sender references
  • Rich API: Channel introspection and state management

Installation

[dependencies]
theta-sync = "0.1"

For no_std environments:

[dependencies]
theta-sync = { version = "0.1", default-features = false }

Quick Start

use theta_sync::unbounded_channel;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = unbounded_channel();
    
    // Send messages
    tx.send("Hello").unwrap();
    tx.send("World").unwrap();
    
    // Receive messages
    while let Some(msg) = rx.recv().await {
        println!("Received: {}", msg);
    }
}

Synchronous Usage

use theta_sync::unbounded_channel;

fn main() {
    let (tx, mut rx) = unbounded_channel();
    
    tx.send(42).unwrap();
    tx.send(84).unwrap();
    
    assert_eq!(rx.try_recv().unwrap(), 42);
    assert_eq!(rx.try_recv().unwrap(), 84);
}

API Compatibility

theta-sync is a drop-in replacement for Tokio's unbounded MPSC channel:

// Simply change the import
// use tokio::sync::mpsc::unbounded_channel;  // Before
use theta_sync::unbounded_channel;            // After

let (tx, mut rx) = unbounded_channel();

// All Tokio APIs work identically
let weak_tx = tx.downgrade();         // Weak references  
let count = tx.strong_count();        // Reference counting
let is_closed = tx.is_closed();       // State checking  
let len = rx.len();                   // Queue length
rx.close();                           // Manual close
tx.closed().await;                    // Close notification

Architecture

theta-sync uses a lock-free block-based design:

  • Block Storage: Messages are stored in fixed-size blocks with atomic operations
  • FIFO Ordering: Guaranteed through atomic slot allocation
  • Memory Management: Automatic cleanup as the receiver advances
  • Async Support: Efficient waker system for async/await compatibility

Advanced Usage

Weak Senders

let (tx, _rx) = unbounded_channel();
let weak_tx = tx.downgrade();

drop(tx); // Channel stays open

if let Some(strong_tx) = weak_tx.upgrade() {
    strong_tx.send("message").unwrap();
}

Error Handling

use theta_sync::{unbounded_channel, SendError, TryRecvError};

let (tx, mut rx) = unbounded_channel();

match tx.send(42) {
    Ok(()) => println!("Sent successfully"),
    Err(SendError(value)) => println!("Channel closed: {}", value),
}

match rx.try_recv() {
    Ok(value) => println!("Received: {}", value),
    Err(TryRecvError::Empty) => println!("Channel is empty"),
    Err(TryRecvError::Disconnected) => println!("Channel is closed"),
}

Performance

Benchmarks show theta-sync delivers superior performance compared to Tokio's MPSC channel, with up to 2x better throughput in typical usage scenarios.

Testing

Run the comprehensive test suite:

cargo test    # Run all tests
cargo bench   # Run benchmarks

Use Cases

  • Embedded Systems: no_std compatibility for resource-constrained environments
  • High-Throughput Services: Improved performance for message-heavy applications
  • Async Runtimes: Drop-in Tokio replacement with better performance
  • Multi-Producer Pipelines: Efficient concurrent message passing

Contributing

Contributions are welcome! Please ensure:

  • All tests pass: cargo test
  • Code is formatted: cargo fmt
  • No clippy warnings: cargo clippy

License

Licensed under either of Apache-2.0 or MIT at your option.

Acknowledgments

Inspired by Tokio's MPSC channel design and optimized for modern Rust async ecosystems.

🎯 Use Cases

Embedded Systems

#![no_std]
extern crate alloc;

use theta_sync::unbounded_channel;
use alloc::vec::Vec;

// Perfect for embedded async runtimes
fn sensor_data_processor() {
    let (tx, mut rx) = unbounded_channel();
    
    // Send sensor readings
    tx.send(SensorReading { temp: 25.5, humidity: 60.0 }).unwrap();
    
    // Process asynchronously  
    while let Ok(reading) = rx.try_recv() {
        process_sensor_data(reading);
    }
}

🔧 Configuration

theta-sync automatically configures optimal block sizes:

  • 64-bit targets: 32 messages per block
  • 32-bit targets: 16 messages per block
  • Memory usage: ~1KB per block on 64-bit systems

🛡️ Safety

theta-sync provides complete memory safety:

  • No unsafe data races: All shared state protected by atomics
  • Proper drop semantics: Messages properly dropped on channel close
  • Unwind safety: Panic-safe in all scenarios
  • Memory leak free: Automatic block cleanup
  • ABA problem resistant: Pointer-based linked list design

📊 Benchmarks

Run benchmarks yourself:

cargo bench

Benchmark categories:

  • Throughput: Single-threaded send/recv performance
  • Concurrency: Multi-sender scenarios
  • Block Boundaries: Memory allocation patterns
  • Large Payloads: Performance with bigger messages
  • Weak Senders: Reference management overhead

🤝 Contributing

Contributions welcome! Please read our contributing guidelines and ensure:

  • All tests pass: cargo test
  • Benchmarks run clean: cargo bench
  • Code is formatted: cargo fmt
  • No clippy warnings: cargo clippy

📄 License

Licensed under either of:

at your option.

🙏 Acknowledgments

  • Inspired by Tokio's MPSC channel design
Commit count: 0

cargo fmt