use std::{ task::{Poll, Context}, pin::Pin, }; use futures::{ Stream, Sink, channel::mpsc, }; pub fn spammer(tag: impl Into, count: usize) -> impl Stream + Unpin { Spammer::new(tag, count) } pub fn barrel() -> Barrel where I: Unpin { Barrel::new() } impl Barrel where I: Unpin { pub fn new() -> Self { let (tx, rx) = mpsc::unbounded(); Self { things: rx, handle: tx, } } pub fn collect(&mut self) -> Vec { let mut res = Vec::new(); while let Ok(Some(item)) = self.things.try_next() { res.push(item); } res } pub fn pipe(&self) -> Pipe { Pipe { tx: self.handle.clone(), } } } pub struct Barrel where I: Unpin { things: mpsc::UnboundedReceiver, handle: mpsc::UnboundedSender, } pub struct Pipe { tx: mpsc::UnboundedSender, } impl Sink for Pipe where I: Unpin { type Error = std::convert::Infallible; fn poll_ready( self: Pin<&mut Self>, _: &mut Context ) -> Poll> { Poll::Ready(Ok(())) } fn start_send(mut self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { self.as_mut().tx.unbounded_send(item).unwrap(); Ok(()) } fn poll_flush( self: Pin<&mut Self>, _: &mut Context ) -> Poll> { Poll::Ready(Ok(())) } fn poll_close( self: Pin<&mut Self>, _: &mut Context ) -> Poll> { Poll::Ready(Ok(())) } } pub struct Spammer { tag: String, count: usize, } impl Spammer { fn new(tag: impl Into, count: usize) -> Self { Self { tag: tag.into(), count, } } } impl Stream for Spammer { type Item = String; fn poll_next( mut self: Pin<&mut Self>, _: &mut Context, ) -> Poll> { if self.count == 0 { return Poll::Ready(None); } self.count -= 1; let number = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .map(|d| d.as_micros()) .unwrap_or(100); Poll::Ready(Some(format!("{} says `{}`", self.tag.clone(), number))) } }