-
-
Notifications
You must be signed in to change notification settings - Fork 133
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Prevent server hangs on unconsumed streams. #81
Changes from 18 commits
97874e2
f1d0553
d4ef915
e5090ac
8928ce0
2cbbf2d
493a617
148cfde
0ec1085
3c5ce98
65f78f6
9094cf9
ab36717
232cfe9
b03ce1d
b8a0798
10f0839
d473973
6b5d625
7599006
1aa7e67
2ba4713
e6c7c76
c9a38da
986efa6
957fd18
b3d2a63
d1b923b
50d206a
2e3a1be
44a0e14
8348279
07009b6
6f7d10f
662ce40
32327ba
40dfe96
2c66e6b
5685ddb
b84661b
44fea2a
095343e
c71fcb1
53da40d
8b05eaf
b245e5f
8f064e6
1782688
e5f49dc
0f02b12
d9161a1
7891c16
19cfcd8
823b979
5b9a5a6
8180d50
8dc5c1d
eefb6dc
2b1fc86
47a85a0
cbf8c7d
ba8f901
8c3e51a
4f7e2fd
5afbd4b
04d56bc
531c6aa
77bafbc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -65,9 +65,9 @@ | |
"build:clean": "rm -r lib; mkdir lib", | ||
"build:mjs": "BABEL_ESM=1 babel src -d lib --keep-file-extension", | ||
"build:js": "babel src -d lib", | ||
"build:prettier": "prettier 'lib/**/*.{mjs,js}' --write", | ||
"build:prettier": "prettier 'src/**/*.{mjs,js}' --write && prettier '*.{json,md}' --write", | ||
"lint:eslint": "eslint . --ext mjs,js", | ||
"lint:prettier": "prettier '**/*.{json,md}' -l", | ||
"lint:prettier": "prettier '*.{json,md}' -l", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ditto, undo. |
||
"tap:mjs": "node --experimental-modules --no-warnings lib/test | tap-mocha-reporter spec", | ||
"tap:js": "node lib/test | tap-mocha-reporter spec", | ||
"watch": "watch 'npm test --silent' src --interval 1", | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,10 @@ class Upload { | |
this.done = true | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like this is not used anywhere anymore? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep, that's correct! Removed. |
||
}) | ||
|
||
// Prevent a crash if the stream disconnects before the consumers’ error | ||
// listeners can attach. | ||
file.stream.on('error', () => {}) | ||
|
||
// Monkey patch busboy to emit an error when a file is too big. | ||
file.stream.once('limit', () => | ||
file.stream.emit( | ||
|
@@ -94,7 +98,7 @@ export const processRequest = ( | |
operationsPath.set(path, map.get(fieldName).promise) | ||
} | ||
|
||
resolve(operations) | ||
resolve({ operations, map }) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a breaking change to the API. A fair few people such as Apollo use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's a good point; I will change how this works. |
||
} | ||
} | ||
}) | ||
|
@@ -138,6 +142,8 @@ export const processRequest = ( | |
) | ||
}) | ||
|
||
parser.on('error', () => null) | ||
|
||
request.on('close', () => { | ||
if (map) | ||
for (const upload of map.values()) | ||
|
@@ -162,16 +168,39 @@ export const processRequest = ( | |
}) | ||
|
||
export const apolloUploadKoa = options => async (ctx, next) => { | ||
if (ctx.request.is('multipart/form-data')) | ||
ctx.request.body = await processRequest(ctx.req, options) | ||
await next() | ||
if (!ctx.request.is('multipart/form-data')) return next() | ||
|
||
// add uploads to the request | ||
const { operations, map } = await processRequest(ctx.req, options) | ||
ctx.request.body = operations | ||
|
||
try { | ||
ctx.respond = false | ||
await next() | ||
} finally { | ||
await Promise.all( | ||
[...map.values()].map(async upload => { | ||
if (upload.done) return | ||
|
||
await upload.promise | ||
return new Promise(resolve => { | ||
upload.file.stream.on('end', resolve) | ||
upload.file.stream.on('error', resolve) | ||
if (!upload.file.stream.readableFlowing) upload.file.stream.resume() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks tricky, what is it for? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't want to interfere with any streams that are being consumed, but we do want to run through the rest. Using readableFlowing let's us know what state the stream is in. |
||
}) | ||
}) | ||
) | ||
|
||
ctx.respond = true | ||
if (ctx.body) ctx.body = ctx.body | ||
} | ||
} | ||
|
||
export const apolloUploadExpress = options => (request, response, next) => { | ||
if (!request.is('multipart/form-data')) return next() | ||
processRequest(request, options) | ||
.then(body => { | ||
request.body = body | ||
.then(({ operations }) => { | ||
request.body = operations | ||
next() | ||
}) | ||
.catch(error => { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,5 @@ | ||
import fs from 'fs' | ||
import { Readable } from 'stream' | ||
import t from 'tap' | ||
import Koa from 'koa' | ||
import express from 'express' | ||
|
@@ -101,6 +102,66 @@ t.test('Single file.', async t => { | |
}) | ||
}) | ||
|
||
t.test('Early response.', async t => { | ||
t.jobs = 1 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great to know! (I'm new to tap.) |
||
|
||
const testRequest = async (port, stream) => { | ||
const body = new FormData() | ||
|
||
body.append( | ||
'operations', | ||
JSON.stringify({ | ||
variables: { | ||
file: null | ||
} | ||
}) | ||
) | ||
|
||
body.append('map', JSON.stringify({ 1: ['variables.file'] })) | ||
body.append(1, stream) | ||
|
||
await fetch(`http://localhost:${port}`, { method: 'POST', body }) | ||
} | ||
|
||
await t.test('Koa middleware.', async t => { | ||
t.plan(1) | ||
|
||
const data = fs.readFileSync(TEST_FILE_PATH) | ||
|
||
var requestHasFinished = false | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Better to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good; not to get side-tracked, but is there a benefit to |
||
const stream = new Readable() | ||
stream.path = TEST_FILE_PATH | ||
stream._read = () => {} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is this about? The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the old, but still-supported way of implementing streams in node (for Streams 2, which is still the current implementation). |
||
stream.on('end', () => { | ||
requestHasFinished = true | ||
}) | ||
|
||
const app = new Koa().use(apolloUploadKoa()).use(ctx => { | ||
ctx.body = 'EARLY RETURN VALUE' | ||
}) | ||
const port = await startServer(t, app) | ||
const promise = testRequest(port, stream).then( | ||
() => { | ||
t.equals( | ||
requestHasFinished, | ||
true, | ||
'The server should not respond before the request has finished' | ||
) | ||
}, | ||
err => { | ||
throw err | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would this promise block would be more elegant as an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Totally! |
||
} | ||
) | ||
|
||
setTimeout(() => { | ||
stream.push(data) | ||
stream.push(null) | ||
}, 10) | ||
|
||
return promise | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I prefer There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good! |
||
}) | ||
}) | ||
|
||
t.test('Deduped files.', async t => { | ||
t.jobs = 2 | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please undo all these script changes; for an explanation see 8928ce0.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apologies! Will fix.