broadcast-iterator

Crates.iobroadcast-iterator
lib.rsbroadcast-iterator
version0.0.1
created_at2026-01-06 01:21:30.908644+00
updated_at2026-01-06 01:21:30.908644+00
description`.clone()` an `I: !Clone + Iterator`!
homepage
repositoryhttps://github.com/kayabaNerve/broadcast-iterator
max_upload_size
id2024902
size26,779
Luke Parker (kayabaNerve)

documentation

README

broadcast-iterator

.clone() an I: !Clone + Iterator!

use broadcast_iterator::BroadcastIterator;

fn unwrap_ready<F: Future>(fut: &mut F) -> F::Output {
  use core::task::{Poll, Waker, Context};
  match F::poll(core::pin::pin!(fut), &mut Context::from_waker(Waker::noop())) {
    Poll::Ready(value) => value,
    Poll::Pending => panic!("`Future` unexpectedly yielded `Pending`"),
  }
}

let mut broadcast_iterator = BroadcastIterator::from([1, 2, 3].into_iter());

let mut consumers = broadcast_iterator.consumers(2);
let mut consumer_one = consumers.next().unwrap();
let mut consumer_two = consumers.next().unwrap();

assert_eq!(*unwrap_ready(&mut consumer_one), Some(1));
assert_eq!(*unwrap_ready(&mut consumer_two), Some(1));

assert_eq!(*unwrap_ready(&mut consumer_one), Some(2));
assert_eq!(*unwrap_ready(&mut consumer_two), Some(2));

assert_eq!(*unwrap_ready(&mut consumer_one), Some(3));
assert_eq!(*unwrap_ready(&mut consumer_two), Some(3));

assert_eq!(*unwrap_ready(&mut consumer_one), None);
assert_eq!(*unwrap_ready(&mut consumer_two), None);

What is this?

The goal was to create multiple consumers of a single iterator, without any use of the heap, which cloning the underlying iterator was assumed to require.

The primary issue is in how the multiple consumers may be iterated over at distinct rates. This means when items from the underlying iterator are consumed, they would have to buffered somewhere, requiring a fixed bound (which would be incomplete for the space of all possible usages) or an unbounded buffer (requiring dynamic allocation). The solution here is not to return &I::Item yet Poll<&I::Item>, allowing denoting if the current consumer is blocked by another consumer which has yet to read the next item.

This library allows spawning arbitrary consumers for an iterator, each which may be polled for a reference to the next item from the underlying iterator, without any allocations or usage of the heap. The entire solution is no-std and no-alloc, [Send] and [Sync] (if the [Iterator] and [Iterator::Item] are), and supports dynamic provisioning of the amount of [Consumer]s.

While [Consumer] is defined as a [Future], it can be assumed to return [Poll::Ready] if it's known all other [Consumer]s have been polled. This allows trivially writing a minimal runtime for polling consumers.

On Push vs Pull

This design is notable for being a pull-based design, where the consumers are asked for the next item. The alternative would be a push-based design, where the underlying iterator yields its items and then references are pushed onto sinks.

This design was explicitly preferred because it presents a bottom-up approach. The rest of the application may be built on top of a consumer. With a push-based design, the rest of the application would have to fit within a state machine supporting sinking individual items from the underlying iterator, which may be horrendously infeasible depending on the size of the application.

Caveats

  • [BroadcastIterator] is eager and may call [Iterator::next] on the underlying iterator unnecessarily.
  • [Consumer::drop] may perform a spin lock if another thread is actively fetching the next item from the underlying iterator.
  • While the amount of [Consumer]s may be dynamically decided, all prior-created [Consumer]s must be dropped before a new set of [Consumer]s may be created. This is to ensure the maximum amount of simultaneously-existing [Consumer]s is less than the limit require for this to be safe.
  • This library is completely untested other than the above example.

Future Work

  • Move to lazily calling the underlying iterator so we can support retrieving the iterator in a sound fashion.
  • Use a ring buffer (parameterized by a const RING_BUFFER_LEN: usize) for the next items from the underlying iterator, allowing a consumer to advance more than just one item ahead of the other consumers.
  • Properly test this library to ensure its safety.
  • Properly utilize [core::task::Waker].
Commit count: 1

cargo fmt