use parking_lot::Mutex; use async_channel as mpmc; use tokio::{sync::broadcast, task::JoinSet}; use crate::{dispatcher, worker::Worker}; type Ref = std::sync::Arc; pub struct Executor { state: broadcast::Receiver, event: mpmc::Receiver, inner: JoinSet<()>, count: usize, wal: Vec>, } impl Executor where S1: Send + Sync + 'static, E1: Send + Sync + 'static, { pub fn new(state: broadcast::Receiver, event: mpmc::Receiver, count: usize) -> Self { let (inner, mut wal) = (JoinSet::new(), Vec::new()); for _ in 0..count { let value = Ref::new(Mutex::new(None)); wal.push(value); } Self { state, event, inner, count, wal, } } async fn execute(&mut self, wi: usize, worker: Ref, state: Ref) where W1: Worker, { let (wal, receiver) = (self.wal[wi].clone(), self.event.clone()); let worker = move |event| { // Keep in mind that this closure will be called on every event let (worker, state) = (worker.clone(), state.clone()); async move { worker.execute(state, event).await } }; self.inner.spawn(dispatcher::dispatch(wal, receiver, worker)); } async fn respawn(&mut self, worker: Ref, state: Ref) where W1: Worker, { self.inner.shutdown().await; for wi in 0..self.count { // Spawn a few workers to use the full power of async let (worker, state) = (worker.clone(), state.clone()); self.execute(wi, worker, state).await; } } pub async fn receive(mut self, worker: W1) where W1: Worker, { let worker = Ref::new(worker); while let Ok(state) = self.state.recv().await { // Stop and start workers when we receive new state let (worker, state) = (worker.clone(), Ref::new(state)); self.respawn(worker, state).await; } } }