diff --git a/bench/sqlite/better-sqlite3.mjs b/bench/sqlite/better-sqlite3.mjs new file mode 100644 index 00000000000000..9bf25105b908e1 --- /dev/null +++ b/bench/sqlite/better-sqlite3.mjs @@ -0,0 +1,31 @@ +import { run, bench } from "mitata"; +import { createRequire } from "module"; + +const require = createRequire(import.meta.url); +const db = require("better-sqlite3")("./src/northwind.sqlite"); + +{ + const sql = db.prepare(`SELECT * FROM "Order"`); + + bench('SELECT * FROM "Order"', () => { + sql.all(); + }); +} + +{ + const sql = db.prepare(`SELECT * FROM "Product"`); + + bench('SELECT * FROM "Product"', () => { + sql.all(); + }); +} + +{ + const sql = db.prepare(`SELECT * FROM "OrderDetail"`); + + bench('SELECT * FROM "OrderDetail"', () => { + sql.all(); + }); +} + +await run(); diff --git a/bench/sqlite/node.mjs b/bench/sqlite/node.mjs index 9bf25105b908e1..7602a87612d252 100644 --- a/bench/sqlite/node.mjs +++ b/bench/sqlite/node.mjs @@ -1,8 +1,9 @@ +// Run `node --experimental-sqlite bench/sqlite/node.mjs` to run the script. +// You will need `--experimental-sqlite` flag to run this script and node v22.5.0 or higher. import { run, bench } from "mitata"; -import { createRequire } from "module"; +import { DatabaseSync as Database } from "node:sqlite"; -const require = createRequire(import.meta.url); -const db = require("better-sqlite3")("./src/northwind.sqlite"); +const db = new Database("./src/northwind.sqlite"); { const sql = db.prepare(`SELECT * FROM "Order"`); diff --git a/src/js/node/http.ts b/src/js/node/http.ts index 6a7b821428531b..45c8efefb8cd1b 100644 --- a/src/js/node/http.ts +++ b/src/js/node/http.ts @@ -23,8 +23,12 @@ const { headersTuple: any; }; +// TODO: make this more robust. +function isAbortError(err) { + return err?.name === "AbortError"; +} + const ObjectDefineProperty = Object.defineProperty; -const ObjectSetPrototypeOf = Object.setPrototypeOf; const GlobalPromise = globalThis.Promise; const headerCharRegex = /[^\t\x20-\x7e\x80-\xff]/; @@ -288,7 +292,7 @@ function Agent(options = kEmptyObject) { this.protocol = options.protocol || "http:"; } Agent.prototype = {}; -ObjectSetPrototypeOf(Agent.prototype, EventEmitter.prototype); +$setPrototypeDirect.$call(Agent.prototype, EventEmitter.prototype); ObjectDefineProperty(Agent, "globalAgent", { get: function () { @@ -345,21 +349,11 @@ Agent.prototype.destroy = function () { $debug(`${NODE_HTTP_WARNING}\n`, "WARN: Agent.destroy is a no-op"); }; -function emitListeningNextTick(self, onListen, err, hostname, port) { - if (typeof onListen === "function") { - try { - onListen.$apply(self, [err, hostname, port]); - } catch (err) { - self.emit("error", err); - } - } - - self.listening = !err; - - if (err) { - self.emit("error", err); - } else { - self.emit("listening", hostname, port); +function emitListeningNextTick(self, hostname, port) { + if ((self.listening = !!self[serverSymbol])) { + // TODO: remove the arguments + // Note does not pass any arguments. + self.emit("listening", null, hostname, port); } } @@ -445,203 +439,220 @@ function Server(options, callback) { if (callback) this.on("request", callback); return this; } -Object.setPrototypeOf((Server.prototype = {}), EventEmitter.prototype); -Server.prototype.constructor = Server; // Re-add constructor which got lost when setting prototype -Object.setPrototypeOf(Server, EventEmitter); -Server.prototype.ref = function () { - this._unref = false; - this[serverSymbol]?.ref?.(); - return this; -}; +Server.prototype = { + ref() { + this._unref = false; + this[serverSymbol]?.ref?.(); + return this; + }, -Server.prototype.unref = function () { - this._unref = true; - this[serverSymbol]?.unref?.(); - return this; -}; + unref() { + this._unref = true; + this[serverSymbol]?.unref?.(); + return this; + }, -Server.prototype.closeAllConnections = function () { - const server = this[serverSymbol]; - if (!server) { - return; - } - this[serverSymbol] = undefined; - server.stop(true); - this.emit("close"); -}; + closeAllConnections() { + const server = this[serverSymbol]; + if (!server) { + return; + } + this[serverSymbol] = undefined; + server.stop(true); + process.nextTick(emitCloseNT, this); + }, -Server.prototype.closeIdleConnections = function () { - // not actually implemented -}; + closeIdleConnections() { + // not actually implemented + }, -Server.prototype.close = function (optionalCallback?) { - const server = this[serverSymbol]; - if (!server) { - if (typeof optionalCallback === "function") process.nextTick(optionalCallback, new Error("Server is not running")); - return; - } - this[serverSymbol] = undefined; - if (typeof optionalCallback === "function") this.once("close", optionalCallback); - server.stop(); - this.emit("close"); -}; + close(optionalCallback?) { + const server = this[serverSymbol]; + if (!server) { + if (typeof optionalCallback === "function") + process.nextTick(optionalCallback, new Error("Server is not running")); + return; + } + this[serverSymbol] = undefined; + if (typeof optionalCallback === "function") this.once("close", optionalCallback); + server.stop(); + process.nextTick(emitCloseNT, this); + }, -Server.prototype[Symbol.asyncDispose] = function () { - const { resolve, reject, promise } = Promise.withResolvers(); - this.close(function (err, ...args) { - if (err) reject(err); - else resolve(...args); - }); - return promise; -}; + [Symbol.asyncDispose]() { + const { resolve, reject, promise } = Promise.withResolvers(); + this.close(function (err, ...args) { + if (err) reject(err); + else resolve(...args); + }); + return promise; + }, -Server.prototype.address = function () { - if (!this[serverSymbol]) return null; - return this[serverSymbol].address; -}; + address() { + if (!this[serverSymbol]) return null; + return this[serverSymbol].address; + }, -Server.prototype.listen = function (port, host, backlog, onListen) { - const server = this; - let socketPath; - if (typeof port === "undefined") { - port = 0; - } - if (typeof port == "string" && !Number.isSafeInteger(Number(port))) { - socketPath = port; - } - if (typeof host === "function") { - onListen = host; - host = undefined; - } + listen() { + const server = this; + let port, host, onListen; + let socketPath; + let tls = this[tlsSymbol]; + + // This logic must align with: + // - https://github.com/nodejs/node/blob/2eff28fb7a93d3f672f80b582f664a7c701569fb/lib/net.js#L274-L307 + if (arguments.length > 0) { + if (($isObject(arguments[0]) || $isCallable(arguments[0])) && arguments[0] !== null) { + // (options[...][, cb]) + port = arguments[0].port; + host = arguments[0].host; + socketPath = arguments[0].path; + + const otherTLS = arguments[0].tls; + if (otherTLS && $isObject(otherTLS)) { + tls = otherTLS; + } + } else if (typeof arguments[0] === "string" && !(Number(arguments[0]) >= 0)) { + // (path[...][, cb]) + socketPath = arguments[0]; + } else { + // ([port][, host][...][, cb]) + port = arguments[0]; + if (arguments.length > 1 && typeof arguments[1] === "string") { + host = arguments[1]; + } + } + } - if (typeof port === "function") { - onListen = port; - } else if (typeof port === "object") { - port?.signal?.addEventListener("abort", () => { - this.close(); - }); + // Bun defaults to port 3000. + // Node defaults to port 0. + if (port === undefined && !socketPath) { + port = 0; + } - host = port?.host; - port = port?.port; + if ($isCallable(arguments[arguments.length - 1])) { + onListen = arguments[arguments.length - 1]; + } - if (typeof port?.callback === "function") onListen = port?.callback; - } + const ResponseClass = this[optionsSymbol].ServerResponse || ServerResponse; + const RequestClass = this[optionsSymbol].IncomingMessage || IncomingMessage; + let isHTTPS = false; - if (typeof backlog === "function") { - onListen = backlog; - } + try { + if (tls) { + this.serverName = tls.serverName || host || "localhost"; + } + this[serverSymbol] = Bun.serve({ + tls, + port, + hostname: host, + unix: socketPath, + // Bindings to be used for WS Server + websocket: { + open(ws) { + ws.data.open(ws); + }, + message(ws, message) { + ws.data.message(ws, message); + }, + close(ws, code, reason) { + ws.data.close(ws, code, reason); + }, + drain(ws) { + ws.data.drain(ws); + }, + ping(ws, data) { + ws.data.ping(ws, data); + }, + pong(ws, data) { + ws.data.pong(ws, data); + }, + }, + maxRequestBodySize: Number.MAX_SAFE_INTEGER, + // Be very careful not to access (web) Request object + // properties: + // - request.url + // - request.headers + // + // We want to avoid triggering the getter for these properties because + // that will cause the data to be cloned twice, which costs memory & performance. + fetch(req, _server) { + var pendingResponse; + var pendingError; + var reject = err => { + if (pendingError) return; + pendingError = err; + if (rejectFunction) rejectFunction(err); + }; + + var reply = function (resp) { + if (pendingResponse) return; + pendingResponse = resp; + if (resolveFunction) resolveFunction(resp); + }; + + const prevIsNextIncomingMessageHTTPS = isNextIncomingMessageHTTPS; + isNextIncomingMessageHTTPS = isHTTPS; + const http_req = new RequestClass(req); + isNextIncomingMessageHTTPS = prevIsNextIncomingMessageHTTPS; + + const upgrade = http_req.headers.upgrade; + + const http_res = new ResponseClass(http_req, reply); + + http_req.socket[kInternalSocketData] = [_server, http_res, req]; + server.emit("connection", http_req.socket); + + const rejectFn = err => reject(err); + http_req.once("error", rejectFn); + http_res.once("error", rejectFn); + + if (upgrade) { + server.emit("upgrade", http_req, http_req.socket, kEmptyBuffer); + } else { + server.emit("request", http_req, http_res); + } - const ResponseClass = this[optionsSymbol].ServerResponse || ServerResponse; - const RequestClass = this[optionsSymbol].IncomingMessage || IncomingMessage; - let isHTTPS = false; + if (pendingError) { + throw pendingError; + } - try { - const tls = this[tlsSymbol]; - if (tls) { - this.serverName = tls.serverName || host || "localhost"; - } - this[serverSymbol] = Bun.serve({ - tls, - port, - hostname: host, - unix: socketPath, - // Bindings to be used for WS Server - websocket: { - open(ws) { - ws.data.open(ws); - }, - message(ws, message) { - ws.data.message(ws, message); - }, - close(ws, code, reason) { - ws.data.close(ws, code, reason); - }, - drain(ws) { - ws.data.drain(ws); - }, - ping(ws, data) { - ws.data.ping(ws, data); - }, - pong(ws, data) { - ws.data.pong(ws, data); - }, - }, - maxRequestBodySize: Number.MAX_SAFE_INTEGER, - // Be very careful not to access (web) Request object - // properties: - // - request.url - // - request.headers - // - // We want to avoid triggering the getter for these properties because - // that will cause the data to be cloned twice, which costs memory & performance. - fetch(req, _server) { - var pendingResponse; - var pendingError; - var reject = err => { - if (pendingError) return; - pendingError = err; - if (rejectFunction) rejectFunction(err); - }; - - var reply = function (resp) { - if (pendingResponse) return; - pendingResponse = resp; - if (resolveFunction) resolveFunction(resp); - }; - - const prevIsNextIncomingMessageHTTPS = isNextIncomingMessageHTTPS; - isNextIncomingMessageHTTPS = isHTTPS; - const http_req = new RequestClass(req); - isNextIncomingMessageHTTPS = prevIsNextIncomingMessageHTTPS; - - const upgrade = http_req.headers.upgrade; - - const http_res = new ResponseClass(http_req, reply); - - http_req.socket[kInternalSocketData] = [_server, http_res, req]; - server.emit("connection", http_req.socket); - - const rejectFn = err => reject(err); - http_req.once("error", rejectFn); - http_res.once("error", rejectFn); - - if (upgrade) { - server.emit("upgrade", http_req, http_req.socket, kEmptyBuffer); - } else { - server.emit("request", http_req, http_res); - } + if (pendingResponse) { + return pendingResponse; + } - if (pendingError) { - throw pendingError; - } + var { promise, resolve: resolveFunction, reject: rejectFunction } = $newPromiseCapability(GlobalPromise); + return promise; + }, + }); + isHTTPS = this[serverSymbol].protocol === "https"; - if (pendingResponse) { - return pendingResponse; - } + if (this?._unref) { + this[serverSymbol]?.unref?.(); + } - var { promise, resolve: resolveFunction, reject: rejectFunction } = $newPromiseCapability(GlobalPromise); - return promise; - }, - }); - isHTTPS = this[serverSymbol].protocol === "https"; + if ($isCallable(onListen)) { + this.once("listening", onListen); + } - if (this?._unref) { - this[serverSymbol]?.unref?.(); + setTimeout(emitListeningNextTick, 1, this, this[serverSymbol].hostname, this[serverSymbol].port); + } catch (err) { + server.emit("error", err); } - setTimeout(emitListeningNextTick, 1, this, onListen, null, this[serverSymbol].hostname, this[serverSymbol].port); - } catch (err) { - server.emit("error", err); - } + return this; + }, - return this; -}; + setTimeout(msecs, callback) { + // TODO: + return this; + }, -Server.prototype.setTimeout = function (msecs, callback) { - // TODO: - return this; + constructor: Server, }; +$setPrototypeDirect.$call(Server.prototype, EventEmitter.prototype); +$setPrototypeDirect.$call(Server, EventEmitter); function assignHeadersSlow(object, req) { const headers = req.headers; @@ -758,133 +769,172 @@ function IncomingMessage(req, defaultIncomingOpts) { this.complete = !!this[noBodySymbol]; } -Object.setPrototypeOf((IncomingMessage.prototype = {}), Readable.prototype); -IncomingMessage.prototype.constructor = IncomingMessage; // Re-add constructor which got lost when setting prototype -Object.setPrototypeOf(IncomingMessage, Readable); +IncomingMessage.prototype = { + constructor: IncomingMessage, + _construct(callback) { + // TODO: streaming + if (this[typeSymbol] === "response" || this[noBodySymbol]) { + callback(); + return; + } -IncomingMessage.prototype._construct = function (callback) { - // TODO: streaming - if (this[typeSymbol] === "response" || this[noBodySymbol]) { - callback(); - return; - } + const contentLength = this.headers["content-length"]; + const length = contentLength ? parseInt(contentLength, 10) : 0; + if (length === 0) { + this[noBodySymbol] = true; + callback(); + return; + } - const contentLength = this.headers["content-length"]; - const length = contentLength ? parseInt(contentLength, 10) : 0; - if (length === 0) { - this[noBodySymbol] = true; callback(); - return; - } - - callback(); -}; + }, + _read(size) { + if (this[noBodySymbol]) { + this.complete = true; + this.push(null); + } else if (this[bodyStreamSymbol] == null) { + const reader = this[reqSymbol].body?.getReader() as ReadableStreamDefaultReader; + if (!reader) { + this.complete = true; + this.push(null); + return; + } + this[bodyStreamSymbol] = reader; + consumeStream(this, reader); + } + }, + _destroy(err, cb) { + if (!this.readableEnded || !this.complete) { + this[abortedSymbol] = true; + // IncomingMessage emits 'aborted'. + // Client emits 'abort'. + this.emit("aborted"); + } -async function consumeStream(self, reader: ReadableStreamDefaultReader) { - while (true) { - var { done, value } = await reader.readMany(); - if (self[abortedSymbol]) return; - if (done) { - self.complete = true; - self.push(null); - break; + // Suppress "AbortError" from fetch() because we emit this in the 'aborted' event + if (isAbortError(err)) { + err = undefined; } - for (var v of value) { - self.push(v); + + const stream = this[bodyStreamSymbol]; + this[bodyStreamSymbol] = undefined; + const streamState = stream?.$state; + + if (streamState === $streamReadable || streamState === $streamWaiting || streamState === $streamWritable) { + stream?.cancel?.().catch(nop); } - } -} -IncomingMessage.prototype._read = function (size) { - if (this[noBodySymbol]) { - this.complete = true; - this.push(null); - } else if (this[bodyStreamSymbol] == null) { - const reader = this[reqSymbol].body?.getReader() as ReadableStreamDefaultReader; - if (!reader) { - this.complete = true; - this.push(null); - return; + const socket = this[fakeSocketSymbol]; + if (socket) { + socket.destroy(err); } - this[bodyStreamSymbol] = reader; - consumeStream(this, reader); - } -}; -Object.defineProperty(IncomingMessage.prototype, "aborted", { - get() { + if (cb) { + emitErrorNextTick(this, err, cb); + } + }, + get aborted() { return this[abortedSymbol]; }, -}); - -Object.defineProperty(IncomingMessage.prototype, "connection", { - get() { + set aborted(value) { + this[abortedSymbol] = value; + }, + get connection() { return (this[fakeSocketSymbol] ??= new FakeSocket()); }, -}); - -Object.defineProperty(IncomingMessage.prototype, "statusCode", { - get() { + get statusCode() { return this[reqSymbol].status; }, - set(v) { - if (!(v in STATUS_CODES)) return; - this[reqSymbol].status = v; + set statusCode(value) { + if (!(value in STATUS_CODES)) return; + this[reqSymbol].status = value; }, -}); - -Object.defineProperty(IncomingMessage.prototype, "statusMessage", { - get() { + get statusMessage() { return STATUS_CODES[this[reqSymbol].status]; }, - set(v) { - //noop + set statusMessage(value) { + // noop }, -}); - -Object.defineProperty(IncomingMessage.prototype, "httpVersion", { - get() { + get httpVersion() { return "1.1"; }, -}); - -Object.defineProperty(IncomingMessage.prototype, "rawTrailers", { - get() { - return []; + set httpVersion(value) { + // noop }, -}); - -Object.defineProperty(IncomingMessage.prototype, "httpVersionMajor", { - get() { + get httpVersionMajor() { return 1; }, -}); - -Object.defineProperty(IncomingMessage.prototype, "httpVersionMinor", { - get() { + set httpVersionMajor(value) { + // noop + }, + get httpVersionMinor() { return 1; }, -}); - -Object.defineProperty(IncomingMessage.prototype, "trailers", { - get() { + set httpVersionMinor(value) { + // noop + }, + get rawTrailers() { + return []; + }, + set rawTrailers(value) { + // noop + }, + get trailers() { return kEmptyObject; }, -}); - -Object.defineProperty(IncomingMessage.prototype, "socket", { - get() { + set trailers(value) { + // noop + }, + setTimeout(msecs, callback) { + // noop + return this; + }, + get socket() { return (this[fakeSocketSymbol] ??= new FakeSocket()); }, - set(val) { - this[fakeSocketSymbol] = val; + set socket(value) { + this[fakeSocketSymbol] = value; }, -}); - -IncomingMessage.prototype.setTimeout = function (msecs, callback) { - // TODO: - return this; }; +$setPrototypeDirect.$call(IncomingMessage.prototype, Readable.prototype); +$setPrototypeDirect.$call(IncomingMessage, Readable); + +async function consumeStream(self, reader: ReadableStreamDefaultReader) { + var done = false, + value, + aborted = false; + try { + while (true) { + const result = reader.readMany(); + if ($isPromise(result)) { + ({ done, value } = await result); + } else { + ({ done, value } = result); + } + + if (self.destroyed || (aborted = self[abortedSymbol])) { + break; + } + for (var v of value) { + self.push(v); + } + + if (self.destroyed || (aborted = self[abortedSymbol]) || done) { + break; + } + } + } catch (err) { + if (aborted || self.destroyed) return; + self.destroy(err); + } finally { + reader?.cancel?.().catch?.(nop); + } + + if (!self.complete) { + self.complete = true; + self.push(null); + } +} const headersSymbol = Symbol("headers"); const finishedSymbol = Symbol("finished"); @@ -899,9 +949,9 @@ function OutgoingMessage(options) { this[kAbortController] = null; } -Object.setPrototypeOf((OutgoingMessage.prototype = {}), Writable.prototype); +$setPrototypeDirect.$call((OutgoingMessage.prototype = {}), Writable.prototype); OutgoingMessage.prototype.constructor = OutgoingMessage; // Re-add constructor which got lost when setting prototype -Object.setPrototypeOf(OutgoingMessage, Writable); +$setPrototypeDirect.$call(OutgoingMessage, Writable); // Express "compress" package uses this OutgoingMessage.prototype._implicitHeader = function () {}; @@ -1103,9 +1153,9 @@ function ServerResponse(req, reply) { // https://github.com/nodejs/node/blob/cf8c6994e0f764af02da4fa70bc5962142181bf3/lib/_http_server.js#L192 if (req.method === "HEAD") this._hasBody = false; } -Object.setPrototypeOf((ServerResponse.prototype = {}), OutgoingMessage.prototype); +$setPrototypeDirect.$call((ServerResponse.prototype = {}), OutgoingMessage.prototype); ServerResponse.prototype.constructor = ServerResponse; // Re-add constructor which got lost when setting prototype -Object.setPrototypeOf(ServerResponse, OutgoingMessage); +$setPrototypeDirect.$call(ServerResponse, OutgoingMessage); // Express "compress" package uses this ServerResponse.prototype._implicitHeader = function () { @@ -1396,10 +1446,7 @@ class ClientRequest extends OutgoingMessage { this.destroyed = true; // If request is destroyed we abort the current response this[kAbortController]?.abort?.(); - if (err) { - this.emit("error", err); - } - callback(); + emitErrorNextTick(this, err, callback); } _ensureTls() { @@ -1410,11 +1457,16 @@ class ClientRequest extends OutgoingMessage { _final(callback) { this.#finished = true; this[kAbortController] = new AbortController(); - this[kAbortController].signal.addEventListener("abort", () => { - this.emit("abort"); - this[kClearTimeout](); - this.destroy(); - }); + this[kAbortController].signal.addEventListener( + "abort", + () => { + this[kClearTimeout]?.(); + if (this.destroyed) return; + this.emit("abort"); + this.destroy(); + }, + { once: true }, + ); if (this.#signal?.aborted) { this[kAbortController].abort(); } @@ -1471,6 +1523,10 @@ class ClientRequest extends OutgoingMessage { //@ts-ignore this.#fetchRequest = fetch(url, fetchOptions) .then(response => { + if (this.aborted) { + return; + } + const prevIsHTTPS = isNextIncomingMessageHTTPS; isNextIncomingMessageHTTPS = response.url.startsWith("https:"); var res = (this.#res = new IncomingMessage(response, { @@ -1483,7 +1539,7 @@ class ClientRequest extends OutgoingMessage { .catch(err => { // Node treats AbortError separately. // The "abort" listener on the abort controller should have called this - if (err?.name === "AbortError") { + if (isAbortError(err)) { return; } @@ -1505,13 +1561,19 @@ class ClientRequest extends OutgoingMessage { } get aborted() { - return this.#signal?.aborted || !!this[kAbortController]?.signal.aborted; + return this[abortedSymbol] || this.#signal?.aborted || !!this[kAbortController]?.signal.aborted; + } + + set aborted(value) { + this[abortedSymbol] = value; } abort() { if (this.aborted) return; + this[abortedSymbol] = true; + process.nextTick(emitAbortNextTick, this); this[kAbortController]?.abort?.(); - // TODO: Close stream if body streaming + this.destroy(); } constructor(input, options, cb) { @@ -2118,6 +2180,22 @@ function get(url, options, cb) { return req; } +function onError(self, error, cb) { + if (error) { + cb(error); + } else { + cb(); + } +} + +function emitErrorNextTick(self, err, cb) { + process.nextTick(onError, self, err, cb); +} + +function emitAbortNextTick(self) { + self.emit("abort"); +} + var globalAgent = new Agent(); export default { Agent, diff --git a/test/js/first_party/undici/undici-primordials.test.ts b/test/js/first_party/undici/undici-primordials.test.ts index dad454a42d3b75..63c57cf086a0ce 100644 --- a/test/js/first_party/undici/undici-primordials.test.ts +++ b/test/js/first_party/undici/undici-primordials.test.ts @@ -1,7 +1,17 @@ -import { describe, it, expect, beforeAll, afterAll } from "bun:test"; +import { describe, it, expect, beforeAll, afterAll, afterEach } from "bun:test"; +const { Response, Request, Headers, FormData, File, URL, AbortSignal, URLSearchParams } = globalThis; +afterEach(() => { + globalThis.Response = Response; + globalThis.Request = Request; + globalThis.Headers = Headers; + globalThis.FormData = FormData; + globalThis.File = File; + globalThis.URL = URL; + globalThis.AbortSignal = AbortSignal; + globalThis.URLSearchParams = URLSearchParams; +}); it("undici", () => { - const { Response, Request, Headers, FormData, File, URL, AbortSignal, URLSearchParams } = globalThis; globalThis.Response = globalThis.Request = globalThis.Headers = diff --git a/test/js/node/http/node-fetch-primordials.test.ts b/test/js/node/http/node-fetch-primordials.test.ts index de674023c37a36..2fdb93eca92f27 100644 --- a/test/js/node/http/node-fetch-primordials.test.ts +++ b/test/js/node/http/node-fetch-primordials.test.ts @@ -1,4 +1,14 @@ -import { test, expect } from "bun:test"; +import { afterEach, expect, test } from "bun:test"; + +const originalResponse = globalThis.Response; +const originalRequest = globalThis.Request; +const originalHeaders = globalThis.Headers; +afterEach(() => { + globalThis.Response = originalResponse; + globalThis.Request = originalRequest; + globalThis.Headers = originalHeaders; + globalThis.fetch = Bun.fetch; +}); test("fetch, Response, Request can be overriden", async () => { const { Response, Request } = globalThis; diff --git a/test/js/node/http/node-fetch.test.js b/test/js/node/http/node-fetch.test.js index 92a3f12e3925ae..a865d0b1575252 100644 --- a/test/js/node/http/node-fetch.test.js +++ b/test/js/node/http/node-fetch.test.js @@ -3,7 +3,17 @@ import * as iso from "isomorphic-fetch"; import * as vercelFetch from "@vercel/fetch"; import * as stream from "stream"; -import { test, expect } from "bun:test"; +import { test, expect, beforeAll, afterAll, afterEach } from "bun:test"; + +const originalResponse = globalThis.Response; +const originalRequest = globalThis.Request; +const originalHeaders = globalThis.Headers; +afterEach(() => { + globalThis.Response = originalResponse; + globalThis.Request = originalRequest; + globalThis.Headers = originalHeaders; + globalThis.fetch = Bun.fetch; +}); test("node-fetch", () => { expect(Response.prototype).toBeInstanceOf(globalThis.Response); diff --git a/test/js/node/http/node-http-error-in-data-handler-fixture.1.js b/test/js/node/http/node-http-error-in-data-handler-fixture.1.js new file mode 100644 index 00000000000000..b33d56f40f67b2 --- /dev/null +++ b/test/js/node/http/node-http-error-in-data-handler-fixture.1.js @@ -0,0 +1,35 @@ +const http = require("http"); +const server = http.createServer((req, res) => { + res.end("Hello World\n"); +}); +const { promise, resolve, reject } = Promise.withResolvers(); +process.exitCode = 1; + +server.listen(0, function () { + const port = server.address().port; + http + .request(`http://localhost:${port}`, res => { + res + .on("data", data => { + // base64 the message to ensure we don't confuse source code with the error message + throw new Error(Buffer.from("VGVzdCBwYXNzZWQ=", "base64")); + }) + .on("end", () => { + server.close(); + }); + }) + .on("error", reject) + .end(); +}); + +server.on("close", () => { + resolve(); +}); +server.on("error", err => { + reject(err); +}); + +process.on("uncaughtException", err => { + console.log(err); + process.exit(0); +}); diff --git a/test/js/node/http/node-http-error-in-data-handler-fixture.2.js b/test/js/node/http/node-http-error-in-data-handler-fixture.2.js new file mode 100644 index 00000000000000..7fb81dc9f2f7af --- /dev/null +++ b/test/js/node/http/node-http-error-in-data-handler-fixture.2.js @@ -0,0 +1,36 @@ +const http = require("http"); +const server = http.createServer(async (req, res) => { + res.end("Hello World\n"); +}); +const { promise, resolve, reject } = Promise.withResolvers(); +process.exitCode = 1; + +server.listen(0, function () { + const port = server.address().port; + http + .request(`http://localhost:${port}`, res => { + res + .on("data", async data => { + await Bun.sleep(1); + // base64 the message to ensure we don't confuse source code with the error message + throw new Error(Buffer.from("VGVzdCBwYXNzZWQ=", "base64")); + }) + .on("end", () => { + server.close(); + }); + }) + .on("error", reject) + .end(); +}); + +server.on("close", () => { + resolve(); +}); +server.on("error", err => { + reject(err); +}); + +process.on("unhandledRejection", err => { + console.log(err); + process.exit(0); +}); diff --git a/test/js/node/http/node-http-primoridals.test.ts b/test/js/node/http/node-http-primoridals.test.ts index 57b1a8506b50fc..00760801358dc1 100644 --- a/test/js/node/http/node-http-primoridals.test.ts +++ b/test/js/node/http/node-http-primoridals.test.ts @@ -1,4 +1,16 @@ -import { test, expect } from "bun:test"; +import { test, expect, afterEach } from "bun:test"; + +const Response = globalThis.Response; +const Request = globalThis.Request; +const Headers = globalThis.Headers; +const Blob = globalThis.Blob; + +afterEach(() => { + globalThis.Response = Response; + globalThis.Request = Request; + globalThis.Headers = Headers; + globalThis.Blob = Blob; +}); // This test passes by not hanging. test("Overriding Request, Response, Headers, and Blob should not break node:http server", async () => { diff --git a/test/js/node/http/node-http.test.ts b/test/js/node/http/node-http.test.ts index cec90d9e853362..883f30ba7b29a9 100644 --- a/test/js/node/http/node-http.test.ts +++ b/test/js/node/http/node-http.test.ts @@ -1943,7 +1943,7 @@ it("should emit events in the right order", async () => { it("destroy should end download", async () => { // just simulate some file that will take forever to download - const payload = Buffer.from("X".repeat(16 * 1024)); + const payload = Buffer.from("X".repeat(128 * 1024)); using server = Bun.serve({ port: 0, @@ -1958,24 +1958,33 @@ it("destroy should end download", async () => { }); }, }); - { - let chunks = 0; - const { promise, resolve } = Promise.withResolvers(); + async function run() { + let receivedByteLength = 0; + let { promise, resolve } = Promise.withResolvers(); const req = request(server.url, res => { - res.on("data", () => { - process.nextTick(resolve); - chunks++; + res.on("data", data => { + receivedByteLength += data.length; + if (resolve) { + resolve(); + resolve = null; + } }); }); req.end(); - // wait for the first chunk await promise; - // should stop the download req.destroy(); - await Bun.sleep(200); - expect(chunks).toBeLessThanOrEqual(3); + await Bun.sleep(10); + const initialByteLength = receivedByteLength; + expect(receivedByteLength).toBeLessThanOrEqual(payload.length * 3); + await Bun.sleep(10); + expect(initialByteLength).toBe(receivedByteLength); + await Bun.sleep(10); } + + const runCount = 50; + const runs = Array.from({ length: runCount }, run); + await Promise.all(runs); }); it("can send brotli from Server and receive with fetch", async () => { @@ -2219,3 +2228,27 @@ it("should mark complete true", async () => { server.close(); } }); + +it("should propagate exception in sync data handler", async () => { + const { exitCode, stdout } = Bun.spawnSync({ + cmd: [bunExe(), "run", path.join(import.meta.dir, "node-http-error-in-data-handler-fixture.1.js")], + stdout: "pipe", + stderr: "inherit", + env: bunEnv, + }); + + expect(stdout.toString()).toContain("Test passed"); + expect(exitCode).toBe(0); +}); + +it("should propagate exception in async data handler", async () => { + const { exitCode, stdout } = Bun.spawnSync({ + cmd: [bunExe(), "run", path.join(import.meta.dir, "node-http-error-in-data-handler-fixture.2.js")], + stdout: "pipe", + stderr: "inherit", + env: bunEnv, + }); + + expect(stdout.toString()).toContain("Test passed"); + expect(exitCode).toBe(0); +});