diff --git a/.changeset/blue-rabbits-hear.md b/.changeset/blue-rabbits-hear.md new file mode 100644 index 000000000000..dfe33a863355 --- /dev/null +++ b/.changeset/blue-rabbits-hear.md @@ -0,0 +1,9 @@ +--- +'@astrojs/node': minor +--- + +Sometimes Astro sends a ReadableStream as a response and it raise an error **TypeError: body is not async iterable.** + +I added a function to get a response iterator from different response types (sourced from apollo-client). + +With this, node adapter can handle all the Astro response types. diff --git a/packages/integrations/node/package.json b/packages/integrations/node/package.json index cc18df1a4fbc..c236937a7784 100644 --- a/packages/integrations/node/package.json +++ b/packages/integrations/node/package.json @@ -37,6 +37,7 @@ "astro": "^1.6.9" }, "devDependencies": { + "@types/node-fetch": "^2.6.2", "@types/send": "^0.17.1", "astro": "workspace:*", "astro-scripts": "workspace:*", diff --git a/packages/integrations/node/src/middleware.ts b/packages/integrations/node/src/middleware.ts index a1058399d6c9..bfa7b74d5e3c 100644 --- a/packages/integrations/node/src/middleware.ts +++ b/packages/integrations/node/src/middleware.ts @@ -1,6 +1,7 @@ import type { NodeApp } from 'astro/app/node'; import type { IncomingMessage, ServerResponse } from 'http'; import type { Readable } from 'stream'; +import { responseIterator } from './response-iterator'; export default function (app: NodeApp) { return async function ( @@ -38,7 +39,7 @@ export default function (app: NodeApp) { } async function writeWebResponse(app: NodeApp, res: ServerResponse, webResponse: Response) { - const { status, headers, body } = webResponse; + const { status, headers } = webResponse; if (app.setCookieHeaders) { const setCookieHeaders: Array = Array.from(app.setCookieHeaders(webResponse)); @@ -48,8 +49,8 @@ async function writeWebResponse(app: NodeApp, res: ServerResponse, webResponse: } res.writeHead(status, Object.fromEntries(headers.entries())); - if (body) { - for await (const chunk of body as unknown as Readable) { + if (webResponse.body) { + for await (const chunk of responseIterator(webResponse) as unknown as Readable) { res.write(chunk); } } diff --git a/packages/integrations/node/src/response-iterator.ts b/packages/integrations/node/src/response-iterator.ts new file mode 100644 index 000000000000..d197f380c856 --- /dev/null +++ b/packages/integrations/node/src/response-iterator.ts @@ -0,0 +1,241 @@ +/** + * Original sources: + * - https://github.com/kmalakoff/response-iterator/blob/master/src/index.ts + * - https://github.com/apollographql/apollo-client/blob/main/src/utilities/common/responseIterator.ts + */ + + import type { Response as NodeResponse } from "node-fetch"; + import { Readable as NodeReadableStream } from "stream"; + + interface NodeStreamIterator { + next(): Promise>; + [Symbol.asyncIterator]?(): AsyncIterator; + } + + interface PromiseIterator { + next(): Promise>; + [Symbol.asyncIterator]?(): AsyncIterator; + } + + interface ReaderIterator { + next(): Promise>; + [Symbol.asyncIterator]?(): AsyncIterator; + } + + const canUseSymbol = + typeof Symbol === 'function' && + typeof Symbol.for === 'function'; + + const canUseAsyncIteratorSymbol = canUseSymbol && Symbol.asyncIterator; + + function isBuffer(value: any): value is Buffer { + return value != null && value.constructor != null && + typeof value.constructor.isBuffer === 'function' && value.constructor.isBuffer(value) + } + + function isNodeResponse(value: any): value is NodeResponse { + return !!(value as NodeResponse).body; + } + + function isReadableStream(value: any): value is ReadableStream { + return !!(value as ReadableStream).getReader; + } + + function isAsyncIterableIterator( + value: any + ): value is AsyncIterableIterator { + return !!( + canUseAsyncIteratorSymbol && + (value as AsyncIterableIterator)[Symbol.asyncIterator] + ); + } + + function isStreamableBlob(value: any): value is Blob { + return !!(value as Blob).stream; + } + + function isBlob(value: any): value is Blob { + return !!(value as Blob).arrayBuffer; + } + + function isNodeReadableStream(value: any): value is NodeReadableStream { + return !!(value as NodeReadableStream).pipe; + } + + function readerIterator( + reader: ReadableStreamDefaultReader + ): AsyncIterableIterator { + const iterator: ReaderIterator = { + next() { + return reader.read(); + }, + }; + + if (canUseAsyncIteratorSymbol) { + iterator[Symbol.asyncIterator] = function (): AsyncIterator { + //@ts-ignore + return this; + }; + } + + return iterator as AsyncIterableIterator; + } + + function promiseIterator( + promise: Promise + ): AsyncIterableIterator { + let resolved = false; + + const iterator: PromiseIterator = { + next(): Promise> { + if (resolved) + return Promise.resolve({ + value: undefined, + done: true, + }); + resolved = true; + return new Promise(function (resolve, reject) { + promise + .then(function (value) { + resolve({ value: value as unknown as T, done: false }); + }) + .catch(reject); + }); + }, + }; + + if (canUseAsyncIteratorSymbol) { + iterator[Symbol.asyncIterator] = function (): AsyncIterator { + return this; + }; + } + + return iterator as AsyncIterableIterator; + } + + function nodeStreamIterator( + stream: NodeReadableStream + ): AsyncIterableIterator { + let cleanup: (() => void) | null = null; + let error: Error | null = null; + let done = false; + const data: unknown[] = []; + + const waiting: [ + ( + value: + | IteratorResult + | PromiseLike> + ) => void, + (reason?: any) => void + ][] = []; + + function onData(chunk: any) { + if (error) return; + if (waiting.length) { + const shiftedArr = waiting.shift(); + if (Array.isArray(shiftedArr) && shiftedArr[0]) { + return shiftedArr[0]({ value: chunk, done: false }); + } + } + data.push(chunk); + } + function onError(err: Error) { + error = err; + const all = waiting.slice(); + all.forEach(function (pair) { + pair[1](err); + }); + !cleanup || cleanup(); + } + function onEnd() { + done = true; + const all = waiting.slice(); + all.forEach(function (pair) { + pair[0]({ value: undefined, done: true }); + }); + !cleanup || cleanup(); + } + + cleanup = function () { + cleanup = null; + stream.removeListener("data", onData); + stream.removeListener("error", onError); + stream.removeListener("end", onEnd); + stream.removeListener("finish", onEnd); + stream.removeListener("close", onEnd); + }; + stream.on("data", onData); + stream.on("error", onError); + stream.on("end", onEnd); + stream.on("finish", onEnd); + stream.on("close", onEnd); + + function getNext(): Promise> { + return new Promise(function (resolve, reject) { + if (error) return reject(error); + if (data.length) return resolve({ value: data.shift() as T, done: false }); + if (done) return resolve({ value: undefined, done: true }); + waiting.push([resolve, reject]); + }); + } + + const iterator: NodeStreamIterator = { + next(): Promise> { + return getNext(); + }, + }; + + if (canUseAsyncIteratorSymbol) { + iterator[Symbol.asyncIterator] = function (): AsyncIterator { + return this; + }; + } + + return iterator as AsyncIterableIterator; + } + + function asyncIterator( + source: AsyncIterableIterator + ): AsyncIterableIterator { + const iterator = source[Symbol.asyncIterator](); + return { + next(): Promise> { + return iterator.next(); + }, + [Symbol.asyncIterator](): AsyncIterableIterator { + return this; + }, + }; + } + + export function responseIterator( + response: Response | NodeResponse | Buffer + ): AsyncIterableIterator { + let body: unknown = response; + + if (isNodeResponse(response)) body = response.body; + + if (isBuffer(body)) body = NodeReadableStream.from(body); + + if (isAsyncIterableIterator(body)) return asyncIterator(body); + + if (isReadableStream(body)) return readerIterator(body.getReader()); + + // this errors without casting to ReadableStream + // because Blob.stream() returns a NodeJS ReadableStream + if (isStreamableBlob(body)) { + return readerIterator( + (body.stream() as unknown as ReadableStream).getReader() + ); + } + + if (isBlob(body)) return promiseIterator(body.arrayBuffer()); + + if (isNodeReadableStream(body)) return nodeStreamIterator(body); + + throw new Error( + "Unknown body type for responseIterator. Please pass a streamable response." + ); + } + \ No newline at end of file diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 2216b931fbbf..e577bedc0c80 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -2982,6 +2982,7 @@ importers: packages/integrations/node: specifiers: '@astrojs/webapi': ^1.1.1 + '@types/node-fetch': ^2.6.2 '@types/send': ^0.17.1 astro: workspace:* astro-scripts: workspace:* @@ -2993,6 +2994,7 @@ importers: '@astrojs/webapi': link:../../webapi send: 0.18.0 devDependencies: + '@types/node-fetch': 2.6.2 '@types/send': 0.17.1 astro: link:../../astro astro-scripts: link:../../../scripts @@ -9712,6 +9714,13 @@ packages: '@types/unist': 2.0.6 dev: false + /@types/node-fetch/2.6.2: + resolution: {integrity: sha512-DHqhlq5jeESLy19TYhLakJ07kNumXWjcDdxXsLUMJZ6ue8VZJj4kLPQVE/2mdHh3xZziNF1xppu5lwmS53HR+A==} + dependencies: + '@types/node': 18.11.9 + form-data: 3.0.1 + dev: true + /@types/node/12.20.55: resolution: {integrity: sha512-J8xLz7q2OFulZ2cyGTLE1TbbZcjpno7FaN6zdJNrgAdrJ+DZzh/uFR6YrTb4C+nXakvud8Q4+rbhoIWlYQbUFQ==} dev: true @@ -10581,6 +10590,10 @@ packages: resolution: {integrity: sha512-iAB+JbDEGXhyIUavoDl9WP/Jj106Kz9DEn1DPgYw5ruDn0e3Wgi3sKFm55sASdGBNOQB8F59d9qQ7deqrHA8wQ==} dev: false + /asynckit/0.4.0: + resolution: {integrity: sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==} + dev: true + /at-least-node/1.0.0: resolution: {integrity: sha512-+q/t7Ekv1EDY2l6Gda6LLiX14rU9TV20Wa3ofeQmwPFZbOMo9DXrLbOjFaaclkXKWidIaopwAObQDqwWtGUjqg==} engines: {node: '>= 4.0.0'} @@ -11097,6 +11110,13 @@ packages: resolution: {integrity: sha512-3tlv/dIP7FWvj3BsbHrGLJ6l/oKh1O3TcgBqMn+yyCagOxc23fyzDS6HypQbgxWbkpDnf52p1LuR4eWDQ/K9WQ==} dev: false + /combined-stream/1.0.8: + resolution: {integrity: sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg==} + engines: {node: '>= 0.8'} + dependencies: + delayed-stream: 1.0.0 + dev: true + /comma-separated-tokens/2.0.3: resolution: {integrity: sha512-Fu4hJdvzeylCfQPp9SGWidpzrMs7tTrlu6Vb8XGaRGck8QSNZJJp538Wrb60Lax4fPwR64ViY468OIUTbRlGZg==} dev: false @@ -11458,6 +11478,11 @@ packages: slash: 4.0.0 dev: true + /delayed-stream/1.0.0: + resolution: {integrity: sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==} + engines: {node: '>=0.4.0'} + dev: true + /delegates/1.0.0: resolution: {integrity: sha512-bd2L678uiWATM6m5Z1VzNCErI3jiGzt6HGY8OVICs40JQq/HALfbyNJmp0UDakEY4pMMaN0Ly5om/B1VI/+xfQ==} dev: false @@ -12751,6 +12776,15 @@ packages: resolution: {integrity: sha512-5nqDSxl8nn5BSNxyR3n4I6eDmbolI6WT+QqR547RwxQapgjQBmtktdP+HTBb/a/zLsbzERTONyUB5pefh5TtjQ==} dev: true + /form-data/3.0.1: + resolution: {integrity: sha512-RHkBKtLWUVwd7SqRIvCZMEvAMoGUp0XU+seQiZejj0COz3RI3hWP4sCv3gZWWLjJTd7rGwcsF5eKZGii0r/hbg==} + engines: {node: '>= 6'} + dependencies: + asynckit: 0.4.0 + combined-stream: 1.0.8 + mime-types: 2.1.35 + dev: true + /format/0.2.2: resolution: {integrity: sha512-wzsgA6WOq+09wrU1tsJ09udeR/YZRaeArL9e1wPbFg3GG2yDnC2ldKpxs4xunpFF9DgqCqOIra3bc1HWrJ37Ww==} engines: {node: '>=0.4.x'}