| Crates.io | double_decker |
| lib.rs | double_decker |
| version | 0.0.3 |
| created_at | 2020-08-22 15:10:50.144027+00 |
| updated_at | 2020-11-02 17:13:14.349549+00 |
| description | A simple unbounded multi-producer multi-subscriber event bus built with crossbeam channels |
| homepage | |
| repository | https://github.com/timfish/double_decker |
| max_upload_size | |
| id | 279531 |
| size | 18,809 |
A simple unbounded multi-producer multi-subscriber event bus built with crossbeam channels.
Unlike the Bus from the bus crate, double_decker::Bus
is unbounded, and everyone knows that double-decker buses
carry more passengers than a regular bus 🤷‍♂️.
Unlike bus::Bus, double_decker::Bus implements a cheap Clone, which I've found useful.
Is double_decker::Bus better than bus::Bus?
No. The bus crate is mature and completely lock-free. This implementation is neither!
T must implement Clone so it can be passed to all consumers.
When you call add_rx(), a Sender/Receiver pair is created and the Sender is
stored in a HashMap behind a RwLock.
broadcast() takes shared read access to the RwLock and sends the event to each Receiver in the
order they were added.
Lock contention can only occur when the number of subscribers changes, because this requires write access to
the RwLock. That happens when you call add_rx(), or when you call broadcast() and one or more
Senders return SendError because their Receiver has been disconnected.
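The design described above can be sketched roughly as follows. This is not the crate's source: SketchBus, next_id and subscriber_count are hypothetical names, std::sync::mpsc channels stand in for crossbeam channels so the sketch needs no dependencies, and a BTreeMap keyed by an increasing id replaces the HashMap so that iteration order matches insertion order (std's HashMap does not promise any iteration order).

```rust
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for the described design; not double_decker's code.
struct SketchBus<T> {
    // Assumption: senders are keyed by an increasing id to keep insertion order.
    senders: Arc<RwLock<BTreeMap<usize, Sender<T>>>>,
    next_id: Arc<AtomicUsize>,
}

// Cloning only bumps two reference counts; all clones share one subscriber map.
impl<T> Clone for SketchBus<T> {
    fn clone(&self) -> Self {
        SketchBus {
            senders: Arc::clone(&self.senders),
            next_id: Arc::clone(&self.next_id),
        }
    }
}

impl<T: Clone> SketchBus<T> {
    fn new() -> Self {
        SketchBus {
            senders: Arc::new(RwLock::new(BTreeMap::new())),
            next_id: Arc::new(AtomicUsize::new(0)),
        }
    }

    /// Adding a subscriber changes the map, so it needs the write lock.
    fn add_rx(&self) -> Receiver<T> {
        let (tx, rx) = channel();
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        self.senders.write().unwrap().insert(id, tx);
        rx
    }

    /// Sends a clone of the event to every subscriber under the read lock.
    /// The write lock is taken only if a disconnected subscriber was found.
    fn broadcast(&self, event: T) {
        let mut dead = Vec::new();
        {
            let senders = self.senders.read().unwrap();
            for (id, tx) in senders.iter() {
                if tx.send(event.clone()).is_err() {
                    dead.push(*id);
                }
            }
        } // read guard is released here, before any write access
        if !dead.is_empty() {
            let mut senders = self.senders.write().unwrap();
            for id in dead {
                senders.remove(&id);
            }
        }
    }

    fn subscriber_count(&self) -> usize {
        self.senders.read().unwrap().len()
    }
}

fn main() {
    let bus = SketchBus::<i32>::new();
    let rx1 = bus.add_rx();
    let rx2 = bus.add_rx();

    // A clone broadcasts to the same subscribers as the original.
    let producer = bus.clone();
    producer.broadcast(1);
    assert_eq!(rx1.recv(), Ok(1));
    assert_eq!(rx2.recv(), Ok(1));

    // Dropping a receiver makes its sender fail on the next broadcast,
    // which is when the sketch prunes it under the write lock.
    drop(rx2);
    bus.broadcast(2);
    assert_eq!(rx1.recv(), Ok(2));
    assert_eq!(bus.subscriber_count(), 1);
}
```

In this sketch, broadcasts that find every subscriber still connected only ever take the read lock, so concurrent broadcasters would not block each other on it.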
Single-send, multi-consumer example
    use double_decker::Bus;

    let mut bus = Bus::new();
    let mut rx1 = bus.add_rx();
    let mut rx2 = bus.add_rx();

    bus.broadcast("Hello");
    assert_eq!(rx1.recv(), Ok("Hello"));
    assert_eq!(rx2.recv(), Ok("Hello"));
Multi-send, multi-consumer example
    use double_decker::Bus;
    use std::thread;

    let mut bus = Bus::new();
    let mut rx1 = bus.add_rx();
    let mut rx2 = bus.add_rx();

    // start a thread that sends 1..100
    let j = thread::spawn(move || {
        for i in 1..100 {
            bus.broadcast(i);
        }
    });

    // every value should be received by both receivers
    for i in 1..100 {
        // rx1
        assert_eq!(rx1.recv(), Ok(i));
        // and rx2
        assert_eq!(rx2.recv(), Ok(i));
    }

    j.join().unwrap();
Also included are subscribe and subscribe_on_thread which allow you to subscribe to broadcast
events with a closure that is called on every broadcast. subscribe is blocking whereas
subscribe_on_thread calls the closure from another thread.
subscribe_on_thread returns a Subscription, which you should keep alive for as long as you want
events delivered, because the thread terminates when it is dropped.
    use double_decker::{Bus, SubscribeToReader};

    let bus = Bus::<i32>::new();

    // This would block
    // bus.subscribe(Box::new(move |_event| {
    //     // This closure is called on every broadcast
    // }));

    let _subscription = bus.subscribe_on_thread(Box::new(move |_event| {
        // This closure is called on every broadcast
    }));

    bus.broadcast(5);
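To illustrate why the returned Subscription needs to be kept alive, here is one possible way a drop-terminated subscriber thread could be built. It is a sketch under assumptions, not the crate's implementation: SketchSubscription and the free function subscribe_on_thread below are hypothetical, std::sync::mpsc is used instead of crossbeam, and the stop flag with a 10 ms polling timeout is an illustrative choice.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical guard: signals the worker thread to stop and joins it on drop.
struct SketchSubscription {
    stop: Arc<AtomicBool>,
    handle: Option<JoinHandle<()>>,
}

impl Drop for SketchSubscription {
    fn drop(&mut self) {
        self.stop.store(true, Ordering::SeqCst);
        if let Some(handle) = self.handle.take() {
            let _ = handle.join();
        }
    }
}

/// Hypothetical helper: runs `callback` on a new thread for every event
/// received on `rx` until the returned guard is dropped.
fn subscribe_on_thread<T, F>(rx: Receiver<T>, mut callback: F) -> SketchSubscription
where
    T: Send + 'static,
    F: FnMut(T) + Send + 'static,
{
    let stop = Arc::new(AtomicBool::new(false));
    let stop_flag = Arc::clone(&stop);
    let handle = thread::spawn(move || {
        while !stop_flag.load(Ordering::SeqCst) {
            // Poll with a short timeout so the stop flag is checked regularly.
            match rx.recv_timeout(Duration::from_millis(10)) {
                Ok(event) => callback(event),
                Err(RecvTimeoutError::Timeout) => continue,
                Err(RecvTimeoutError::Disconnected) => break,
            }
        }
    });
    SketchSubscription {
        stop,
        handle: Some(handle),
    }
}

fn main() {
    let (tx, rx) = channel();
    let (seen_tx, seen_rx) = channel();

    let subscription = subscribe_on_thread(rx, move |event: i32| {
        // Report each event back so the main thread can observe the call.
        seen_tx.send(event * 2).unwrap();
    });

    tx.send(5).unwrap();
    assert_eq!(seen_rx.recv(), Ok(10));

    // Dropping the guard stops and joins the worker, which also drops the
    // closure and its `seen_tx`, so no further results can arrive.
    drop(subscription);
    assert!(seen_rx.recv().is_err());
}
```

Binding the guard to a name such as `_subscription` keeps it alive until the end of the scope, whereas binding it to a bare `_` would drop it immediately and stop the thread straight away.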
License: MIT