use std::marker::Unpin; use std::time::Duration; use std::thread::sleep; use std::sync::Arc; use std::pin::Pin; use std::task::{Poll, Context}; use futures_signals::signal_vec::{VecDiff, SignalVec}; use futures_signals::signal_map::{MapDiff, SignalMap}; use futures_signals::signal::Signal; use futures_core::Stream; use futures_util::future::poll_fn; use futures_util::task::{waker, ArcWake}; use futures_executor::block_on; use pin_utils::pin_mut; #[allow(dead_code)] pub struct ForEachSignal where A: Signal { signal: A, callbacks: Vec>)>>, } #[allow(dead_code)] impl ForEachSignal where A: Signal { pub fn new(signal: A) -> Self { Self { signal, callbacks: vec![], } } pub fn next(mut self, callback: B) -> Self where B: FnMut(&mut Context, Poll>) + 'static { self.callbacks.insert(0, Box::new(callback)); self } pub fn run(self) { let mut callbacks = self.callbacks; let signal = self.signal; pin_mut!(signal); block_on(poll_fn(move |cx| -> Poll<()> { loop { return match callbacks.pop() { Some(mut callback) => { // TODO is this safe ? let poll = signal.as_mut().poll_change(cx); match poll { Poll::Ready(None) => { callback(cx, poll); Poll::Ready(()) }, Poll::Ready(Some(_)) => { callback(cx, poll); continue; }, Poll::Pending => { callback(cx, poll); Poll::Pending }, } }, None => { Poll::Ready(()) }, } } })); } } #[allow(dead_code)] pub fn with_noop_context U>(f: F) -> U { // borrowed this design from the futures source struct Noop; impl ArcWake for Noop { fn wake_by_ref(_: &Arc) {} } // TODO is this correct ? let waker = waker(Arc::new(Noop)); let context = &mut Context::from_waker(&waker); f(context) } #[allow(dead_code)] pub fn delay() { // TODO is it guaranteed that this will yield to other threads ? sleep(Duration::from_millis(10)); } fn get_polls(f: F, mut p: P) -> Vec>> where F: FnOnce(), P: FnMut(&mut Context) -> Poll> { let mut f = Some(f); let mut output = vec![]; block_on(poll_fn(|cx| { loop { let x = p(cx); let poll = match x { Poll::Ready(Some(_)) => { output.push(x); continue; }, Poll::Ready(None) => { Poll::Ready(()) }, Poll::Pending => { Poll::Pending }, }; output.push(x); if let Some(f) = f.take() { f(); } return poll; } })); output } #[allow(dead_code)] pub fn get_signal_polls(signal: A, f: F) -> Vec>> where A: Signal, F: FnOnce() { pin_mut!(signal); // TODO is the as_mut correct ? get_polls(f, |cx| Pin::as_mut(&mut signal).poll_change(cx)) } #[allow(dead_code)] pub fn get_signal_vec_polls(signal: A, f: F) -> Vec>>> where A: SignalVec, F: FnOnce() { pin_mut!(signal); // TODO is the as_mut correct ? get_polls(f, |cx| Pin::as_mut(&mut signal).poll_vec_change(cx)) } #[allow(dead_code)] pub fn get_stream_polls(stream: A, f: F) -> Vec>> where A: Stream, F: FnOnce() { pin_mut!(stream); // TODO is the as_mut correct ? get_polls(f, |cx| Pin::as_mut(&mut stream).poll_next(cx)) } #[allow(dead_code)] pub fn get_signal_map_polls(signal: A, f: F) -> Vec>>> where A: SignalMap, F: FnOnce() { pin_mut!(signal); // TODO is the as_mut correct ? get_polls(f, |cx| Pin::as_mut(&mut signal).poll_map_change(cx)) } #[allow(dead_code)] pub fn get_all_polls(signal: A, mut initial: B, mut f: F) -> Vec>> where A: Signal, F: FnMut(&B, &mut Context) -> B { let mut output = vec![]; // TODO is this correct ? pin_mut!(signal); block_on(poll_fn(|context| { loop { initial = f(&initial, context); // TODO is this correct ? let x = Pin::as_mut(&mut signal).poll_change(context); let x: Poll<()> = match x { Poll::Ready(Some(_)) => { output.push(x); continue; }, Poll::Ready(None) => { output.push(x); Poll::Ready(()) }, Poll::Pending => { output.push(x); Poll::Pending }, }; return x; } })); output } #[allow(dead_code)] pub fn map_poll_vec(signal: A, mut callback: C) -> Vec where A: SignalVec, C: FnMut(&A, Poll>>) -> B { let mut changes = vec![]; // TODO is this correct ? pin_mut!(signal); block_on(poll_fn(|context| { loop { // TODO is this correct ? let x = Pin::as_mut(&mut signal).poll_vec_change(context); return match x { Poll::Ready(Some(_)) => { changes.push(callback(&signal, x)); continue; }, Poll::Ready(None) => { changes.push(callback(&signal, x)); Poll::Ready(()) }, Poll::Pending => { changes.push(callback(&signal, x)); Poll::Pending }, }; } })); changes } #[allow(dead_code)] pub fn map_poll_map(signal: A, mut callback: C) -> Vec where A: SignalMap, C: FnMut(&A, Poll>>) -> B { let mut changes = vec![]; // TODO is this correct ? pin_mut!(signal); block_on(poll_fn(|context| { loop { // TODO is this correct ? let x = Pin::as_mut(&mut signal).poll_map_change(context); return match x { Poll::Ready(Some(_)) => { changes.push(callback(&signal, x)); continue; }, Poll::Ready(None) => { changes.push(callback(&signal, x)); Poll::Ready(()) }, Poll::Pending => { changes.push(callback(&signal, x)); Poll::Pending }, }; } })); changes } #[allow(dead_code)] #[track_caller] pub fn assert_signal_eq(signal: S, expected: Vec>>) where A: std::fmt::Debug + PartialEq, S: Signal { assert_eq!( // TODO a little gross get_all_polls(signal, (), |_, _| {}), expected, ); } #[allow(dead_code)] #[track_caller] pub fn assert_stream_eq(stream: S, expected: Vec>>) where A: std::fmt::Debug + PartialEq, S: Stream { assert_eq!( get_stream_polls(stream, || {}), expected, ); } #[allow(dead_code)] #[track_caller] pub fn assert_signal_vec_eq(signal: S, expected: Vec>>>) where A: std::fmt::Debug + PartialEq, S: SignalVec { let actual = map_poll_vec(signal, |_output, change| change); assert_eq!( actual, expected, ); } #[allow(dead_code)] #[track_caller] pub fn assert_signal_map_eq(signal: S, expected: Vec>>>) where K: std::fmt::Debug + PartialEq, V: std::fmt::Debug + PartialEq, S: SignalMap { let actual = map_poll_map(signal, |_output, change| change); assert_eq!( actual, expected, ); } #[allow(dead_code)] #[derive(Debug)] #[must_use = "Source does nothing unless polled"] pub struct Source { changes: Vec>, } impl Unpin for Source {} impl Source { #[allow(dead_code)] #[inline] pub fn new(changes: Vec>) -> Self { Self { changes } } fn poll(&mut self, cx: &mut Context) -> Poll> { if self.changes.len() > 0 { match self.changes.remove(0) { Poll::Pending => { cx.waker().wake_by_ref(); Poll::Pending }, Poll::Ready(change) => Poll::Ready(Some(change)), } } else { Poll::Ready(None) } } } impl Stream for Source { type Item = A; #[inline] fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { self.poll(cx) } } impl Signal for Source { type Item = A; #[inline] fn poll_change(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { self.poll(cx) } } impl SignalVec for Source> { type Item = A; #[inline] fn poll_vec_change(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll>> { self.poll(cx) } } impl SignalMap for Source> { type Key = K; type Value = V; #[inline] fn poll_map_change(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll>> { self.poll(cx) } }