use std::{ pin::Pin, sync::{ atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, }, }; use futures::Future; use tokio::sync::Notify; use crate::{ builder::Config, chan::{BusSender, Receiver, Sender}, handler::HandlerSpawner, message::Msg, Builder, BusInner, Error, Handler, Message, }; #[derive(Default)] pub(crate) struct TaskCounter { pub(crate) running: AtomicUsize, notify: Notify, } pub(crate) struct TaskCounterLease bool> { need_notify: S, counter: Arc, } impl bool> Drop for TaskCounterLease { fn drop(&mut self) { let notify = (self.need_notify)(); let prev = self.counter.running.fetch_sub(1, Ordering::Relaxed); if notify && prev == 1 { self.counter.notify.notify_waiters(); } } } impl bool> TaskCounterLease { fn new(counter: Arc, need_notify: S) -> Self { counter.running.fetch_add(1, Ordering::Relaxed); Self { counter, need_notify, } } } impl TaskCounter { pub fn lease_unit bool>(self: Arc, need_notify: S) -> TaskCounterLease { TaskCounterLease::new(self, need_notify) } #[inline] pub async fn wait(&self) { self.notify.notified().await } } pub(crate) trait TaskSpawner: Send + Sync { fn config(&self, stream_id: u32) -> Config; fn is_producer(&self) -> bool; #[allow(clippy::too_many_arguments)] fn spawn_task( &self, rx: Receiver>, stream_id: u32, task_id: u32, abort: Arc, task_counter: Arc, spawn_counter: Arc, index_counter: Arc, bus: Arc, ) -> Pin> + Send + '_>>; } pub(crate) struct TaskSpawnerWrapper { inner: Arc>, } impl Clone for TaskSpawnerWrapper { fn clone(&self) -> Self { Self { inner: self.inner.clone(), } } } impl TaskSpawnerWrapper { pub fn from_handler + 'static>(builder: B) -> Self where B::Context: Handler, { Self { inner: Arc::new(HandlerSpawner::new(builder)) as _, } } #[inline] #[allow(clippy::too_many_arguments)] pub async fn spawn_task( &self, (tx, rx): (Sender>, Receiver>), stream_id: u32, task_id: u32, abort: Arc, task_counter: Arc, spawn_counter: Arc, index_counter: Arc, bus: Arc, ) -> Result, Error> { self.inner .spawn_task( rx, stream_id, task_id, abort, task_counter, spawn_counter, index_counter, bus, ) .await?; Ok(BusSender::new(self.inner.is_producer(), tx)) } #[inline] pub(crate) fn config(&self, stream_id: u32) -> Config { self.inner.config(stream_id) } }