Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add response iterator to node adapter #5418

Merged
merged 4 commits into from
Nov 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/blue-rabbits-hear.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
'@astrojs/node': minor
---

Sometimes Astro sends a ReadableStream as a response and it raise an error **TypeError: body is not async iterable.**

I added a function to get a response iterator from different response types (sourced from apollo-client).

With this, node adapter can handle all the Astro response types.
1 change: 1 addition & 0 deletions packages/integrations/node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"astro": "^1.6.9"
},
"devDependencies": {
"@types/node-fetch": "^2.6.2",
"@types/send": "^0.17.1",
"astro": "workspace:*",
"astro-scripts": "workspace:*",
Expand Down
7 changes: 4 additions & 3 deletions packages/integrations/node/src/middleware.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { NodeApp } from 'astro/app/node';
import type { IncomingMessage, ServerResponse } from 'http';
import type { Readable } from 'stream';
import { responseIterator } from './response-iterator';

export default function (app: NodeApp) {
return async function (
Expand Down Expand Up @@ -38,7 +39,7 @@ export default function (app: NodeApp) {
}

async function writeWebResponse(app: NodeApp, res: ServerResponse, webResponse: Response) {
const { status, headers, body } = webResponse;
const { status, headers } = webResponse;

if (app.setCookieHeaders) {
const setCookieHeaders: Array<string> = Array.from(app.setCookieHeaders(webResponse));
Expand All @@ -48,8 +49,8 @@ async function writeWebResponse(app: NodeApp, res: ServerResponse, webResponse:
}

res.writeHead(status, Object.fromEntries(headers.entries()));
if (body) {
for await (const chunk of body as unknown as Readable) {
if (webResponse.body) {
for await (const chunk of responseIterator(webResponse) as unknown as Readable) {
res.write(chunk);
}
}
Expand Down
241 changes: 241 additions & 0 deletions packages/integrations/node/src/response-iterator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
/**
* Original sources:
* - https://github.com/kmalakoff/response-iterator/blob/master/src/index.ts
* - https://github.com/apollographql/apollo-client/blob/main/src/utilities/common/responseIterator.ts
*/

import type { Response as NodeResponse } from "node-fetch";
import { Readable as NodeReadableStream } from "stream";

interface NodeStreamIterator<T> {
next(): Promise<IteratorResult<T, boolean | undefined>>;
[Symbol.asyncIterator]?(): AsyncIterator<T>;
}

interface PromiseIterator<T> {
next(): Promise<IteratorResult<T, ArrayBuffer | undefined>>;
[Symbol.asyncIterator]?(): AsyncIterator<T>;
}

interface ReaderIterator<T> {
next(): Promise<ReadableStreamDefaultReadResult<T>>;
[Symbol.asyncIterator]?(): AsyncIterator<T>;
}

const canUseSymbol =
typeof Symbol === 'function' &&
typeof Symbol.for === 'function';

const canUseAsyncIteratorSymbol = canUseSymbol && Symbol.asyncIterator;

function isBuffer(value: any): value is Buffer {
return value != null && value.constructor != null &&
typeof value.constructor.isBuffer === 'function' && value.constructor.isBuffer(value)
}

function isNodeResponse(value: any): value is NodeResponse {
return !!(value as NodeResponse).body;
}

function isReadableStream(value: any): value is ReadableStream<any> {
return !!(value as ReadableStream<any>).getReader;
}

function isAsyncIterableIterator(
value: any
): value is AsyncIterableIterator<any> {
return !!(
canUseAsyncIteratorSymbol &&
(value as AsyncIterableIterator<any>)[Symbol.asyncIterator]
);
}

function isStreamableBlob(value: any): value is Blob {
return !!(value as Blob).stream;
}

function isBlob(value: any): value is Blob {
return !!(value as Blob).arrayBuffer;
}

function isNodeReadableStream(value: any): value is NodeReadableStream {
return !!(value as NodeReadableStream).pipe;
}

function readerIterator<T>(
reader: ReadableStreamDefaultReader<T>
): AsyncIterableIterator<T> {
const iterator: ReaderIterator<T> = {
next() {
return reader.read();
},
};

if (canUseAsyncIteratorSymbol) {
iterator[Symbol.asyncIterator] = function (): AsyncIterator<T> {
//@ts-ignore
return this;
};
}

return iterator as AsyncIterableIterator<T>;
}

function promiseIterator<T = ArrayBuffer>(
promise: Promise<ArrayBuffer>
): AsyncIterableIterator<T> {
let resolved = false;

const iterator: PromiseIterator<T> = {
next(): Promise<IteratorResult<T, ArrayBuffer | undefined>> {
if (resolved)
return Promise.resolve({
value: undefined,
done: true,
});
resolved = true;
return new Promise(function (resolve, reject) {
promise
.then(function (value) {
resolve({ value: value as unknown as T, done: false });
})
.catch(reject);
});
},
};

if (canUseAsyncIteratorSymbol) {
iterator[Symbol.asyncIterator] = function (): AsyncIterator<T> {
return this;
};
}

return iterator as AsyncIterableIterator<T>;
}

function nodeStreamIterator<T>(
stream: NodeReadableStream
): AsyncIterableIterator<T> {
let cleanup: (() => void) | null = null;
let error: Error | null = null;
let done = false;
const data: unknown[] = [];

const waiting: [
(
value:
| IteratorResult<T, boolean | undefined>
| PromiseLike<IteratorResult<T, boolean | undefined>>
) => void,
(reason?: any) => void
][] = [];

function onData(chunk: any) {
if (error) return;
if (waiting.length) {
const shiftedArr = waiting.shift();
if (Array.isArray(shiftedArr) && shiftedArr[0]) {
return shiftedArr[0]({ value: chunk, done: false });
}
}
data.push(chunk);
}
function onError(err: Error) {
error = err;
const all = waiting.slice();
all.forEach(function (pair) {
pair[1](err);
});
!cleanup || cleanup();
}
function onEnd() {
done = true;
const all = waiting.slice();
all.forEach(function (pair) {
pair[0]({ value: undefined, done: true });
});
!cleanup || cleanup();
}

cleanup = function () {
cleanup = null;
stream.removeListener("data", onData);
stream.removeListener("error", onError);
stream.removeListener("end", onEnd);
stream.removeListener("finish", onEnd);
stream.removeListener("close", onEnd);
};
stream.on("data", onData);
stream.on("error", onError);
stream.on("end", onEnd);
stream.on("finish", onEnd);
stream.on("close", onEnd);

function getNext(): Promise<IteratorResult<T, boolean | undefined>> {
return new Promise(function (resolve, reject) {
if (error) return reject(error);
if (data.length) return resolve({ value: data.shift() as T, done: false });
if (done) return resolve({ value: undefined, done: true });
waiting.push([resolve, reject]);
});
}

const iterator: NodeStreamIterator<T> = {
next(): Promise<IteratorResult<T, boolean | undefined>> {
return getNext();
},
};

if (canUseAsyncIteratorSymbol) {
iterator[Symbol.asyncIterator] = function (): AsyncIterator<T> {
return this;
};
}

return iterator as AsyncIterableIterator<T>;
}

function asyncIterator<T>(
source: AsyncIterableIterator<T>
): AsyncIterableIterator<T> {
const iterator = source[Symbol.asyncIterator]();
return {
next(): Promise<IteratorResult<T, boolean>> {
return iterator.next();
},
[Symbol.asyncIterator](): AsyncIterableIterator<T> {
return this;
},
};
}

export function responseIterator<T>(
response: Response | NodeResponse | Buffer
): AsyncIterableIterator<T> {
let body: unknown = response;

if (isNodeResponse(response)) body = response.body;

if (isBuffer(body)) body = NodeReadableStream.from(body);

if (isAsyncIterableIterator(body)) return asyncIterator<T>(body);

if (isReadableStream(body)) return readerIterator<T>(body.getReader());

// this errors without casting to ReadableStream<T>
// because Blob.stream() returns a NodeJS ReadableStream
if (isStreamableBlob(body)) {
return readerIterator<T>(
(body.stream() as unknown as ReadableStream<T>).getReader()
);
}

if (isBlob(body)) return promiseIterator<T>(body.arrayBuffer());

if (isNodeReadableStream(body)) return nodeStreamIterator<T>(body);

throw new Error(
"Unknown body type for responseIterator. Please pass a streamable response."
);
}

34 changes: 34 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.