// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. use super::future_arena::FutureAllocation; use super::future_arena::FutureArena; use super::op_results::*; use super::OpDriver; use super::OpInflightStats; use crate::OpId; use crate::PromiseId; use anyhow::Error; use bit_set::BitSet; use deno_unsync::spawn; use deno_unsync::JoinHandle; use deno_unsync::UnsyncWaker; use futures::future::poll_fn; use futures::stream::FuturesUnordered; use futures::task::noop_waker_ref; use futures::FutureExt; use futures::Stream; use std::cell::Cell; use std::cell::RefCell; use std::collections::VecDeque; use std::future::ready; use std::future::Future; use std::pin::Pin; use std::rc::Rc; use std::task::ready; use std::task::Context; use std::task::Poll; async fn poll_task( mut results: SubmissionQueueResults< FuturesUnordered, PendingOpInfo>>, >, tx: Rc>>>, tx_waker: Rc, ) { loop { let ready = poll_fn(|cx| results.poll_next_unpin(cx)).await; tx.borrow_mut().push_back(ready); tx_waker.wake_by_ref(); } } #[derive(Default)] enum MaybeTask { #[default] Empty, Task(Pin>>), Handle(JoinHandle<()>), } /// [`OpDriver`] implementation built on a tokio [`JoinSet`]. pub struct FuturesUnorderedDriver< C: OpMappingContext + 'static = V8OpMappingContext, > { len: Cell, task: Cell, task_set: Cell, queue: SubmissionQueue< FuturesUnordered, PendingOpInfo>>, >, completed_ops: Rc>>>, completed_waker: Rc, arena: FutureArena, PendingOpInfo>, } impl Drop for FuturesUnorderedDriver { fn drop(&mut self) { self.shutdown() } } impl Default for FuturesUnorderedDriver { fn default() -> Self { let (queue, results) = new_submission_queue(); let completed_ops = Rc::new(RefCell::new(VecDeque::with_capacity(128))); let completed_waker = Rc::new(UnsyncWaker::default()); let task = MaybeTask::Task(Box::pin(poll_task( results, completed_ops.clone(), completed_waker.clone(), ))) .into(); Self { len: Default::default(), task, task_set: Default::default(), completed_ops, queue, completed_waker, arena: Default::default(), } } } impl FuturesUnorderedDriver { #[inline(always)] fn ensure_task(&self) { if !self.task_set.get() { self.spawn_task(); } } #[inline(never)] #[cold] fn spawn_task(&self) { let MaybeTask::Task(task) = self.task.replace(Default::default()) else { unreachable!() }; self.task.set(MaybeTask::Handle(spawn(task))); self.task_set.set(true); } /// Spawn a polled task inside a [`FutureAllocation`], along with a function that can map it to a [`PendingOp`]. #[inline(always)] fn spawn(&self, task: FutureAllocation, PendingOpInfo>) { self.ensure_task(); self.len.set(self.len.get() + 1); self.queue.spawn(task); } } impl OpDriver for FuturesUnorderedDriver { fn submit_op_fallible< R: 'static, E: Into + 'static, const LAZY: bool, const DEFERRED: bool, >( &self, op_id: OpId, promise_id: i32, op: impl Future> + 'static, rv_map: C::MappingFn, ) -> Option> { { let info = PendingOpMappingInfo::<_, _, true>( PendingOpInfo(promise_id, op_id), rv_map, ); let mut pinned = self.arena.allocate(info, op); if LAZY { self.spawn(pinned.erase()); return None; } // We poll every future here because it's much faster to return a result than // spin the event loop to get it. match pinned.poll_unpin(&mut Context::from_waker(noop_waker_ref())) { Poll::Pending => self.spawn(pinned.erase()), Poll::Ready(res) => { if DEFERRED { drop(pinned); self.spawn(self.arena.allocate(info, ready(res)).erase()) } else { return Some(res); } } }; None } } fn submit_op_infallible< R: 'static, const LAZY: bool, const DEFERRED: bool, >( &self, op_id: OpId, promise_id: i32, op: impl Future + 'static, rv_map: C::MappingFn, ) -> Option { { let info = PendingOpMappingInfo::<_, _, false>( PendingOpInfo(promise_id, op_id), rv_map, ); let mut pinned = self.arena.allocate(info, op); if LAZY { self.spawn(pinned.erase()); return None; } // We poll every future here because it's much faster to return a result than // spin the event loop to get it. match Pin::new(&mut pinned) .poll(&mut Context::from_waker(noop_waker_ref())) { Poll::Pending => self.spawn(pinned.erase()), Poll::Ready(res) => { if DEFERRED { drop(pinned); self.spawn(self.arena.allocate(info, ready(res)).erase()) } else { return Some(res); } } }; None } } #[inline(always)] fn poll_ready( &self, cx: &mut Context, ) -> Poll<(PromiseId, OpId, OpResult)> { let mut ops = self.completed_ops.borrow_mut(); if ops.is_empty() { self.completed_waker.register(cx.waker()); return Poll::Pending; } let item = ops.pop_front().unwrap(); let PendingOp(PendingOpInfo(promise_id, op_id), resp) = item; self.len.set(self.len.get() - 1); Poll::Ready((promise_id, op_id, resp)) } #[inline(always)] fn len(&self) -> usize { self.len.get() } fn shutdown(&self) { if let MaybeTask::Handle(h) = self.task.take() { h.abort() } self.completed_ops.borrow_mut().clear(); self.queue.queue.queue.borrow_mut().clear(); } fn stats(&self, op_exclusions: &BitSet) -> OpInflightStats { let q = self.queue.queue.queue.borrow(); let mut v: Vec = Vec::with_capacity(self.len.get()); for f in q.iter() { let context = f.context(); if !op_exclusions.contains(context.1 as _) { v.push(context); } } OpInflightStats { ops: v.into_boxed_slice(), } } } impl, R> SubmissionQueueFutures for FuturesUnordered { type Future = F; type Output = F::Output; fn len(&self) -> usize { self.len() } fn spawn(&mut self, f: Self::Future) { self.push(f) } fn poll_next_unpin(&mut self, cx: &mut Context) -> Poll { Poll::Ready(ready!(Pin::new(self).poll_next(cx)).unwrap()) } } #[derive(Default)] struct Queue { queue: RefCell, item_waker: UnsyncWaker, } pub trait SubmissionQueueFutures: Default { type Future: Future; type Output; fn len(&self) -> usize; fn spawn(&mut self, f: Self::Future); fn poll_next_unpin(&mut self, cx: &mut Context) -> Poll; } pub struct SubmissionQueueResults { queue: Rc>, } impl SubmissionQueueResults { pub fn poll_next_unpin(&mut self, cx: &mut Context) -> Poll { let mut queue = self.queue.queue.borrow_mut(); self.queue.item_waker.register(cx.waker()); if queue.len() == 0 { return Poll::Pending; } queue.poll_next_unpin(cx) } } pub struct SubmissionQueue { queue: Rc>, } impl SubmissionQueue { pub fn spawn(&self, f: F::Future) { self.queue.queue.borrow_mut().spawn(f); self.queue.item_waker.wake_by_ref(); } } /// Create a [`SubmissionQueue`] and [`SubmissionQueueResults`] that allow for submission of tasks /// and reception of task results. We may add work to the [`SubmissionQueue`] from any task, and the /// [`SubmissionQueueResults`] will be polled from a single location. pub fn new_submission_queue( ) -> (SubmissionQueue, SubmissionQueueResults) { let queue: Rc> = Default::default(); ( SubmissionQueue { queue: queue.clone(), }, SubmissionQueueResults { queue }, ) }