{
    /**
     * Whether an operation is currently running.
     *
     * Set by `waitFor` right before the operation is invoked, and cleared
     * once an operation finishes with no caller queued behind it.
     */
    private bool $ingoing = false;

    /**
     * Suspensions of `waitFor` callers queued behind the running operation,
     * in arrival order (the first element is the next one to be resumed).
     */
    private vec $pending = vec[];

    /**
     * Suspensions of `waitForPending` callers; they are all resumed together
     * once an operation finishes and nothing is queued behind it.
     */
    private vec $waits = vec[];

    /**
     * Create a sequence around the operation that `waitFor` invokes,
     * one call at a time.
     */
    public function __construct(
        private readonly Closure<(I), O> $operation,
    ) {
    }

    /**
     * Execute the operation with `$input` once every previously submitted
     * operation has completed.
     *
     * If an operation is already running, the caller is queued and suspended
     * until its turn comes. The value of the call is whatever the operation
     * produces; the hand-off to the next caller happens in `finally`, so it
     * also runs when the operation throws.
     */
    public function waitFor(I $input): O {
        if $this->ingoing {
            // Busy: register at the back of the queue and park until resumed.
            $ticket = EventLoop::getSuspension::();
            $this->pending[] = $ticket;
            $ticket->suspend();
        }

        $this->ingoing = true;

        try {
            ($this->operation)($input)
        } finally {
            $next = $this->pending[0] ?? null;
            if $next === null {
                // Nothing queued: release every `waitForPending` caller and
                // mark the sequence as idle.
                foreach $this->waits as $waiter {
                    $waiter->resume();
                }

                $this->waits = vec[];
                $this->ingoing = false;
            } else {
                // Hand over to the oldest queued caller. It is dequeued before
                // being resumed, and `$ingoing` deliberately stays `true`.
                $this->pending = array_slice($this->pending, 1);
                $next->resume();
            }
        }
    }

    /**
     * Cancel every operation that is still waiting for its turn.
     *
     * Each queued caller fails with the given exception.
     *
     * Operations submitted afterwards are executed as usual.
     */
    public function cancel(Exception $exception): void {
        // Snapshot and clear the queue first, so it is already empty by the
        // time the queued callers are failed.
        $cancelled = $this->pending;
        $this->pending = vec[];

        foreach $cancelled as $ticket {
            $ticket->throw($exception);
        }
    }

    /**
     * Get how many operations are queued and have not started executing yet.
     */
    public function getPendingOperations(): int {
        count($this->pending)
    }

    /**
     * Check whether at least one operation is queued for execution.
     *
     * When this returns `true`, future calls to `waitFor` will wait.
     */
    public function hasPendingOperations(): bool {
        0 !== count($this->pending)
    }

    /**
     * Check whether the sequence has an operation in progress.
     *
     * When this returns `true`, future calls to `waitFor` will wait.
     * When this returns `false`, future calls to `waitFor` will execute immediately.
     */
    public function hasIngoingOperations(): bool {
        $this->ingoing
    }

    /**
     * Wait until all pending operations have finished executing.
     *
     * Returns immediately when no operation is in progress.
     */
    public function waitForPending(): void {
        if !$this->ingoing {
            return;
        }

        // Park until `waitFor` drains the queue and resumes all waiters.
        $waiter = EventLoop::getSuspension::();
        $this->waits[] = $waiter;
        $waiter->suspend();
    }
}