Skip to content

Commit

Permalink
Cancel response stream when connection closes (#9071)
Browse files Browse the repository at this point in the history
* cancel stream on close

* add changeset

* add test

* Update .changeset/modern-ways-develop.md

Co-authored-by: Sarah Rainsberger <[email protected]>

---------

Co-authored-by: lilnasy <[email protected]>
Co-authored-by: Sarah Rainsberger <[email protected]>
  • Loading branch information
3 people authored Nov 15, 2023
1 parent eeac288 commit c948713
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 233 deletions.
5 changes: 5 additions & 0 deletions .changeset/modern-ways-develop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@astrojs/node': patch
---

Fixes a bug where the response stream would not cancel when the connection closed
12 changes: 8 additions & 4 deletions packages/integrations/node/src/nodeMiddleware.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import type { NodeApp } from 'astro/app/node';
import type { ServerResponse } from 'node:http';
import type { Readable } from 'stream';
import { createOutgoingHttpHeaders } from './createOutgoingHttpHeaders.js';
import { responseIterator } from './response-iterator.js';
import type { ErrorHandlerParams, Options, RequestHandlerParams } from './types.js';

// Disable no-unused-vars to avoid breaking signature change
Expand Down Expand Up @@ -79,8 +77,14 @@ async function writeWebResponse(app: NodeApp, res: ServerResponse, webResponse:
res.writeHead(status, nodeHeaders);
if (webResponse.body) {
try {
for await (const chunk of responseIterator(webResponse) as unknown as Readable) {
res.write(chunk);
const reader = webResponse.body.getReader();
res.on("close", () => {
reader.cancel();
})
let result = await reader.read();
while (!result.done) {
res.write(result.value);
result = await reader.read();
}
} catch (err: any) {
console.error(err?.stack || err?.message || String(err));
Expand Down
228 changes: 0 additions & 228 deletions packages/integrations/node/src/response-iterator.ts

This file was deleted.

19 changes: 19 additions & 0 deletions packages/integrations/node/test/api-route.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,23 @@ describe('API routes', () => {
let [out] = await done;
expect(new Uint8Array(out.buffer)).to.deep.equal(expectedDigest);
});

it('Can bail on streaming', async () => {
const { handler } = await import('./fixtures/api-route/dist/server/entry.mjs');
let { req, res, done } = createRequestAndResponse({
url: '/streaming',
});

let locals = { cancelledByTheServer: false };

handler(req, res, () => {}, locals);
req.send();

await new Promise((resolve) => setTimeout(resolve, 500));
res.emit("close");

await done;

expect(locals).to.deep.include({ cancelledByTheServer: true });
});
});
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import crypto from 'node:crypto';

export async function post({ request }: { request: Request }) {
export async function POST({ request }: { request: Request }) {
const hash = crypto.createHash('sha256');

const iterable = request.body as unknown as AsyncIterable<Uint8Array>;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
export const GET = ({ locals }) => {
let sentChunks = 0;

const readableStream = new ReadableStream({
async pull(controller) {
if (sentChunks === 3) return controller.close();
else sentChunks++;

await new Promise(resolve => setTimeout(resolve, 1000));
controller.enqueue(new TextEncoder().encode('hello\n'));
},
cancel() {
locals.cancelledByTheServer = true;
}
});

return new Response(readableStream, {
headers: {
"Content-Type": "text/event-stream"
}
});
}

0 comments on commit c948713

Please sign in to comment.