| Crates.io | sbc |
| lib.rs | sbc |
| version | 0.1.0 |
| created_at | 2025-11-18 17:36:29.563322+00 |
| updated_at | 2025-11-18 17:36:29.563322+00 |
| description | Multi-producer, multi-consumer synchronous broadcast channel |
| repository | https://github.com/Tyrannican/sbc.git |
| id | 1938730 |
| size | 57,458 |
A lock-based, multi-producer, multi-consumer synchronous broadcast channel modelled on the Tokio broadcast channel. Every value sent is seen by all consumers.
For a lock-free style of broadcast channel, see the Bus library.
```rust
use sbc::channel;
use std::thread;

fn main() {
    let (tx, mut rx1) = channel(16);
    let mut rx2 = tx.subscribe();

    let t1 = thread::spawn(move || {
        assert_eq!(rx1.recv().unwrap(), 10);
        assert_eq!(rx1.recv().unwrap(), 20);
    });

    let t2 = thread::spawn(move || {
        assert_eq!(rx2.recv().unwrap(), 10);
        assert_eq!(rx2.recv().unwrap(), 20);
    });

    tx.send(10).unwrap();
    tx.send(20).unwrap();

    let _ = t1.join().unwrap();
    let _ = t2.join().unwrap();
}
```
Because sent messages must be retained until every Receiver handle has received a clone of the
value, this broadcast channel is susceptible to the "slow receiver" problem. In this scenario, all
but one receiver keep up with the rate at which values are sent, while the one stalling receiver
causes the channel to fill up.
To handle this, this broadcast channel sets a bound on the number of values the channel may hold at a given time.
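If a value is sent when the channel is at capacity, the oldest value currently held by the channel is released and overwritten to make room for the new one. Any receiver that has not yet seen the released value gets an error from its next call to recv, indicating that it has lagged.
Once the lag error has been returned, the lagging receiver's position is updated to the oldest value still held by the channel.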
It is up to the end-user to determine how they wish to handle this behaviour.
```rust
use sbc::channel;

fn main() {
    // Capacity of 2: the third send overwrites the oldest value (10).
    let (tx, mut rx) = channel(2);

    tx.send(10).unwrap();
    tx.send(20).unwrap();
    tx.send(30).unwrap();

    // Receiver has lagged behind, so the first recv reports an error
    assert!(rx.recv().is_err());

    // We can abort or continue with lost messages
    assert_eq!(rx.recv().unwrap(), 20);
    assert_eq!(rx.recv().unwrap(), 30);
}
```
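
To make the overwrite-and-lag behaviour described above more concrete, here is a minimal, single-threaded toy model built only on the standard library. It is not how sbc is implemented: the names ToyBuffer, ToyReceiver and ToyRecvError are hypothetical and exist only for this sketch, and the real crate adds locking and blocking on top of the same basic idea. The model keeps a sequence number for the oldest value still held, and a receiver whose position has fallen behind that number gets a lag error once before being moved forward.

```rust
use std::collections::VecDeque;

/// Hypothetical error type for this sketch (not the `sbc` error type).
#[derive(Debug, PartialEq)]
enum ToyRecvError {
    /// The receiver missed this many values because they were overwritten.
    Lagged(u64),
    /// Nothing new to read. A real blocking channel would wait here instead.
    Empty,
}

/// Toy model of a bounded broadcast buffer that overwrites its oldest value.
struct ToyBuffer<T> {
    capacity: usize,
    values: VecDeque<T>,
    /// Sequence number of the oldest value still held.
    oldest: u64,
}

/// Toy receiver: only tracks the sequence number it expects next.
struct ToyReceiver {
    next: u64,
}

impl<T: Clone> ToyBuffer<T> {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Self { capacity, values: VecDeque::with_capacity(capacity), oldest: 0 }
    }

    /// Store a value, releasing the oldest one if the buffer is full.
    fn send(&mut self, value: T) {
        if self.values.len() == self.capacity {
            self.values.pop_front();
            self.oldest += 1;
        }
        self.values.push_back(value);
    }

    /// Hand the receiver a clone of its next value, or report a lag.
    fn recv(&self, rx: &mut ToyReceiver) -> Result<T, ToyRecvError> {
        if rx.next < self.oldest {
            // The value this receiver wanted is gone: report the lag once
            // and move the receiver to the oldest value still held.
            let missed = self.oldest - rx.next;
            rx.next = self.oldest;
            return Err(ToyRecvError::Lagged(missed));
        }
        let index = (rx.next - self.oldest) as usize;
        match self.values.get(index) {
            Some(value) => {
                rx.next += 1;
                Ok(value.clone())
            }
            None => Err(ToyRecvError::Empty),
        }
    }
}

fn main() {
    let mut buffer = ToyBuffer::new(2);
    let mut rx = ToyReceiver { next: 0 };

    buffer.send(10);
    buffer.send(20);
    buffer.send(30); // buffer is full, so 10 is released

    assert_eq!(buffer.recv(&mut rx), Err(ToyRecvError::Lagged(1)));
    assert_eq!(buffer.recv(&mut rx), Ok(20));
    assert_eq!(buffer.recv(&mut rx), Ok(30));
}
```

The sequence in main mirrors the sbc example above: one error for the lost value, then the two values that are still held. Whether to abort on the lag or carry on with the remaining values is left to the caller.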