// https://github.com/rust-lang/rust/issues/65991 #![feature(trait_upcasting)] use anyhow::{anyhow, bail, Result}; use crossmist::{ func, lambda, tokio::{duplex, Child, Duplex}, BindValue, FnOnceObject, Object, }; use std::any::Any; use std::sync::Arc; use tokio::sync::{mpsc, Mutex}; // Simulate trait intersection trait AnyObject: Any + Object {} impl AnyObject for T {} type TypeErased = Box; struct Worker { // The worker process will receive tasks as functions returning Any output and return their // output via the channel. channel: Duplex + Send + Sync>, TypeErased>, child: Child<()>, } struct WorkerPool { workers_sender: Option>, workers_receiver: Mutex>, } impl WorkerPool { pub async fn new(concurrency: usize) -> Result { let (workers_sender, workers_receiver) = mpsc::unbounded_channel(); for _ in 0..concurrency { let (ours, theirs) = duplex()?; let child = worker.spawn_tokio(theirs).await?; if workers_sender .send(Worker { channel: ours, child, }) .is_err() { bail!("Queue initialization failed"); } } Ok(Self { workers_sender: Some(workers_sender), workers_receiver: Mutex::new(workers_receiver), }) } pub async fn run( &self, func: impl FnOnceObject<(), Output = Output> + Send + Sync + 'static, ) -> Result { let workers_sender = self .workers_sender .as_ref() .ok_or_else(|| anyhow!("Pool is closed"))?; let wrapped_function: Box< dyn FnOnceObject<(), Output = TypeErased> + Send + Sync + 'static, > = Box::new(_wrapped_function.bind_value(func)); let mut workers_receiver = self.workers_receiver.lock().await; let mut worker_obj = workers_receiver .recv() .await .ok_or_else(|| anyhow!("All workers are dead"))?; drop(workers_receiver); let output = worker_obj.channel.request(&wrapped_function).await?; let output = *(output as Box).downcast().unwrap(); if workers_sender.send(worker_obj).is_err() { bail!("Failed to put worker back to queue"); } Ok(output) } async fn close(self) -> Result<()> { drop(self.workers_sender); let mut workers_receiver = self.workers_receiver.lock().await; while let Some(worker_obj) = workers_receiver.recv().await { drop(worker_obj.channel); worker_obj.child.join().await?; } Ok(()) } } #[func] fn _wrapped_function< Output: Object + 'static, Func: FnOnceObject<(), Output = Output> + Send + Sync + 'static, >( func: Func, ) -> TypeErased { Box::new(func.call_object_once(())) } #[func] #[tokio::main(flavor = "current_thread")] async fn worker( mut channel: Duplex + Send + Sync>>, ) { while let Some(func) = channel.recv().await.unwrap() { channel.send(&func.call_object_once(())).await.unwrap(); } } #[crossmist::main] #[tokio::main(flavor = "current_thread")] async fn main() -> Result<()> { let pool = Arc::new(WorkerPool::new(4).await?); let mut tasks = Vec::new(); for x in 1..=5 { for y in 1..=5 { let pool = pool.clone(); tasks.push(tokio::spawn(async move { let func = lambda! { move(x: i32, y: i32) || -> i32 { x + y } }; println!("{x} + {y} = {}", pool.run(func).await?); Result::<()>::Ok(()) })); } } for task in tasks { task.await??; } Arc::try_unwrap(pool) .or_else(|_| bail!("Pool is still in use"))? .close() .await?; Ok(()) }