| Crates.io | superqueue |
| lib.rs | superqueue |
| version | 0.1.0 |
| created_at | 2025-10-04 13:12:37.978405+00 |
| updated_at | 2025-10-04 13:12:37.978405+00 |
| description | A tiny, lock-light, type-routed message bus providing event streams and latest-value topics (snapshots) for fast state and event dispatch. |
| homepage | |
| repository | https://github.com/alexdesander/superqueue |
| max_upload_size | |
| id | 1867885 |
| size | 68,999 |
A tiny, lock-light, type-routed message bus for Rust.
Primary use cases: fast, ergonomic state/event dispatch for game development (systems & actors exchanging events and sampling shared game state), background workers, UI event routing, and modular plugin systems.
⚠️ Blocking caution: The blocking send/receive variants can deadlock if you create cyclic waits or receivers do not drain. Prefer the non-blocking
try_*calls where appropriate.
Type-based routing – you work with concrete T; messages are erased internally as Arc<dyn Any + Send + Sync> and downcast on read.
Two complementary primitives, one API surface:
Backpressure (streams) – per-subscription bounded (including rendezvous Some(0)) or unbounded queues.
Simple ownership – SuperQueueActor unsubscribes itself in Drop.
Cheap cloning – the bus is shared and Clone.
This crate is not no_std.
# Cargo.toml
[dependencies]
superqueue = "0.1"
use superqueue::SuperQueue;
let bus = SuperQueue::new();
let mut recv = bus.create_actor();
let send = bus.create_actor();
recv.subscribe::<String>(None)?; // unbounded queue
send.send("Hello".to_string())?; // broadcast (may block if a bounded queue is full)
let msg = recv.read::<String>()?; // blocking read
assert_eq!(&*msg, "Hello");
# Ok::<_, Box<dyn std::error::Error>>(())
use superqueue::SuperQueue;
let bus = SuperQueue::new();
let publisher = bus.create_actor();
let mut reader = bus.create_actor();
// No subscription needed for latest-value topics.
assert!(reader.read_latest::<u32>().is_none()); // nothing published yet
publisher.update_latest::<u32>(1);
assert_eq!(*reader.read_latest::<u32>().unwrap(), 1); // sees the new value once
assert!(reader.read_latest::<u32>().is_none()); // at most once per update
publisher.update_latest::<u32>(2);
assert_eq!(*reader.read_latest::<u32>().unwrap(), 2);
# use superqueue::SuperQueue;
let bus = SuperQueue::new();
let mut physics = bus.create_actor();
let ai = bus.create_actor();
// Physics consumes events...
physics.subscribe::<(u32, u32)>(Some(256))?; // position updates as events
// ...and also samples a latest snapshot AI publishes opportunistically.
ai.update_latest::<f32>(0.016); // delta time in seconds
# Ok::<_, Box<dyn std::error::Error>>(())
A queue (SuperQueue) is shared and cheap to clone.
An actor (SuperQueueActor) can:
T and send/read events of T.update_latest<T>(value) and sample with read_latest::<T>() -> Option<Arc<T>> (no subscription required).Keying:
(TypeId, ActorId) and create a private channel per subscriber.TypeId only and hold one slot for the entire bus. Each actor maintains its own cursor to observe each update at most once.| Method | Delivery | Blocking | Returns / Errors |
|---|---|---|---|
send(T) |
Broadcast to all subscribers of T |
May block per receiver if that receiver’s queue is bounded and full | Ok or SendError::NoSubscribers |
try_send(T) |
Broadcast, non-blocking | Never blocks; per-receiver enqueue attempts may be dropped if full | Ok or TrySendError::{NoSubscribers, NoSpaceAvailable} (NoSpaceAvailable only if none accepted) |
send_single(T) |
Exactly one subscriber | Prefers a subscriber with capacity; if all are full, blocks on a random subscriber | Ok or SendError::NoSubscribers |
try_send_single(T) |
Exactly one, non-blocking | Never blocks; drops the message if everyone is full | Ok or TrySendError::{NoSubscribers, NoSpaceAvailable} |
Tip:
Some(0)creates a rendezvous channel.try_send*will always returnNoSpaceAvailableunless a receiver is waiting;send*will rendezvous (and may block).
| Method | Semantics | Blocking |
|---|---|---|
update_latest(T) |
Overwrite the single slot for type T. Last-writer-wins. |
Never |
read_latest::<T>() -> Option<Arc<T>> |
Return the newest value once per actor per update; None if unchanged or never published. |
Never |
Per-type FIFO (streams): For a given actor and type T, message order matches send order.
Arc cloning: Broadcast is cheap; the bus clones Arc<T> per subscriber.
Drop safety: Dropping an actor unsubscribes all of its stream subscriptions; subsequent sends won’t panic due to closed channels.
No replay (streams): Messages sent while an actor is unsubscribed are not queued for it.
Latest-value topics:
TypeId across the bus; no history is kept.#[derive(Clone)]
struct PlayerMoved { id: u32, x: f32, y: f32 }
let bus = SuperQueue::new();
let mut physics = bus.create_actor();
let mut audio = bus.create_actor();
physics.subscribe::<PlayerMoved>(None)?;
audio.subscribe::<PlayerMoved>(None)?;
let tx = bus.create_actor();
tx.try_send(PlayerMoved { id: 1, x: 4.0, y: 2.0 })?; // non-blocking
#[derive(Clone)]
struct PathJob { start: (i32,i32), goal: (i32,i32) }
let bus = SuperQueue::new();
// two workers
let mut w1 = bus.create_actor(); w1.subscribe::<PathJob>(Some(128))?;
let mut w2 = bus.create_actor(); w2.subscribe::<PathJob>(Some(128))?;
let client = bus.create_actor();
// deliver each job to exactly one available worker (non-blocking)
for job in jobs() {
let _ = client.try_send_single(job); // drop if both queues are full
}
let mut ui = bus.create_actor();
ui.subscribe::<String>(Some(64))?; // bounded
producer.try_send("notification".to_string()).ok(); // avoid stalling producers
// Publish "frame delta" as a coalescing snapshot:
let publisher = bus.create_actor();
publisher.update_latest::<f32>(0.016);
// Any system can sample once per update without subscribing:
let mut consumer = bus.create_actor();
if let Some(dt) = consumer.read_latest::<f32>() {
// use *dt
}
SuperQueueError::{ NotSubscribed, AlreadySubscribed, EmptyQueue }SendError::NoSubscribersTrySendError::{ NoSubscribers, NoSpaceAvailable }MIT © alexdesander