use std::{ collections::VecDeque, pin::Pin, task::{Context, Poll}, }; use bevy_ecs::event::{Event, Events, ManualEventReader}; use futures::{FutureExt, Stream}; use crate::{TaskContext, WithWorld}; pub trait EventStreamTaskExt { fn event_stream(&self) -> impl Stream; } impl EventStreamTaskExt for TaskContext { fn event_stream(&self) -> impl Stream { EventStream::::new(self.clone()) } } struct EventStreamData { items: VecDeque, reader: ManualEventReader, } impl Default for EventStreamData { fn default() -> Self { EventStreamData { items: Default::default(), reader: Default::default(), } } } enum EventStreamState { HasItems(EventStreamData), WaitingForTask(WithWorld>), } impl Default for EventStreamState { fn default() -> Self { Self::HasItems(Default::default()) } } /// Provides a [`Stream`] interface over a series of [`Event`]s. Asynchronously iterates /// over all [`Event`]s from the start of the [`Events`] queue. /// /// ``` /// let mut events = cx.event_stream::(); /// while let Some(ev) = events.next().await { /// println!("Got a keyboard event: {ev:?}"); /// } /// ``` pub struct EventStream where E: Event, { cx: TaskContext, state: EventStreamState, } impl EventStream { pub fn new(cx: TaskContext) -> Self { Self { cx, state: Default::default(), } } } impl Stream for EventStream { type Item = E; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { match &mut self.state { EventStreamState::HasItems(data) => { if let Some(next) = data.items.pop_front() { return Poll::Ready(Some(next)); } else { let mut reader = std::mem::replace(&mut data.reader, Default::default()); let fut = self.cx.with_world(move |world| { let items = reader .read(world.resource::>()) .map(Clone::clone) .collect::>(); EventStreamData { items, reader } }); self.state = EventStreamState::WaitingForTask(fut); } } EventStreamState::WaitingForTask(fut) => { if let Poll::Ready(data) = fut.poll_unpin(cx) { self.state = EventStreamState::HasItems(data); } else { return Poll::Pending; } } } } } }