From c9487138d6d8fd39c8c8512239b6724cf2b275ff Mon Sep 17 00:00:00 2001 From: pilcrowOnPaper <80624252+pilcrowOnPaper@users.noreply.github.com> Date: Thu, 16 Nov 2023 08:39:41 +0900 Subject: [PATCH] Cancel response stream when connection closes (#9071) * cancel stream on close * add changeset * add test * Update .changeset/modern-ways-develop.md Co-authored-by: Sarah Rainsberger --------- Co-authored-by: lilnasy <69170106+lilnasy@users.noreply.github.com> Co-authored-by: Sarah Rainsberger --- .changeset/modern-ways-develop.md | 5 + .../integrations/node/src/nodeMiddleware.ts | 12 +- .../node/src/response-iterator.ts | 228 ------------------ .../integrations/node/test/api-route.test.js | 19 ++ .../test/fixtures/api-route/src/pages/hash.ts | 2 +- .../fixtures/api-route/src/pages/streaming.ts | 22 ++ 6 files changed, 55 insertions(+), 233 deletions(-) create mode 100644 .changeset/modern-ways-develop.md delete mode 100644 packages/integrations/node/src/response-iterator.ts create mode 100644 packages/integrations/node/test/fixtures/api-route/src/pages/streaming.ts diff --git a/.changeset/modern-ways-develop.md b/.changeset/modern-ways-develop.md new file mode 100644 index 000000000000..0378abc1aeec --- /dev/null +++ b/.changeset/modern-ways-develop.md @@ -0,0 +1,5 @@ +--- +'@astrojs/node': patch +--- + +Fixes a bug where the response stream would not cancel when the connection closed diff --git a/packages/integrations/node/src/nodeMiddleware.ts b/packages/integrations/node/src/nodeMiddleware.ts index ddaa95deb8bd..7f242809ee0e 100644 --- a/packages/integrations/node/src/nodeMiddleware.ts +++ b/packages/integrations/node/src/nodeMiddleware.ts @@ -1,8 +1,6 @@ import type { NodeApp } from 'astro/app/node'; import type { ServerResponse } from 'node:http'; -import type { Readable } from 'stream'; import { createOutgoingHttpHeaders } from './createOutgoingHttpHeaders.js'; -import { responseIterator } from './response-iterator.js'; import type { ErrorHandlerParams, Options, RequestHandlerParams } from './types.js'; // Disable no-unused-vars to avoid breaking signature change @@ -79,8 +77,14 @@ async function writeWebResponse(app: NodeApp, res: ServerResponse, webResponse: res.writeHead(status, nodeHeaders); if (webResponse.body) { try { - for await (const chunk of responseIterator(webResponse) as unknown as Readable) { - res.write(chunk); + const reader = webResponse.body.getReader(); + res.on("close", () => { + reader.cancel(); + }) + let result = await reader.read(); + while (!result.done) { + res.write(result.value); + result = await reader.read(); } } catch (err: any) { console.error(err?.stack || err?.message || String(err)); diff --git a/packages/integrations/node/src/response-iterator.ts b/packages/integrations/node/src/response-iterator.ts deleted file mode 100644 index b79c3a85345b..000000000000 --- a/packages/integrations/node/src/response-iterator.ts +++ /dev/null @@ -1,228 +0,0 @@ -/** - * Original sources: - * - https://github.com/kmalakoff/response-iterator/blob/master/src/index.ts - * - https://github.com/apollographql/apollo-client/blob/main/src/utilities/common/responseIterator.ts - */ - -import { AstroError } from 'astro/errors'; -import type { ReadableStreamDefaultReadResult } from 'node:stream/web'; -import { Readable as NodeReadableStream } from 'stream'; - -interface NodeStreamIterator { - next(): Promise>; - [Symbol.asyncIterator]?(): AsyncIterator; -} - -interface PromiseIterator { - next(): Promise>; - [Symbol.asyncIterator]?(): AsyncIterator; -} - -interface ReaderIterator { - next(): Promise>; - [Symbol.asyncIterator]?(): AsyncIterator; -} - -const canUseSymbol = typeof Symbol === 'function' && typeof Symbol.for === 'function'; - -const canUseAsyncIteratorSymbol = canUseSymbol && Symbol.asyncIterator; - -function isBuffer(value: any): value is Buffer { - return ( - value?.constructor != null && - typeof value.constructor.isBuffer === 'function' && - value.constructor.isBuffer(value) - ); -} - -function isNodeResponse(value: any): value is Response { - return !!(value as Response).body; -} - -function isReadableStream(value: any): value is ReadableStream { - return !!(value as ReadableStream).getReader; -} - -function isAsyncIterableIterator(value: any): value is AsyncIterableIterator { - return !!( - canUseAsyncIteratorSymbol && (value as AsyncIterableIterator)[Symbol.asyncIterator] - ); -} - -function isStreamableBlob(value: any): value is Blob { - return !!(value as Blob).stream; -} - -function isBlob(value: any): value is Blob { - return !!(value as Blob).arrayBuffer; -} - -function isNodeReadableStream(value: any): value is NodeReadableStream { - return !!(value as NodeReadableStream).pipe; -} - -function readerIterator(reader: ReadableStreamDefaultReader): AsyncIterableIterator { - const iterator: ReaderIterator = { - //@ts-expect-error - next() { - return reader.read(); - }, - }; - - if (canUseAsyncIteratorSymbol) { - iterator[Symbol.asyncIterator] = function (): AsyncIterator { - //@ts-expect-error - return this; - }; - } - - return iterator as AsyncIterableIterator; -} - -function promiseIterator(promise: Promise): AsyncIterableIterator { - let resolved = false; - - const iterator: PromiseIterator = { - next(): Promise> { - if (resolved) - return Promise.resolve({ - value: undefined, - done: true, - }); - resolved = true; - return new Promise(function (resolve, reject) { - promise - .then(function (value) { - resolve({ value: value as unknown as T, done: false }); - }) - .catch(reject); - }); - }, - }; - - if (canUseAsyncIteratorSymbol) { - iterator[Symbol.asyncIterator] = function (): AsyncIterator { - return this; - }; - } - - return iterator as AsyncIterableIterator; -} - -function nodeStreamIterator(stream: NodeReadableStream): AsyncIterableIterator { - let cleanup: (() => void) | null = null; - let error: Error | null = null; - let done = false; - const data: unknown[] = []; - - const waiting: [ - ( - value: - | IteratorResult - | PromiseLike> - ) => void, - (reason?: any) => void, - ][] = []; - - function onData(chunk: any) { - if (error) return; - if (waiting.length) { - const shiftedArr = waiting.shift(); - if (Array.isArray(shiftedArr) && shiftedArr[0]) { - return shiftedArr[0]({ value: chunk, done: false }); - } - } - data.push(chunk); - } - function onError(err: Error) { - error = err; - const all = waiting.slice(); - all.forEach(function (pair) { - pair[1](err); - }); - !cleanup || cleanup(); - } - function onEnd() { - done = true; - const all = waiting.slice(); - all.forEach(function (pair) { - pair[0]({ value: undefined, done: true }); - }); - !cleanup || cleanup(); - } - - cleanup = function () { - cleanup = null; - stream.removeListener('data', onData); - stream.removeListener('error', onError); - stream.removeListener('end', onEnd); - stream.removeListener('finish', onEnd); - stream.removeListener('close', onEnd); - }; - stream.on('data', onData); - stream.on('error', onError); - stream.on('end', onEnd); - stream.on('finish', onEnd); - stream.on('close', onEnd); - - function getNext(): Promise> { - return new Promise(function (resolve, reject) { - if (error) return reject(error); - if (data.length) return resolve({ value: data.shift() as T, done: false }); - if (done) return resolve({ value: undefined, done: true }); - waiting.push([resolve, reject]); - }); - } - - const iterator: NodeStreamIterator = { - next(): Promise> { - return getNext(); - }, - }; - - if (canUseAsyncIteratorSymbol) { - iterator[Symbol.asyncIterator] = function (): AsyncIterator { - return this; - }; - } - - return iterator as AsyncIterableIterator; -} - -function asyncIterator(source: AsyncIterableIterator): AsyncIterableIterator { - const iterator = source[Symbol.asyncIterator](); - return { - next(): Promise> { - return iterator.next(); - }, - [Symbol.asyncIterator](): AsyncIterableIterator { - return this; - }, - }; -} - -export function responseIterator(response: Response | Buffer): AsyncIterableIterator { - let body: unknown = response; - - if (isNodeResponse(response)) body = response.body; - - if (isBuffer(body)) body = NodeReadableStream.from(body); - - if (isAsyncIterableIterator(body)) return asyncIterator(body); - - if (isReadableStream(body)) return readerIterator(body.getReader()); - - // this errors without casting to ReadableStream - // because Blob.stream() returns a NodeJS ReadableStream - if (isStreamableBlob(body)) { - return readerIterator((body.stream() as unknown as ReadableStream).getReader()); - } - - if (isBlob(body)) return promiseIterator(body.arrayBuffer()); - - if (isNodeReadableStream(body)) return nodeStreamIterator(body); - - throw new AstroError( - 'Unknown body type for responseIterator. Please pass a streamable response.' - ); -} diff --git a/packages/integrations/node/test/api-route.test.js b/packages/integrations/node/test/api-route.test.js index c830eee2d6c8..7d9422ab4d0c 100644 --- a/packages/integrations/node/test/api-route.test.js +++ b/packages/integrations/node/test/api-route.test.js @@ -89,4 +89,23 @@ describe('API routes', () => { let [out] = await done; expect(new Uint8Array(out.buffer)).to.deep.equal(expectedDigest); }); + + it('Can bail on streaming', async () => { + const { handler } = await import('./fixtures/api-route/dist/server/entry.mjs'); + let { req, res, done } = createRequestAndResponse({ + url: '/streaming', + }); + + let locals = { cancelledByTheServer: false }; + + handler(req, res, () => {}, locals); + req.send(); + + await new Promise((resolve) => setTimeout(resolve, 500)); + res.emit("close"); + + await done; + + expect(locals).to.deep.include({ cancelledByTheServer: true }); + }); }); diff --git a/packages/integrations/node/test/fixtures/api-route/src/pages/hash.ts b/packages/integrations/node/test/fixtures/api-route/src/pages/hash.ts index fbf44c5478bc..3f1b236de76b 100644 --- a/packages/integrations/node/test/fixtures/api-route/src/pages/hash.ts +++ b/packages/integrations/node/test/fixtures/api-route/src/pages/hash.ts @@ -1,6 +1,6 @@ import crypto from 'node:crypto'; -export async function post({ request }: { request: Request }) { +export async function POST({ request }: { request: Request }) { const hash = crypto.createHash('sha256'); const iterable = request.body as unknown as AsyncIterable; diff --git a/packages/integrations/node/test/fixtures/api-route/src/pages/streaming.ts b/packages/integrations/node/test/fixtures/api-route/src/pages/streaming.ts new file mode 100644 index 000000000000..9ecb884bf89b --- /dev/null +++ b/packages/integrations/node/test/fixtures/api-route/src/pages/streaming.ts @@ -0,0 +1,22 @@ +export const GET = ({ locals }) => { + let sentChunks = 0; + + const readableStream = new ReadableStream({ + async pull(controller) { + if (sentChunks === 3) return controller.close(); + else sentChunks++; + + await new Promise(resolve => setTimeout(resolve, 1000)); + controller.enqueue(new TextEncoder().encode('hello\n')); + }, + cancel() { + locals.cancelledByTheServer = true; + } + }); + + return new Response(readableStream, { + headers: { + "Content-Type": "text/event-stream" + } + }); +}