//! General wraper for calling non-send objects from other threads. use futures::future::LocalBoxFuture; use futures::prelude::*; use std::marker::PhantomData; use thiserror::Error; use tokio::sync::{mpsc, oneshot}; #[derive(Error, Debug)] pub enum Error { #[error("wrapped object closed")] SendFailed {}, #[error("{0}")] RecvFailed(#[from] oneshot::error::RecvError), } pub struct SendWrap { tx: mpsc::Sender + Send + 'static>>, } trait Runnable { fn exec<'a>(self: Box, item: T) -> future::LocalBoxFuture<'a, ()>; } pub trait AsyncFnOnce<'a, Arg> { type Output: 'static; type Fut: Future + 'a; fn call(self, arg: Arg) -> Self::Fut; } impl<'a, Arg, RF: Future + 'a, F: FnOnce(Arg) -> RF> AsyncFnOnce<'a, Arg> for F where RF::Output: 'static, { type Output = RF::Output; type Fut = RF; fn call(self, arg: Arg) -> Self::Fut { self(arg) } } struct Call { reply: oneshot::Sender, inner: F, _marker: PhantomData T>, } impl + 'static> Runnable for Call { fn exec<'a>(self: Box, item: T) -> LocalBoxFuture<'a, ()> { let inner = self.inner; let reply = self.reply; async move { let result = inner.call(item).await; let _ = reply.send(result); } .boxed_local() } } impl SendWrap { /// ## Example /// /// ```rust /// let c = wrap(client); /// /// tokio::spawn(async move { /// c.run_async(|c| c.sockets()).await /// }); /// ``` /// pub async fn run_async + Send + 'static>( &self, f: F, ) -> Result where F::Output: Send, { let (tx, rx) = oneshot::channel(); let _ = self .tx .send(Box::new(Call { inner: f, reply: tx, _marker: PhantomData, })) .await .map_err(|_e| Error::SendFailed {}); rx.await.map_err(Error::RecvFailed) } } pub fn wrap(item: T) -> SendWrap { let (tx, mut rx) = mpsc::channel:: + Send + 'static>>(1); let _handle = tokio::task::spawn_local(async move { while let Some(runnable) = rx.recv().await { tokio::task::spawn_local(runnable.exec(item.clone())); } }); SendWrap { tx } }