use std::ptr; use std::fmt::Debug; use tracing::debug; use futures_lite::Stream; use futures_lite::stream::StreamExt; use pin_utils::unsafe_pinned; use pin_utils::unsafe_unpinned; use fluvio_future::task::spawn; use crate::sys::napi_value; use crate::val::JsEnv; use crate::NjError; use crate::TryIntoJs; pub trait NjStream: Stream { fn js_then(self, fut: F) -> JsThen where F: FnMut(Self::Item), Self: Sized, { JsThen::new(self, fut) } } impl NjStream for T where T: Stream {} pub struct JsThen { stream: St, f: F, } impl Unpin for JsThen {} impl JsThen where St: Stream, F: FnMut(St::Item), { unsafe_pinned!(stream: St); unsafe_unpinned!(f: F); pub fn new(stream: St, f: F) -> JsThen { Self { stream, f } } } impl TryIntoJs for JsThen where St: Stream + Send + 'static, F: FnMut(St::Item) + Send + 'static, St::Item: Debug, { fn try_to_js(self, _js_env: &JsEnv) -> Result { let mut stream = Box::pin(self.stream); let mut cb = self.f; spawn(async move { while let Some(item) = stream.next().await { debug!("got item: {:#?}, invoking Js callback", item); cb(item); } }); Ok(ptr::null_mut()) } }