dropslot

Crates.io: dropslot
lib.rs: dropslot
version: 0.2.0
created_at: 2025-07-10 09:05:12.631594+00
updated_at: 2025-07-10 18:26:44.734867+00
description: A high-performance publish-subscribe library with latest-only delivery semantics
homepage: https://github.com/ViezeVingertjes/dropslot
repository: https://github.com/ViezeVingertjes/dropslot
max_upload_size:
id: 1746060
size: 189,810
Michael (ViezeVingertjes)

documentation: https://docs.rs/dropslot

README

DropSlot


A high-performance publish-subscribe library for Rust with latest-only delivery semantics. Built on top of Tokio, it supports zero-copy operations and is optimized for both high-throughput and low-latency scenarios.

✨ Key Features

  • Latest-only delivery: Subscribers receive only the most recent message, perfect for real-time applications
  • Zero-copy operations: Optimized for bytes::Bytes and other efficient data types
  • String-keyed topics: Simple and intuitive topic naming system
  • High performance: Optimized data structures, memory layout, and CPU cache utilization
  • Async/sync APIs: Both async and non-blocking synchronous operations
  • Thread-safe: Designed for concurrent access from multiple threads
  • Memory efficient: Uses weak references internally; unused topics are not removed automatically and must be cleaned up manually with bus.cleanup_unused_topics()
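
To make "latest-only delivery" concrete, here is a minimal std-only sketch of a single-value mailbox in which each publish overwrites whatever is still unread. LatestSlot and its methods are hypothetical names for illustration, not part of the dropslot API, and the crate's internals may work differently.

```rust
use std::sync::Mutex;

/// Hypothetical single-value mailbox: holds at most one pending message.
pub struct LatestSlot<T> {
    pending: Mutex<Option<T>>,
}

impl<T> LatestSlot<T> {
    pub fn new() -> Self {
        Self { pending: Mutex::new(None) }
    }

    /// Store `value`, returning the unread message it replaced (if any).
    pub fn publish(&self, value: T) -> Option<T> {
        self.pending.lock().unwrap().replace(value)
    }

    /// Take the pending message, leaving the slot empty.
    pub fn take(&self) -> Option<T> {
        self.pending.lock().unwrap().take()
    }
}
```

Publishing 1, 2, 3 and then calling take() yields only 3; the first two values are dropped, which is the trade-off latest-only delivery makes in exchange for never building a backlog.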

🚀 Quick Start

Add dropslot to your Cargo.toml:

[dependencies]
dropslot = "0.2"

Basic Usage

use dropslot::prelude::*;
use bytes::Bytes;

#[tokio::main]
async fn main() {
    let bus = Bus::<Bytes>::new();
    
    // Create a topic and subscriber
    let topic = bus.topic("events");
    let mut subscriber = topic.subscribe();
    
    // Publish messages
    topic.publish(Bytes::from("Hello, World!"));
    
    // Receive the latest message
    if let Some(message) = subscriber.wait_for_message().await {
        println!("Received: {:?}", message);
    }
}

Performance Configurations

use dropslot::prelude::*;
use bytes::Bytes;

// High throughput: optimized for many topics (large capacity)
let ht_bus = Bus::<Bytes>::with_capacity(64);

// Low latency: optimized for few topics (small capacity)
let ll_bus = Bus::<Bytes>::with_capacity(8);

// Custom capacity
let custom_bus = Bus::<Bytes>::with_capacity(128);
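
A capacity hint usually means the underlying table is pre-sized so that the first insertions do not trigger a reallocation. Whether Bus::with_capacity behaves exactly this way is an assumption; the sketch below shows the general idea with std's HashMap, where presized_topic_table is a hypothetical helper and u64 stands in for per-topic state.

```rust
use std::collections::HashMap;

/// Build a topic table pre-sized for `expected_topics` entries.
/// `u64` stands in for whatever per-topic state a real bus would hold.
pub fn presized_topic_table(expected_topics: usize) -> HashMap<String, u64> {
    HashMap::with_capacity(expected_topics)
}
```

A larger hint trades memory up front for fewer resizes when many topics are created; a smaller hint keeps the table compact when only a few topics exist.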

📊 Performance

DropSlot is designed for high-throughput, low-latency workloads. Representative benchmark results:

Key Performance Metrics

| Operation           | Latency | Throughput    | Notes                          |
|---------------------|---------|---------------|--------------------------------|
| Topic Creation      | ~136 ns | ~7.4M ops/sec | Ultra-fast topic instantiation |
| Message Publishing  | ~467 ns | ~2.1M ops/sec | Direct publish to topic        |
| Subscriber Creation | ~510 ns | ~2.0M ops/sec | Fast subscriber setup          |
| Message Retrieval   | ~477 ns | ~2.1M ops/sec | Non-blocking message access    |
| Topic Lookup        | ~40 ns  | ~25M ops/sec  | Optimized topic resolution     |
| Error Handling      | ~330 ps | ~3.0B ops/sec | Near-zero overhead             |
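
The throughput column follows from the latency column if operations are assumed to run back to back on a single thread: throughput is the reciprocal of latency. A small helper (the name ops_per_sec is ours, for illustration) makes the arithmetic explicit:

```rust
/// Convert a per-operation latency in nanoseconds to operations per second,
/// assuming operations run back to back on one thread.
pub fn ops_per_sec(latency_ns: f64) -> f64 {
    1e9 / latency_ns
}
```

For example, ops_per_sec(136.0) is about 7.35 million, matching the ~7.4M ops/sec listed for topic creation, and ops_per_sec(0.33) is about 3.0 billion for the 330 ps row.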

Scalability Performance

| Scenario                | Performance  | Details                              |
|-------------------------|--------------|--------------------------------------|
| 10 Topics               | ~3.6 μs      | Excellent small-scale performance    |
| 100 Topics              | ~43 μs       | Linear scaling maintained            |
| 1000 Topics             | ~458 μs      | Consistent performance at scale      |
| High Frequency          | ~11 μs/batch | 1000 message batches                 |
| Concurrent (16 threads) | ~3.3 ms      | Excellent multi-threaded performance |
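
One way to check the "linear scaling" claim is to divide each total by its topic count and compare the per-topic cost. The helper below is a hypothetical illustration of that arithmetic, using only the figures in the table above:

```rust
/// Average cost per topic in nanoseconds, given a total time in microseconds.
pub fn ns_per_topic(total_us: f64, topics: u32) -> f64 {
    total_us * 1_000.0 / f64::from(topics)
}
```

The three rows work out to roughly 360 ns, 430 ns, and 458 ns per topic for 10, 100, and 1000 topics, so the per-topic cost rises by about a quarter across two orders of magnitude: close to linear, with a mild increase at scale.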

Memory & Concurrency

  • Memory cleanup: ~3.3 μs for unused topic cleanup
  • Concurrent publishing: Linear scaling up to 16 threads
  • Zero-copy operations: ~612 ns for bytes::Bytes handling
  • Topic management: ~529 ns for topic removal operations

Architecture Optimizations

  • Concurrent access using DashMap, a sharded concurrent hash map
  • Optimized hashing with AHash
  • CPU cache-friendly memory layout and prefetching
  • Zero-copy operations for byte data
  • Efficient memory management with Arc and weak references

Benchmarks were run in a CI environment with optimized builds. Results will vary with hardware and workload.
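
"Zero-copy" here means that handing a payload to several receivers shares one buffer instead of duplicating the bytes. The sketch below illustrates the idea with std's Arc<[u8]> as a stand-in for a reference-counted buffer such as bytes::Bytes; fan_out is a hypothetical helper, not a dropslot function.

```rust
use std::sync::Arc;

/// Hand the same payload to `n` receivers without copying the bytes.
/// Each returned handle points at the same allocation as `payload`.
pub fn fan_out(payload: &Arc<[u8]>, n: usize) -> Vec<Arc<[u8]>> {
    (0..n).map(|_| Arc::clone(payload)).collect()
}
```

Cloning the handle only bumps a reference count, so the cost of delivery does not grow with the size of the message.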

🔧 Advanced Usage

Custom Message Types

use dropslot::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
struct Event {
    id: u64,
    data: String,
}

let bus = Bus::<Event>::new();
let topic = bus.topic("events");

let event = Event {
    id: 1,
    data: "Hello".to_string(),
};

topic.publish(event);

Error Handling

use dropslot::prelude::*;

let bus = Bus::<String>::new();
let topic = bus.topic("events");
let mut subscriber = topic.subscribe();

match subscriber.try_get_message() {
    Ok(Some(msg)) => println!("Received: {}", msg),
    Ok(None) => println!("No new message"),
    Err(e) if e.is_empty() => println!("No message available"),
    Err(e) if e.is_disconnected() => println!("Topic disconnected"),
    Err(e) => println!("Error: {}", e),
}
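
The empty versus disconnected distinction is a common pattern for non-blocking receives. As an analogy only (this is std's mpsc channel, not dropslot's BusError), the same two failure cases appear in std::sync::mpsc::TryRecvError; poll_once is a hypothetical helper name.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Describe the outcome of one non-blocking poll on a std channel.
pub fn poll_once(rx: &Receiver<String>) -> String {
    match rx.try_recv() {
        Ok(msg) => format!("Received: {msg}"),
        // Nothing queued right now, but a sender still exists.
        Err(TryRecvError::Empty) => "No message available".to_string(),
        // Every sender has been dropped and the queue is drained.
        Err(TryRecvError::Disconnected) => "Channel disconnected".to_string(),
    }
}
```

Treating "empty" as a normal, retryable outcome and "disconnected" as terminal keeps polling loops simple: retry on the first, exit on the second.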

Multiple Subscribers

use dropslot::prelude::*;

let bus = Bus::<String>::new();
let topic = bus.topic("notifications");

// Multiple subscribers to the same topic
let mut email_sub = topic.subscribe();
let mut sms_sub = topic.subscribe();
let mut push_sub = topic.subscribe();

// All subscribers receive the same (latest) message
topic.publish("Important update!".to_string());
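
To illustrate how several subscribers can each observe the same latest message independently, here is a std-only sketch that pairs the stored value with a version counter and gives every subscriber its own "last seen" cursor. Shared, Cursor, and try_latest are hypothetical names; dropslot itself is described as building on tokio::sync::watch, which this sketch only approximates.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical shared state: the latest value plus a version counter.
pub struct Shared<T> {
    state: Mutex<(u64, Option<T>)>,
}

/// Hypothetical subscriber handle that remembers the last version it saw.
pub struct Cursor<T> {
    shared: Arc<Shared<T>>,
    seen: u64,
}

impl<T: Clone> Shared<T> {
    pub fn new() -> Arc<Self> {
        Arc::new(Self { state: Mutex::new((0, None)) })
    }

    /// Replace the stored value and bump the version.
    pub fn publish(&self, value: T) {
        let mut state = self.state.lock().unwrap();
        state.0 += 1;
        state.1 = Some(value);
    }

    /// Create a subscriber that treats the current value as already seen.
    pub fn subscribe(self: &Arc<Self>) -> Cursor<T> {
        let seen = self.state.lock().unwrap().0;
        Cursor { shared: Arc::clone(self), seen }
    }
}

impl<T: Clone> Cursor<T> {
    /// Return the latest value only if it is newer than the last one returned.
    pub fn try_latest(&mut self) -> Option<T> {
        let state = self.shared.state.lock().unwrap();
        if state.0 == self.seen {
            return None;
        }
        self.seen = state.0;
        state.1.clone()
    }
}
```

Because each cursor tracks its own version, a slow subscriber never holds back a fast one, and every subscriber skips straight to the newest value regardless of how many publishes it missed.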

Topic Management

use dropslot::prelude::*;

let bus = Bus::<String>::new();

// Check topic count
println!("Active topics: {}", bus.topic_count());

// Get all topic names
let names = bus.topic_names();
println!("Topics: {:?}", names);

// Manually clean up unused topics (no automatic cleanup)
let removed = bus.cleanup_unused_topics();
println!("Removed {} unused topics", removed);
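
As a rough picture of what manual cleanup of unused topics can look like, the sketch below keeps only weak references in a registry and removes entries whose topic has no remaining owners. Registry and its methods are hypothetical, and how dropslot actually decides that a topic is unused is an assumption here.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Weak};

/// Hypothetical registry that tracks topics without keeping them alive.
pub struct Registry {
    topics: HashMap<String, Weak<String>>,
}

impl Registry {
    pub fn new() -> Self {
        Self { topics: HashMap::new() }
    }

    /// Return the live topic for `name`, creating it if absent or already dropped.
    pub fn topic(&mut self, name: &str) -> Arc<String> {
        if let Some(live) = self.topics.get(name).and_then(|weak| weak.upgrade()) {
            return live;
        }
        let topic = Arc::new(name.to_string());
        self.topics.insert(name.to_string(), Arc::downgrade(&topic));
        topic
    }

    /// Remove entries whose topic has no remaining strong references.
    /// Returns the number of entries removed.
    pub fn cleanup_unused(&mut self) -> usize {
        let before = self.topics.len();
        self.topics.retain(|_, weak| weak.strong_count() > 0);
        before - self.topics.len()
    }

    /// Number of entries currently in the registry, live or stale.
    pub fn topic_count(&self) -> usize {
        self.topics.len()
    }
}
```

In this design a dropped topic frees its own memory immediately, but its stale map entry lingers until cleanup_unused runs, which is why long-running programs would call the cleanup periodically.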

🎯 Use Cases

DropSlot is well suited to:

  • Real-time notifications (email, SMS, push notifications)
  • Live data feeds (stock prices, sensor data, metrics)
  • Event sourcing with latest-state semantics
  • Microservice communication for status updates
  • Game state synchronization
  • IoT device coordination

🔩 Architecture

Core Components

  • Bus<T>: Main message broker managing topics
  • Topic<T>: Individual message topics with publishers and subscribers
  • Sub<T>: Subscriber receiving messages from topics
  • BusError: Unified error handling

Design Principles

  • Latest-only semantics: Built on tokio::sync::watch channels
  • Memory safety: Extensive use of Arc and Weak references
  • Performance first: Optimized data structures and algorithms
  • Zero-copy where possible: Efficient handling of byte data

🛠️ Features

Prelude

For convenience, you can import all commonly used types with the prelude:

use dropslot::prelude::*; // Imports Bus, Topic, Sub, and BusError

Default Features

  • bytes - Zero-copy operations for bytes::Bytes

Optional Features

  • serde - Serialization support for complex message types

Enable features in your Cargo.toml:

[dependencies]
dropslot = { version = "0.2", features = ["serde"] }

📈 Benchmarks

Run benchmarks with:

cargo bench

🧪 Testing

Run tests with:

cargo test

Run examples:

cargo run --example basic_usage
cargo run --example high_performance
cargo run --example real_world --features="serde"

🤝 Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

📝 License

This project is licensed under either of

at your option.

🔄 Changelog

See CHANGELOG.md for recent changes.


Note: This library implements latest-only delivery semantics, meaning subscribers only receive the most recent message. Topic cleanup is manual via bus.cleanup_unused_topics() - call this periodically in long-running applications to prevent memory leaks. For all-message delivery, consider using tokio::sync::broadcast or similar alternatives.
