From 3505ce2b6f016b3bdda72241bac51a4ad54b76ff Mon Sep 17 00:00:00 2001 From: Rob Richard Date: Thu, 15 Oct 2020 11:31:48 -0400 Subject: [PATCH] support for experimental defer-stream flush response if compression middleware is used --- integrationTests/ts/package.json | 2 +- package-lock.json | 5 +- package.json | 2 +- src/__tests__/http-test.ts | 308 ++++++++++++++++++++++++++++++- src/index.ts | 124 +++++++++++-- src/isAsyncIterable.ts | 9 + 6 files changed, 425 insertions(+), 25 deletions(-) create mode 100644 src/isAsyncIterable.ts diff --git a/integrationTests/ts/package.json b/integrationTests/ts/package.json index ef8f9cec..594b3224 100644 --- a/integrationTests/ts/package.json +++ b/integrationTests/ts/package.json @@ -6,7 +6,7 @@ "dependencies": { "@types/node": "14.0.13", "express-graphql": "file:../express-graphql.tgz", - "graphql": "14.7.0", + "graphql": "https://registry.npmjs.org/graphql-experimental/-/graphql-experimental-5.0.2.tgz", "typescript-3.4": "npm:typescript@3.4.x", "typescript-3.5": "npm:typescript@3.5.x", "typescript-3.6": "npm:typescript@3.6.x", diff --git a/package-lock.json b/package-lock.json index 6c39c6c0..9964b1b3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -3383,9 +3383,8 @@ } }, "graphql": { - "version": "15.3.0", - "resolved": "https://registry.npmjs.org/graphql/-/graphql-15.3.0.tgz", - "integrity": "sha512-GTCJtzJmkFLWRfFJuoo9RWWa/FfamUHgiFosxi/X1Ani4AVWbeyBenZTNX6dM+7WSbbFfTo/25eh0LLkwHMw2w==", + "version": "https://registry.npmjs.org/graphql-experimental/-/graphql-experimental-5.0.2.tgz", + "integrity": "sha512-enKSTymvZubyZDBMCekwRI72MH4PooonvEt9a93S9080qvShwlmP+oPRihamYWI3I6trFjFQS3W5npH+BLFG9g==", "dev": true }, "graphql-language-service": { diff --git a/package.json b/package.json index 7bc60e75..6173a6fd 100644 --- a/package.json +++ b/package.json @@ -83,7 +83,7 @@ "eslint-plugin-node": "11.1.0", "express": "4.17.1", "graphiql": "1.0.3", - "graphql": "15.3.0", + "graphql": "https://registry.npmjs.org/graphql-experimental/-/graphql-experimental-5.0.2.tgz", "mocha": "8.1.3", "multer": "1.4.2", "nyc": "15.1.0", diff --git a/src/__tests__/http-test.ts b/src/__tests__/http-test.ts index ffca7714..83399cea 100644 --- a/src/__tests__/http-test.ts +++ b/src/__tests__/http-test.ts @@ -25,6 +25,7 @@ import { } from 'graphql'; import { graphqlHTTP } from '../index'; +import { isAsyncIterable } from '../isAsyncIterable'; type Middleware = (req: any, res: any, next: () => void) => unknown; type Server = () => { @@ -1027,6 +1028,60 @@ function runTests(server: Server) { errors: [{ message: 'Must provide query string.' }], }); }); + + it('allows for streaming results with @defer', async () => { + const app = server(); + const fakeFlush = sinon.fake(); + + app.use((_, res, next) => { + res.flush = fakeFlush; + next(); + }); + app.post( + urlString(), + graphqlHTTP({ + schema: TestSchema, + }), + ); + + const req = app + .request() + .post(urlString()) + .send({ + query: + '{ ...frag @defer(label: "deferLabel") } fragment frag on QueryRoot { test(who: "World") }', + }) + .parse((res, cb) => { + res.on('data', (data) => { + res.text = `${res.text || ''}${data.toString('utf8') as string}`; + }); + res.on('end', (err) => { + cb(err, null); + }); + }); + + const response = await req; + expect(fakeFlush.callCount).to.equal(2); + expect(response.text).to.equal( + [ + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 26', + '', + '{"data":{},"hasNext":true}', + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 78', + '', + '{"data":{"test":"Hello World"},"path":[],"label":"deferLabel","hasNext":false}', + '', + '-----', + '', + ].join('\r\n'), + ); + }); }); describe('Pretty printing', () => { @@ -1109,6 +1164,62 @@ function runTests(server: Server) { expect(unprettyResponse.text).to.equal('{"data":{"test":"Hello World"}}'); }); + it('supports pretty printing async iterable requests', async () => { + const app = server(); + + app.post( + urlString(), + graphqlHTTP({ + schema: TestSchema, + pretty: true, + }), + ); + + const req = app + .request() + .post(urlString()) + .send({ + query: + '{ ...frag @defer } fragment frag on QueryRoot { test(who: "World") }', + }) + .parse((res, cb) => { + res.on('data', (data) => { + res.text = `${res.text || ''}${data.toString('utf8') as string}`; + }); + res.on('end', (err) => { + cb(err, null); + }); + }); + + const response = await req; + expect(response.text).to.equal( + [ + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 35', + '', + ['{', ' "data": {},', ' "hasNext": true', '}'].join('\n'), + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 79', + '', + [ + '{', + ' "data": {', + ' "test": "Hello World"', + ' },', + ' "path": [],', + ' "hasNext": false', + '}', + ].join('\n'), + '', + '-----', + '', + ].join('\r\n'), + ); + }); }); it('will send request and response when using thunk', async () => { @@ -1229,6 +1340,108 @@ function runTests(server: Server) { }); }); + it('allows for custom error formatting in initial payload of async iterator', async () => { + const app = server(); + + app.post( + urlString(), + graphqlHTTP({ + schema: TestSchema, + customFormatErrorFn(error) { + return { message: 'Custom error format: ' + error.message }; + }, + }), + ); + + const req = app + .request() + .post(urlString()) + .send({ + query: + '{ thrower, ...frag @defer } fragment frag on QueryRoot { test(who: "World") }', + }) + .parse((res, cb) => { + res.on('data', (data) => { + res.text = `${res.text || ''}${data.toString('utf8') as string}`; + }); + res.on('end', (err) => { + cb(err, null); + }); + }); + + const response = await req; + expect(response.text).to.equal( + [ + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 94', + '', + '{"errors":[{"message":"Custom error format: Throws!"}],"data":{"thrower":null},"hasNext":true}', + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 57', + '', + '{"data":{"test":"Hello World"},"path":[],"hasNext":false}', + '', + '-----', + '', + ].join('\r\n'), + ); + }); + + it('allows for custom error formatting in subsequent payloads of async iterator', async () => { + const app = server(); + + app.post( + urlString(), + graphqlHTTP({ + schema: TestSchema, + customFormatErrorFn(error) { + return { message: 'Custom error format: ' + error.message }; + }, + }), + ); + + const req = app + .request() + .post(urlString()) + .send({ + query: + '{ test(who: "World"), ...frag @defer } fragment frag on QueryRoot { thrower }', + }) + .parse((res, cb) => { + res.on('data', (data) => { + res.text = `${res.text || ''}${data.toString('utf8') as string}`; + }); + res.on('end', (err) => { + cb(err, null); + }); + }); + + const response = await req; + expect(response.text).to.equal( + [ + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 46', + '', + '{"data":{"test":"Hello World"},"hasNext":true}', + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 105', + '', + '{"data":{"thrower":null},"path":[],"errors":[{"message":"Custom error format: Throws!"}],"hasNext":false}', + '', + '-----', + '', + ].join('\r\n'), + ); + }); + it('allows for custom error formatting to elaborate', async () => { const app = server(); @@ -2069,6 +2282,10 @@ function runTests(server: Server) { async customExecuteFn(args) { seenExecuteArgs = args; const result = await Promise.resolve(execute(args)); + // istanbul ignore if this test query will never return an async iterable + if (isAsyncIterable(result)) { + return result; + } return { ...result, data: { @@ -2222,6 +2439,57 @@ function runTests(server: Server) { }); }); + it('allows for custom extensions in initial and subsequent payloads of async iterator', async () => { + const app = server(); + + app.post( + urlString(), + graphqlHTTP({ + schema: TestSchema, + extensions({ result }) { + return { preservedResult: { ...result } }; + }, + }), + ); + + const req = app + .request() + .post(urlString()) + .send({ + query: + '{ hello: test(who: "Rob"), ...frag @defer } fragment frag on QueryRoot { test(who: "World") }', + }) + .parse((res, cb) => { + res.on('data', (data) => { + res.text = `${res.text || ''}${data.toString('utf8') as string}`; + }); + res.on('end', (err) => { + cb(err, null); + }); + }); + + const response = await req; + expect(response.text).to.equal( + [ + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 124', + '', + '{"data":{"hello":"Hello Rob"},"hasNext":true,"extensions":{"preservedResult":{"data":{"hello":"Hello Rob"},"hasNext":true}}}', + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 148', + '', + '{"data":{"test":"Hello World"},"path":[],"hasNext":false,"extensions":{"preservedResult":{"data":{"test":"Hello World"},"path":[],"hasNext":false}}}', + '', + '-----', + '', + ].join('\r\n'), + ); + }); + it('extension function may be async', async () => { const app = server(); @@ -2262,12 +2530,44 @@ function runTests(server: Server) { const response = await app .request() - .get(urlString({ query: '{test}', raw: '' })) - .set('Accept', 'text/html'); + .get( + urlString({ + query: + '{ hello: test(who: "Rob"), ...frag @defer } fragment frag on QueryRoot { test(who: "World") }', + raw: '', + }), + ) + .set('Accept', 'text/html') + .parse((res, cb) => { + res.on('data', (data) => { + res.text = `${res.text || ''}${data.toString('utf8') as string}`; + }); + res.on('end', (err) => { + cb(err, null); + }); + }); expect(response.status).to.equal(200); - expect(response.type).to.equal('application/json'); - expect(response.text).to.equal('{"data":{"test":"Hello World"}}'); + expect(response.type).to.equal('multipart/mixed'); + expect(response.text).to.equal( + [ + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 45', + '', + '{"data":{"hello":"Hello Rob"},"hasNext":true}', + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 57', + '', + '{"data":{"test":"Hello World"},"path":[],"hasNext":false}', + '', + '-----', + '', + ].join('\r\n'), + ); }); }); } diff --git a/src/index.ts b/src/index.ts index 23b4d930..621f0639 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,4 +1,5 @@ -import type { IncomingMessage, ServerResponse } from 'http'; +import type { IncomingMessage } from 'http'; +import { ServerResponse } from 'http'; import type { ASTVisitor, @@ -8,6 +9,9 @@ import type { ExecutionArgs, ExecutionResult, FormattedExecutionResult, + ExecutionPatchResult, + FormattedExecutionPatchResult, + AsyncExecutionResult, GraphQLError, GraphQLSchema, GraphQLFieldResolver, @@ -29,12 +33,16 @@ import { import type { GraphiQLOptions, GraphiQLData } from './renderGraphiQL'; import { parseBody } from './parseBody'; +import { isAsyncIterable } from './isAsyncIterable'; import { renderGraphiQL } from './renderGraphiQL'; // `url` is always defined for IncomingMessage coming from http.Server type Request = IncomingMessage & { url: string }; -type Response = ServerResponse & { json?: (data: unknown) => void }; +type Response = ServerResponse & { + json?: (data: unknown) => void; + flush?: () => void; +}; type MaybePromise = Promise | T; /** @@ -93,7 +101,9 @@ export interface OptionsData { * An optional function which will be used to execute instead of default `execute` * from `graphql-js`. */ - customExecuteFn?: (args: ExecutionArgs) => MaybePromise; + customExecuteFn?: ( + args: ExecutionArgs, + ) => MaybePromise>; /** * An optional function which will be used to format any errors produced by @@ -171,7 +181,7 @@ export interface RequestInfo { /** * The result of executing the operation. */ - result: FormattedExecutionResult; + result: AsyncExecutionResult; /** * A value to pass as the context to the graphql() function. @@ -199,7 +209,10 @@ export function graphqlHTTP(options: Options): Middleware { let showGraphiQL = false; let graphiqlOptions; let formatErrorFn = formatError; + let extensionsFn; let pretty = false; + let documentAST: DocumentNode; + let executeResult; let result: ExecutionResult; try { @@ -228,7 +241,6 @@ export function graphqlHTTP(options: Options): Middleware { const fieldResolver = optionsData.fieldResolver; const typeResolver = optionsData.typeResolver; const graphiql = optionsData.graphiql ?? false; - const extensionsFn = optionsData.extensions; const context = optionsData.context ?? request; const parseFn = optionsData.customParseFn ?? parse; const executeFn = optionsData.customExecuteFn ?? execute; @@ -262,6 +274,23 @@ export function graphqlHTTP(options: Options): Middleware { graphiqlOptions = graphiql; } + // Collect and apply any metadata extensions if a function was provided. + // https://graphql.github.io/graphql-spec/#sec-Response-Format + if (optionsData.extensions) { + extensionsFn = (payload: AsyncExecutionResult) => { + /* istanbul ignore next condition not reachable, required for flow */ + if (optionsData.extensions) { + return optionsData.extensions({ + document: documentAST, + variables, + operationName, + result: payload, + context, + }); + } + }; + } + // If there is no query, but GraphiQL will be displayed, do not produce // a result, otherwise return a 400: Bad Request. if (query == null) { @@ -281,7 +310,6 @@ export function graphqlHTTP(options: Options): Middleware { } // Parse source to AST, reporting any syntax error. - let documentAST; try { documentAST = parseFn(new Source(query, 'GraphQL request')); } catch (syntaxError) { @@ -327,7 +355,7 @@ export function graphqlHTTP(options: Options): Middleware { // Perform the execution, reporting any errors creating the context. try { - result = await executeFn({ + executeResult = await executeFn({ schema, document: documentAST, rootValue, @@ -344,16 +372,18 @@ export function graphqlHTTP(options: Options): Middleware { }); } - // Collect and apply any metadata extensions if a function was provided. - // https://graphql.github.io/graphql-spec/#sec-Response-Format + if (isAsyncIterable(executeResult)) { + // Get first payload from AsyncIterator. http status will reflect status + // of this payload. + const asyncIterator = getAsyncIterator(executeResult); + const { value } = await asyncIterator.next(); + result = value; + } else { + result = executeResult; + } + if (extensionsFn) { - const extensions = await extensionsFn({ - document: documentAST, - variables, - operationName, - result, - context, - }); + const extensions = await extensionsFn(result); if (extensions != null) { result = { ...result, extensions }; @@ -388,6 +418,31 @@ export function graphqlHTTP(options: Options): Middleware { errors: result.errors?.map(formatErrorFn), }; + if (isAsyncIterable(executeResult)) { + response.setHeader('Content-Type', 'multipart/mixed; boundary="-"'); + sendPartialResponse(pretty, response, formattedResult); + for await (let payload of executeResult) { + // Collect and apply any metadata extensions if a function was provided. + // https://graphql.github.io/graphql-spec/#sec-Response-Format + if (extensionsFn) { + const extensions = await extensionsFn(payload); + + if (extensions != null) { + payload = { ...payload, extensions }; + } + } + const formattedPayload: FormattedExecutionPatchResult = { + // first payload is already consumed, all subsequent payloads typed as ExecutionPatchResult + ...(payload as ExecutionPatchResult), + errors: payload.errors?.map(formatErrorFn), + }; + sendPartialResponse(pretty, response, formattedPayload); + } + response.write('\r\n-----\r\n'); + response.end(); + return; + } + // If allowed to show GraphiQL, present it instead of JSON. if (showGraphiQL) { return respondWithGraphiQL( @@ -510,6 +565,36 @@ function canDisplayGraphiQL(request: Request, params: GraphQLParams): boolean { return !params.raw && accepts(request).types(['json', 'html']) === 'html'; } +/** + * Helper function for sending part of a multi-part response using only the core Node server APIs. + */ +function sendPartialResponse( + pretty: boolean, + response: Response, + result: FormattedExecutionResult | FormattedExecutionPatchResult, +): void { + const json = JSON.stringify(result, null, pretty ? 2 : 0); + const chunk = Buffer.from(json, 'utf8'); + const data = [ + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: ' + String(chunk.length), + '', + chunk, + '', + ].join('\r\n'); + response.write(data); + // flush response if compression middleware is used + if ( + typeof response.flush === 'function' && + // @ts-expect-error deprecated flush method is implemented on ServerResponse but not typed + response.flush !== ServerResponse.prototype.flush + ) { + response.flush(); + } +} + /** * Helper function for sending a response using only the core Node server APIs. */ @@ -519,3 +604,10 @@ function sendResponse(response: Response, type: string, data: string): void { response.setHeader('Content-Length', String(chunk.length)); response.end(chunk); } + +function getAsyncIterator( + asyncIterable: AsyncIterable, +): AsyncIterator { + const method = asyncIterable[Symbol.asyncIterator]; + return method.call(asyncIterable); +} diff --git a/src/isAsyncIterable.ts b/src/isAsyncIterable.ts new file mode 100644 index 00000000..7542281e --- /dev/null +++ b/src/isAsyncIterable.ts @@ -0,0 +1,9 @@ +export function isAsyncIterable( + maybeAsyncIterable: any, + // eslint-disable-next-line no-undef +): maybeAsyncIterable is AsyncIterable { + if (maybeAsyncIterable == null || typeof maybeAsyncIterable !== 'object') { + return false; + } + return typeof maybeAsyncIterable[Symbol.asyncIterator] === 'function'; +}