// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. import {core, internals, primordials} from "ext:core/mod.js"; const { BadResourcePrototype, InterruptedPrototype } = core; import { op_fake_http_accept, op_fake_http_headers, op_fake_http_shutdown, op_fake_http_write, op_fake_http_write_headers, op_fake_http_write_resource } from "ext:core/ops"; const { ObjectPrototypeIsPrototypeOf, SafeSet, SafeSetIterator, SetPrototypeAdd, SetPrototypeDelete, StringPrototypeIncludes, Symbol, SymbolAsyncIterator, TypeError, TypedArrayPrototypeGetSymbolToStringTag, Uint8Array } = primordials; import {InnerBody} from "ext:deno_fetch/22_body.js"; import {BlobPrototype} from "ext:deno_web/09_file.js"; import { ResponsePrototype, toInnerResponse } from "ext:deno_fetch/23_response.js"; import { abortRequest, fromInnerRequest, newInnerRequest } from "ext:deno_fetch/23_request.js"; import { getReadableStreamResourceBacking, readableStreamClose, readableStreamForRid, ReadableStreamPrototype } from "ext:deno_web/06_streams.js"; import {SymbolDispose} from "ext:deno_web/00_infra.js"; const connErrorSymbol = Symbol("connError"); /** @type {(self: FakeHttpConn, rid: number) => boolean} */ let deleteManagedResource; class FakeHttpConn { #rid = 0; #closed = false; #remoteAddr; #localAddr; // This set holds resource ids of resources // that were created during lifecycle of this request. // When the connection is closed these resources should be closed // as well. #managedResources = new SafeSet(); static { deleteManagedResource = (self, rid) => SetPrototypeDelete(self.#managedResources, rid); } constructor(rid, remoteAddr, localAddr) { this.#rid = rid; this.#remoteAddr = remoteAddr; this.#localAddr = localAddr; } /** @returns {number} */ get rid() { return this.#rid; } /** @returns {Promise} */ async nextRequest() { let nextRequest; try { nextRequest = await op_fake_http_accept(this.#rid); } catch (error) { this.close(); this[connErrorSymbol] = error; if ( ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error) || ObjectPrototypeIsPrototypeOf(InterruptedPrototype, error) || StringPrototypeIncludes(error.message, "connection closed") ) { return null; } throw error; } if (nextRequest == null) { await null; this.close(); return null; } const {0: readStreamRid, 1: writeStreamRid, 2: method, 3: url} = nextRequest; SetPrototypeAdd(this.#managedResources, readStreamRid); SetPrototypeAdd(this.#managedResources, writeStreamRid); /** @type {ReadableStream | undefined} */ let body = null; if (method !== "GET" && method !== "HEAD") { body = readableStreamForRid(readStreamRid, false); } const innerRequest = newInnerRequest( method, url, () => op_fake_http_headers(readStreamRid), body !== null ? new InnerBody(body) : null, false ); const request = fromInnerRequest( innerRequest, "immutable", false ); const respondWith = createRespondWith( this, request, readStreamRid, writeStreamRid ); return {request, respondWith}; } /** @returns {void} */ close() { if (!this.#closed) { this.#closed = true; core.tryClose(this.#rid); for (const rid of new SafeSetIterator(this.#managedResources)) { SetPrototypeDelete(this.#managedResources, rid); core.tryClose(rid); } } } [SymbolDispose]() { core.tryClose(this.#rid); for (const rid of new SafeSetIterator(this.#managedResources)) { SetPrototypeDelete(this.#managedResources, rid); core.tryClose(rid); } } [SymbolAsyncIterator]() { // deno-lint-ignore no-this-alias const httpConn = this; return { async next() { const reqEvt = await httpConn.nextRequest(); // Change with caution, current form avoids a v8 deopt return {value: reqEvt ?? undefined, done: reqEvt === null}; } }; } } function createRespondWith( httpConn, request, readStreamRid, writeStreamRid ) { return async function respondWith(resp) { try { resp = await resp; if (!(ObjectPrototypeIsPrototypeOf(ResponsePrototype, resp))) { throw new TypeError( "First argument to respondWith must be a Response or a promise resolving to a Response." ); } const innerResp = toInnerResponse(resp); // If response body length is known, it will be sent synchronously in a // single op, in other case a "response body" resource will be created and // we'll be streaming it. /** @type {ReadableStream | Uint8Array | null} */ let respBody = null; if (innerResp.body !== null) { if (innerResp.body.unusable()) { throw new TypeError("Body is unusable."); } if ( ObjectPrototypeIsPrototypeOf( ReadableStreamPrototype, innerResp.body.streamOrStatic ) ) { if ( innerResp.body.length === null || ObjectPrototypeIsPrototypeOf( BlobPrototype, innerResp.body.source ) ) { respBody = innerResp.body.stream; } else { const reader = innerResp.body.stream.getReader(); const r1 = await reader.read(); if (r1.done) { respBody = new Uint8Array(0); } else { respBody = r1.value; const r2 = await reader.read(); if (!r2.done) throw new TypeError("Unreachable"); } } } else { innerResp.body.streamOrStatic.consumed = true; respBody = innerResp.body.streamOrStatic.body; } } else { respBody = new Uint8Array(0); } const isStreamingResponseBody = !( typeof respBody === "string" || TypedArrayPrototypeGetSymbolToStringTag(respBody) === "Uint8Array" ); try { await op_fake_http_write_headers( writeStreamRid, innerResp.status ?? 200, innerResp.headerList, isStreamingResponseBody ? null : respBody ); } catch (error) { const connError = httpConn[connErrorSymbol]; if ( ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error) && connError != null ) { // deno-lint-ignore no-ex-assign error = new connError.constructor(connError.message); } if ( respBody !== null && ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, respBody) ) { await respBody.cancel(error); } throw error; } if (isStreamingResponseBody) { let success = false; if ( respBody === null || !ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, respBody) ) { throw new TypeError("Unreachable"); } const resourceBacking = getReadableStreamResourceBacking(respBody); let reader; if (resourceBacking) { if (respBody.locked) { throw new TypeError("ReadableStream is locked."); } reader = respBody.getReader(); // Acquire JS lock. try { await op_fake_http_write_resource( writeStreamRid, resourceBacking.rid ); if (resourceBacking.autoClose) core.tryClose(resourceBacking.rid); readableStreamClose(respBody); // Release JS lock. success = true; } catch (error) { const connError = httpConn[connErrorSymbol]; if ( ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error) && connError != null ) { // deno-lint-ignore no-ex-assign error = new connError.constructor(connError.message); } await reader.cancel(error); throw error; } } else { reader = respBody.getReader(); while (true) { const {value, done} = await reader.read(); if (done) break; if ( TypedArrayPrototypeGetSymbolToStringTag(value) !== "Uint8Array" ) { await reader.cancel(new TypeError("Value not a Uint8Array")); break; } try { await op_fake_http_write(writeStreamRid, value); } catch (error) { const connError = httpConn[connErrorSymbol]; if ( ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error) && connError != null ) { // deno-lint-ignore no-ex-assign error = new connError.constructor(connError.message); } await reader.cancel(error); throw error; } } success = true; } if (success) { try { await op_fake_http_shutdown(writeStreamRid); } catch (error) { await reader.cancel(error); throw error; } } } } catch (error) { abortRequest(request); throw error; } finally { if (deleteManagedResource(httpConn, readStreamRid)) { core.tryClose(readStreamRid); } if (deleteManagedResource(httpConn, writeStreamRid)) { core.tryClose(writeStreamRid); } } }; } export {FakeHttpConn};