From a2c0ab45b4e5e81824d58d9944e91e5e2d33b019 Mon Sep 17 00:00:00 2001 From: Rob Richard Date: Sun, 4 Oct 2020 16:20:31 -0400 Subject: [PATCH 1/6] Support returning async iterables from resolver functions (#2712) Co-authored-by: Ivan Goncharov support async benchmark tests add benchmark tests for list fields add test for error from completeValue in AsyncIterable resolver change execute implementation for async iterable resolvers correctly handle promises returned by completeValue in async iterable resovlers --- benchmark/benchmark.js | 8 +- benchmark/list-async-benchmark.js | 28 +++++ benchmark/list-asyncIterable-benchmark.js | 26 +++++ benchmark/list-sync-benchmark.js | 28 +++++ src/execution/__tests__/lists-test.js | 122 ++++++++++++++++++++++ src/execution/execute.js | 86 ++++++++++++++- 6 files changed, 293 insertions(+), 5 deletions(-) create mode 100644 benchmark/list-async-benchmark.js create mode 100644 benchmark/list-asyncIterable-benchmark.js create mode 100644 benchmark/list-sync-benchmark.js diff --git a/benchmark/benchmark.js b/benchmark/benchmark.js index c8da30e1f6..4a8a530fe3 100644 --- a/benchmark/benchmark.js +++ b/benchmark/benchmark.js @@ -346,9 +346,9 @@ function sampleModule(modulePath) { clock(7, module.measure); // warm up global.gc(); - process.nextTick(() => { + process.nextTick(async () => { const memBaseline = process.memoryUsage().heapUsed; - const clocked = clock(module.count, module.measure); + const clocked = await clock(module.count, module.measure); process.send({ name: module.name, clocked: clocked / module.count, @@ -357,10 +357,10 @@ function sampleModule(modulePath) { }); // Clocks the time taken to execute a test per cycle (secs). - function clock(count, fn) { + async function clock(count, fn) { const start = process.hrtime.bigint(); for (let i = 0; i < count; ++i) { - fn(); + await fn(); } return Number(process.hrtime.bigint() - start); } diff --git a/benchmark/list-async-benchmark.js b/benchmark/list-async-benchmark.js new file mode 100644 index 0000000000..ef83b8174e --- /dev/null +++ b/benchmark/list-async-benchmark.js @@ -0,0 +1,28 @@ +'use strict'; + +const { parse } = require('graphql/language/parser.js'); +const { execute } = require('graphql/execution/execute.js'); +const { buildSchema } = require('graphql/utilities/buildASTSchema.js'); + +const schema = buildSchema('type Query { listField: [String] }'); +const document = parse('{ listField }'); + +function listField() { + const results = []; + for (let index = 0; index < 100000; index++) { + results.push(Promise.resolve(index)); + } + return results; +} + +module.exports = { + name: 'Execute Asynchronous List Field', + count: 10, + async measure() { + await execute({ + schema, + document, + rootValue: { listField }, + }); + }, +}; diff --git a/benchmark/list-asyncIterable-benchmark.js b/benchmark/list-asyncIterable-benchmark.js new file mode 100644 index 0000000000..3863cca833 --- /dev/null +++ b/benchmark/list-asyncIterable-benchmark.js @@ -0,0 +1,26 @@ +'use strict'; + +const { parse } = require('graphql/language/parser.js'); +const { execute } = require('graphql/execution/execute.js'); +const { buildSchema } = require('graphql/utilities/buildASTSchema.js'); + +const schema = buildSchema('type Query { listField: [String] }'); +const document = parse('{ listField }'); + +async function* listField() { + for (let index = 0; index < 100000; index++) { + yield index; + } +} + +module.exports = { + name: 'Execute Async Iterable List Field', + count: 10, + async measure() { + await execute({ + schema, + document, + rootValue: { listField }, + }); + }, +}; diff --git a/benchmark/list-sync-benchmark.js b/benchmark/list-sync-benchmark.js new file mode 100644 index 0000000000..172d87a967 --- /dev/null +++ b/benchmark/list-sync-benchmark.js @@ -0,0 +1,28 @@ +'use strict'; + +const { parse } = require('graphql/language/parser.js'); +const { execute } = require('graphql/execution/execute.js'); +const { buildSchema } = require('graphql/utilities/buildASTSchema.js'); + +const schema = buildSchema('type Query { listField: [String] }'); +const document = parse('{ listField }'); + +function listField() { + const results = []; + for (let index = 0; index < 100000; index++) { + results.push(index); + } + return results; +} + +module.exports = { + name: 'Execute Synchronous List Field', + count: 10, + async measure() { + await execute({ + schema, + document, + rootValue: { listField }, + }); + }, +}; diff --git a/src/execution/__tests__/lists-test.js b/src/execution/__tests__/lists-test.js index 926802a51b..5e996efbdf 100644 --- a/src/execution/__tests__/lists-test.js +++ b/src/execution/__tests__/lists-test.js @@ -2,6 +2,9 @@ import { expect } from 'chai'; import { describe, it } from 'mocha'; import { parse } from '../../language/parser'; +import { GraphQLList, GraphQLObjectType } from '../../type/definition'; +import { GraphQLString } from '../../type/scalars'; +import { GraphQLSchema } from '../../type/schema'; import { buildSchema } from '../../utilities/buildASTSchema'; @@ -64,6 +67,125 @@ describe('Execute: Accepts any iterable as list value', () => { }); }); +describe('Execute: Accepts async iterables as list value', () => { + function complete(rootValue: mixed) { + return execute({ + schema: buildSchema('type Query { listField: [String] }'), + document: parse('{ listField }'), + rootValue, + }); + } + + function completeObjectList(resolve) { + const schema = new GraphQLSchema({ + query: new GraphQLObjectType({ + name: 'Query', + fields: { + listField: { + resolve: async function* listField() { + yield await { index: 0 }; + yield await { index: 1 }; + yield await { index: 2 }; + }, + type: new GraphQLList( + new GraphQLObjectType({ + name: 'ObjectWrapper', + fields: { + index: { + type: GraphQLString, + resolve, + }, + }, + }), + ), + }, + }, + }), + }); + return execute({ + schema, + document: parse('{ listField { index } }'), + }); + } + + it('Accepts an AsyncGenerator function as a List value', async () => { + async function* listField() { + yield await 'two'; + yield await 4; + yield await false; + } + + expect(await complete({ listField })).to.deep.equal({ + data: { listField: ['two', '4', 'false'] }, + }); + }); + + it('Handles an AsyncGenerator function that throws', async () => { + async function* listField() { + yield await 'two'; + yield await 4; + throw new Error('bad'); + } + + expect(await complete({ listField })).to.deep.equal({ + data: { listField: ['two', '4', null] }, + errors: [ + { + message: 'bad', + locations: [{ line: 1, column: 3 }], + path: ['listField', 2], + }, + ], + }); + }); + + it('Handles errors from `completeValue` in AsyncIterables', async () => { + async function* listField() { + yield await 'two'; + yield await {}; + } + + expect(await complete({ listField })).to.deep.equal({ + data: { listField: ['two', null] }, + errors: [ + { + message: 'String cannot represent value: {}', + locations: [{ line: 1, column: 3 }], + path: ['listField', 1], + }, + ], + }); + }); + + it('Handles promises from `completeValue` in AsyncIterables', async () => { + expect( + await completeObjectList(({ index }) => Promise.resolve(index)), + ).to.deep.equal({ + data: { listField: [{ index: '0' }, { index: '1' }, { index: '2' }] }, + }); + }); + + it('Handles rejected promises from `completeValue` in AsyncIterables', async () => { + expect( + await completeObjectList(({ index }) => { + if (index === 2) { + return Promise.reject(new Error('bad')); + } + return Promise.resolve(index); + }), + ).to.deep.equal({ + data: { listField: [{ index: '0' }, { index: '1' }, { index: null }] }, + errors: [ + { + message: 'bad', + locations: [{ line: 1, column: 15 }], + path: ['listField', 2, 'index'], + }, + ], + }); + }); +}); + describe('Execute: Handles list nullability', () => { async function complete(args: {| listField: mixed, as: string |}) { const { listField, as } = args; diff --git a/src/execution/execute.js b/src/execution/execute.js index f272b65aef..193fc2afbc 100644 --- a/src/execution/execute.js +++ b/src/execution/execute.js @@ -1,4 +1,5 @@ import arrayFrom from '../polyfills/arrayFrom'; +import { SYMBOL_ASYNC_ITERATOR } from '../polyfills/symbols'; import type { Path } from '../jsutils/Path'; import type { ObjMap } from '../jsutils/ObjMap'; @@ -8,6 +9,7 @@ import memoize3 from '../jsutils/memoize3'; import invariant from '../jsutils/invariant'; import devAssert from '../jsutils/devAssert'; import isPromise from '../jsutils/isPromise'; +import isAsyncIterable from '../jsutils/isAsyncIterable'; import isObjectLike from '../jsutils/isObjectLike'; import isCollection from '../jsutils/isCollection'; import promiseReduce from '../jsutils/promiseReduce'; @@ -855,6 +857,74 @@ function completeValue( ); } +/** + * Complete a async iterator value by completing the result and calling + * recursively until all the results are completed. + */ +function completeAsyncIteratorValue( + exeContext: ExecutionContext, + itemType: GraphQLOutputType, + fieldNodes: $ReadOnlyArray, + info: GraphQLResolveInfo, + path: Path, + iterator: AsyncIterator, +): Promise<$ReadOnlyArray> { + let containsPromise = false; + return new Promise((resolve) => { + function next(index, completedResults) { + const fieldPath = addPath(path, index, undefined); + iterator.next().then( + ({ value, done }) => { + if (done) { + resolve(completedResults); + return; + } + // TODO can the error checking logic be consolidated with completeListValue? + try { + const completedItem = completeValue( + exeContext, + itemType, + fieldNodes, + info, + fieldPath, + value, + ); + if (isPromise(completedItem)) { + containsPromise = true; + } + completedResults.push(completedItem); + } catch (rawError) { + completedResults.push(null); + const error = locatedError( + rawError, + fieldNodes, + pathToArray(fieldPath), + ); + handleFieldError(error, itemType, exeContext); + resolve(completedResults); + return; + } + + next(index + 1, completedResults); + }, + (rawError) => { + completedResults.push(null); + const error = locatedError( + rawError, + fieldNodes, + pathToArray(fieldPath), + ); + handleFieldError(error, itemType, exeContext); + resolve(completedResults); + }, + ); + } + next(0, []); + }).then((completedResults) => + containsPromise ? Promise.all(completedResults) : completedResults, + ); +} + /** * Complete a list value by completing each item in the list with the * inner type @@ -867,6 +937,21 @@ function completeListValue( path: Path, result: mixed, ): PromiseOrValue<$ReadOnlyArray> { + const itemType = returnType.ofType; + + if (isAsyncIterable(result)) { + const iterator = result[SYMBOL_ASYNC_ITERATOR](); + + return completeAsyncIteratorValue( + exeContext, + itemType, + fieldNodes, + info, + path, + iterator, + ); + } + if (!isCollection(result)) { throw new GraphQLError( `Expected Iterable, but did not find one for field "${info.parentType.name}.${info.fieldName}".`, @@ -875,7 +960,6 @@ function completeListValue( // This is specified as a simple map, however we're optimizing the path // where the list contains no Promises by avoiding creating another Promise. - const itemType = returnType.ofType; let containsPromise = false; const completedResults = arrayFrom(result, (item, index) => { // No need to modify the info object containing the path, From 80aa9203253c5fa3045c358eed761ba1a1b8f54a Mon Sep 17 00:00:00 2001 From: Liliana Matos Date: Wed, 14 Oct 2020 16:15:21 -0400 Subject: [PATCH 2/6] Add @defer directive to specified directives --- src/index.js | 1 + src/type/__tests__/introspection-test.js | 25 +++++++++++++++++++ src/type/directives.d.ts | 5 ++++ src/type/directives.js | 24 ++++++++++++++++++ src/type/index.d.ts | 1 + src/type/index.js | 1 + .../__tests__/buildASTSchema-test.js | 15 +++++++---- .../__tests__/findBreakingChanges-test.js | 2 ++ src/utilities/__tests__/printSchema-test.js | 20 +++++++++++++++ 9 files changed, 89 insertions(+), 5 deletions(-) diff --git a/src/index.js b/src/index.js index 104ab88658..9319e99f73 100644 --- a/src/index.js +++ b/src/index.js @@ -53,6 +53,7 @@ export { specifiedDirectives, GraphQLIncludeDirective, GraphQLSkipDirective, + GraphQLDeferDirective, GraphQLDeprecatedDirective, GraphQLSpecifiedByDirective, // "Enum" of Type Kinds diff --git a/src/type/__tests__/introspection-test.js b/src/type/__tests__/introspection-test.js index 478cc9bd18..252df62fd1 100644 --- a/src/type/__tests__/introspection-test.js +++ b/src/type/__tests__/introspection-test.js @@ -936,6 +936,31 @@ describe('Introspection', () => { }, ], }, + { + name: 'defer', + isRepeatable: false, + locations: ['FRAGMENT_SPREAD', 'INLINE_FRAGMENT'], + args: [ + { + defaultValue: null, + name: 'if', + type: { + kind: 'SCALAR', + name: 'Boolean', + ofType: null, + }, + }, + { + defaultValue: null, + name: 'label', + type: { + kind: 'SCALAR', + name: 'String', + ofType: null, + }, + }, + ], + }, { name: 'deprecated', isRepeatable: false, diff --git a/src/type/directives.d.ts b/src/type/directives.d.ts index 2c6de77b1d..6b987df49b 100644 --- a/src/type/directives.d.ts +++ b/src/type/directives.d.ts @@ -73,6 +73,11 @@ export const GraphQLIncludeDirective: GraphQLDirective; */ export const GraphQLSkipDirective: GraphQLDirective; +/** + * Used to conditionally defer fragments. + */ +export const GraphQLDeferDirective: GraphQLDirective; + /** * Used to provide a URL for specifying the behavior of custom scalar definitions. */ diff --git a/src/type/directives.js b/src/type/directives.js index ff4cce6dd2..a500a7a88a 100644 --- a/src/type/directives.js +++ b/src/type/directives.js @@ -168,6 +168,29 @@ export const GraphQLSkipDirective = new GraphQLDirective({ }, }); +/** + * Used to conditionally defer fragments. + */ +export const GraphQLDeferDirective = new GraphQLDirective({ + name: 'defer', + description: + 'Directs the executor to defer this fragment when the `if` argument is true or undefined.', + locations: [ + DirectiveLocation.FRAGMENT_SPREAD, + DirectiveLocation.INLINE_FRAGMENT, + ], + args: { + if: { + type: GraphQLBoolean, + description: 'Deferred when true or undefined.', + }, + label: { + type: GraphQLString, + description: 'Unique name', + }, + }, +}); + /** * Constant string used for default reason for a deprecation. */ @@ -216,6 +239,7 @@ export const GraphQLSpecifiedByDirective = new GraphQLDirective({ export const specifiedDirectives = Object.freeze([ GraphQLIncludeDirective, GraphQLSkipDirective, + GraphQLDeferDirective, GraphQLDeprecatedDirective, GraphQLSpecifiedByDirective, ]); diff --git a/src/type/index.d.ts b/src/type/index.d.ts index 9686f413b7..d57144a310 100644 --- a/src/type/index.d.ts +++ b/src/type/index.d.ts @@ -125,6 +125,7 @@ export { specifiedDirectives, GraphQLIncludeDirective, GraphQLSkipDirective, + GraphQLDeferDirective, GraphQLDeprecatedDirective, GraphQLSpecifiedByDirective, // Constant Deprecation Reason diff --git a/src/type/index.js b/src/type/index.js index 811d50247a..9cea8a7580 100644 --- a/src/type/index.js +++ b/src/type/index.js @@ -76,6 +76,7 @@ export { specifiedDirectives, GraphQLIncludeDirective, GraphQLSkipDirective, + GraphQLDeferDirective, GraphQLDeprecatedDirective, GraphQLSpecifiedByDirective, // Constant Deprecation Reason diff --git a/src/utilities/__tests__/buildASTSchema-test.js b/src/utilities/__tests__/buildASTSchema-test.js index f364e311dd..c70c39d676 100644 --- a/src/utilities/__tests__/buildASTSchema-test.js +++ b/src/utilities/__tests__/buildASTSchema-test.js @@ -20,6 +20,7 @@ import { GraphQLIncludeDirective, GraphQLDeprecatedDirective, GraphQLSpecifiedByDirective, + GraphQLDeferDirective, } from '../../type/directives'; import { GraphQLID, @@ -251,12 +252,13 @@ describe('Schema Builder', () => { expect(cycleSDL(sdl, { commentDescriptions: true })).to.equal(sdl); }); - it('Maintains @include, @skip & @specifiedBy', () => { + it('Maintains specified directives', () => { const schema = buildSchema('type Query'); - expect(schema.getDirectives()).to.have.lengthOf(4); + expect(schema.getDirectives()).to.have.lengthOf(5); expect(schema.getDirective('skip')).to.equal(GraphQLSkipDirective); expect(schema.getDirective('include')).to.equal(GraphQLIncludeDirective); + expect(schema.getDirective('defer')).to.equal(GraphQLDeferDirective); expect(schema.getDirective('deprecated')).to.equal( GraphQLDeprecatedDirective, ); @@ -271,9 +273,10 @@ describe('Schema Builder', () => { directive @include on FIELD directive @deprecated on FIELD_DEFINITION directive @specifiedBy on FIELD_DEFINITION + directive @defer on FRAGMENT_SPREAD `); - expect(schema.getDirectives()).to.have.lengthOf(4); + expect(schema.getDirectives()).to.have.lengthOf(5); expect(schema.getDirective('skip')).to.not.equal(GraphQLSkipDirective); expect(schema.getDirective('include')).to.not.equal( GraphQLIncludeDirective, @@ -284,16 +287,18 @@ describe('Schema Builder', () => { expect(schema.getDirective('specifiedBy')).to.not.equal( GraphQLSpecifiedByDirective, ); + expect(schema.getDirective('defer')).to.not.equal(GraphQLDeferDirective); }); - it('Adding directives maintains @include, @skip & @specifiedBy', () => { + it('Adding directives maintains specified directives', () => { const schema = buildSchema(` directive @foo(arg: Int) on FIELD `); - expect(schema.getDirectives()).to.have.lengthOf(5); + expect(schema.getDirectives()).to.have.lengthOf(6); expect(schema.getDirective('skip')).to.not.equal(undefined); expect(schema.getDirective('include')).to.not.equal(undefined); + expect(schema.getDirective('defer')).to.not.equal(undefined); expect(schema.getDirective('deprecated')).to.not.equal(undefined); expect(schema.getDirective('specifiedBy')).to.not.equal(undefined); }); diff --git a/src/utilities/__tests__/findBreakingChanges-test.js b/src/utilities/__tests__/findBreakingChanges-test.js index a4ab722084..461d5d0c8c 100644 --- a/src/utilities/__tests__/findBreakingChanges-test.js +++ b/src/utilities/__tests__/findBreakingChanges-test.js @@ -4,6 +4,7 @@ import { describe, it } from 'mocha'; import { GraphQLSchema } from '../../type/schema'; import { GraphQLSkipDirective, + GraphQLDeferDirective, GraphQLIncludeDirective, GraphQLSpecifiedByDirective, GraphQLDeprecatedDirective, @@ -802,6 +803,7 @@ describe('findBreakingChanges', () => { GraphQLSkipDirective, GraphQLIncludeDirective, GraphQLSpecifiedByDirective, + GraphQLDeferDirective, ], }); diff --git a/src/utilities/__tests__/printSchema-test.js b/src/utilities/__tests__/printSchema-test.js index df064c3724..2c1f1c759c 100644 --- a/src/utilities/__tests__/printSchema-test.js +++ b/src/utilities/__tests__/printSchema-test.js @@ -627,6 +627,17 @@ describe('Type System Printer', () => { if: Boolean! ) on FIELD | FRAGMENT_SPREAD | INLINE_FRAGMENT + """ + Directs the executor to defer this fragment when the \`if\` argument is true or undefined. + """ + directive @defer( + """Deferred when true or undefined.""" + if: Boolean + + """Unique name""" + label: String + ) on FRAGMENT_SPREAD | INLINE_FRAGMENT + """Marks an element of a GraphQL schema as no longer supported.""" directive @deprecated( """ @@ -852,6 +863,15 @@ describe('Type System Printer', () => { if: Boolean! ) on FIELD | FRAGMENT_SPREAD | INLINE_FRAGMENT + # Directs the executor to defer this fragment when the \`if\` argument is true or undefined. + directive @defer( + # Deferred when true or undefined. + if: Boolean + + # Unique name + label: String + ) on FRAGMENT_SPREAD | INLINE_FRAGMENT + # Marks an element of a GraphQL schema as no longer supported. directive @deprecated( # Explains why this element was deprecated, usually also including a suggestion for how to access supported similar data. Formatted using the Markdown syntax, as specified by [CommonMark](https://commonmark.org/). From 71f3d57068a8103e6d83a02ac806eeaba0d95042 Mon Sep 17 00:00:00 2001 From: Liliana Matos Date: Sat, 24 Oct 2020 11:59:18 -0400 Subject: [PATCH 3/6] Implement support for @defer directive --- src/execution/__tests__/defer-test.js | 270 ++++++++++++++ src/execution/__tests__/mutations-test.js | 129 +++++++ src/execution/__tests__/sync-test.js | 18 + src/execution/execute.d.ts | 48 ++- src/execution/execute.js | 436 +++++++++++++++++++--- src/execution/index.d.ts | 3 + src/execution/index.js | 3 + src/graphql.d.ts | 8 +- src/graphql.js | 16 +- src/index.d.ts | 3 + src/index.js | 3 + src/subscription/subscribe.js | 15 +- 12 files changed, 893 insertions(+), 59 deletions(-) create mode 100644 src/execution/__tests__/defer-test.js diff --git a/src/execution/__tests__/defer-test.js b/src/execution/__tests__/defer-test.js new file mode 100644 index 0000000000..d7ed6f420b --- /dev/null +++ b/src/execution/__tests__/defer-test.js @@ -0,0 +1,270 @@ +import { expect } from 'chai'; +import { describe, it } from 'mocha'; + +import isAsyncIterable from '../../jsutils/isAsyncIterable'; +import { parse } from '../../language/parser'; + +import { GraphQLID, GraphQLString } from '../../type/scalars'; +import { GraphQLSchema } from '../../type/schema'; +import { GraphQLObjectType, GraphQLList } from '../../type/definition'; + +import { execute } from '../execute'; + +const friendType = new GraphQLObjectType({ + fields: { + id: { type: GraphQLID }, + name: { type: GraphQLString }, + }, + name: 'Friend', +}); + +const friends = [ + { name: 'Han', id: 2 }, + { name: 'Leia', id: 3 }, + { name: 'C-3PO', id: 4 }, +]; + +const heroType = new GraphQLObjectType({ + fields: { + id: { type: GraphQLID }, + name: { type: GraphQLString }, + errorField: { + type: GraphQLString, + resolve: () => { + throw new Error('bad'); + }, + }, + friends: { + type: new GraphQLList(friendType), + resolve: () => friends, + }, + }, + name: 'Hero', +}); + +const hero = { name: 'Luke', id: 1 }; + +const query = new GraphQLObjectType({ + fields: { + hero: { + type: heroType, + resolve: () => hero, + }, + }, + name: 'Query', +}); + +async function complete(document) { + const schema = new GraphQLSchema({ query }); + + const result = await execute({ + schema, + document, + rootValue: {}, + }); + + if (isAsyncIterable(result)) { + const results = []; + for await (const patch of result) { + results.push(patch); + } + return results; + } + return result; +} + +describe('Execute: defer directive', () => { + it('Can defer fragments containing scalar types', async () => { + const document = parse(` + query HeroNameQuery { + hero { + id + ...NameFragment @defer + } + } + fragment NameFragment on Hero { + id + name + } + `); + const result = await complete(document); + + expect(result).to.deep.equal([ + { + data: { + hero: { + id: '1', + }, + }, + hasNext: true, + }, + { + data: { + id: '1', + name: 'Luke', + }, + path: ['hero'], + hasNext: false, + }, + ]); + }); + it('Can disable defer using if argument', async () => { + const document = parse(` + query HeroNameQuery { + hero { + id + ...NameFragment @defer(if: false) + } + } + fragment NameFragment on Hero { + name + } + `); + const result = await complete(document); + + expect(result).to.deep.equal({ + data: { + hero: { + id: '1', + name: 'Luke', + }, + }, + }); + }); + it('Can defer fragments containing on the top level Query field', async () => { + const document = parse(` + query HeroNameQuery { + ...QueryFragment @defer(label: "DeferQuery") + } + fragment QueryFragment on Query { + hero { + id + } + } + `); + const result = await complete(document); + + expect(result).to.deep.equal([ + { + data: {}, + hasNext: true, + }, + { + data: { + hero: { + id: '1', + }, + }, + path: [], + label: 'DeferQuery', + hasNext: false, + }, + ]); + }); + it('Can defer a fragment within an already deferred fragment', async () => { + const document = parse(` + query HeroNameQuery { + hero { + id + ...TopFragment @defer(label: "DeferTop") + } + } + fragment TopFragment on Hero { + name + ...NestedFragment @defer(label: "DeferNested") + } + fragment NestedFragment on Hero { + friends { + name + } + } + `); + const result = await complete(document); + + expect(result).to.deep.equal([ + { + data: { + hero: { + id: '1', + }, + }, + hasNext: true, + }, + { + data: { + friends: [{ name: 'Han' }, { name: 'Leia' }, { name: 'C-3PO' }], + }, + path: ['hero'], + label: 'DeferNested', + hasNext: true, + }, + { + data: { + name: 'Luke', + }, + path: ['hero'], + label: 'DeferTop', + hasNext: false, + }, + ]); + }); + it('Can defer an inline fragment', async () => { + const document = parse(` + query HeroNameQuery { + hero { + id + ... on Hero @defer(label: "InlineDeferred") { + name + } + } + } + `); + const result = await complete(document); + + expect(result).to.deep.equal([ + { + data: { hero: { id: '1' } }, + hasNext: true, + }, + { + data: { name: 'Luke' }, + path: ['hero'], + label: 'InlineDeferred', + hasNext: false, + }, + ]); + }); + it('Handles errors thrown in deferred fragments', async () => { + const document = parse(` + query HeroNameQuery { + hero { + id + ...NameFragment @defer + } + } + fragment NameFragment on Hero { + errorField + } + `); + const result = await complete(document); + + expect(result).to.deep.equal([ + { + data: { hero: { id: '1' } }, + hasNext: true, + }, + { + data: { errorField: null }, + path: ['hero'], + errors: [ + { + message: 'bad', + locations: [{ line: 9, column: 9 }], + path: ['hero', 'errorField'], + }, + ], + hasNext: false, + }, + ]); + }); +}); diff --git a/src/execution/__tests__/mutations-test.js b/src/execution/__tests__/mutations-test.js index c9c51296bf..97577f0d66 100644 --- a/src/execution/__tests__/mutations-test.js +++ b/src/execution/__tests__/mutations-test.js @@ -3,6 +3,8 @@ import { describe, it } from 'mocha'; import resolveOnNextTick from '../../__testUtils__/resolveOnNextTick'; +import invariant from '../../jsutils/invariant'; +import isAsyncIterable from '../../jsutils/isAsyncIterable'; import { parse } from '../../language/parser'; import { GraphQLInt } from '../../type/scalars'; @@ -49,6 +51,15 @@ class Root { const numberHolderType = new GraphQLObjectType({ fields: { theNumber: { type: GraphQLInt }, + promiseToGetTheNumber: { + type: GraphQLInt, + resolve: (root) => + new Promise((resolve) => { + process.nextTick(() => { + resolve(root.theNumber); + }); + }), + }, }, name: 'NumberHolder', }); @@ -190,4 +201,122 @@ describe('Execute: Handles mutation execution ordering', () => { ], }); }); + it('Mutation fields with @defer do not block next mutation', async () => { + const document = parse(` + mutation M { + first: promiseToChangeTheNumber(newNumber: 1) { + ...DeferFragment @defer(label: "defer-label") + }, + second: immediatelyChangeTheNumber(newNumber: 2) { + theNumber + } + } + fragment DeferFragment on NumberHolder { + promiseToGetTheNumber + } + `); + + const rootValue = new Root(6); + const mutationResult = await execute({ + schema, + document, + rootValue, + }); + const patches = []; + + invariant(isAsyncIterable(mutationResult)); + for await (const patch of mutationResult) { + patches.push(patch); + } + + expect(patches).to.deep.equal([ + { + data: { + first: {}, + second: { theNumber: 2 }, + }, + hasNext: true, + }, + { + label: 'defer-label', + path: ['first'], + data: { + promiseToGetTheNumber: 2, + }, + hasNext: false, + }, + ]); + }); + it('Mutation inside of a fragment', async () => { + const document = parse(` + mutation M { + ...MutationFragment + second: immediatelyChangeTheNumber(newNumber: 2) { + theNumber + } + } + fragment MutationFragment on Mutation { + first: promiseToChangeTheNumber(newNumber: 1) { + theNumber + }, + } + `); + + const rootValue = new Root(6); + const mutationResult = await execute({ schema, document, rootValue }); + + expect(mutationResult).to.deep.equal({ + data: { + first: { theNumber: 1 }, + second: { theNumber: 2 }, + }, + }); + }); + it('Mutation with @defer is not executed serially', async () => { + const document = parse(` + mutation M { + ...MutationFragment @defer(label: "defer-label") + second: immediatelyChangeTheNumber(newNumber: 2) { + theNumber + } + } + fragment MutationFragment on Mutation { + first: promiseToChangeTheNumber(newNumber: 1) { + theNumber + }, + } + `); + + const rootValue = new Root(6); + const mutationResult = await execute({ + schema, + document, + rootValue, + }); + const patches = []; + + invariant(isAsyncIterable(mutationResult)); + for await (const patch of mutationResult) { + patches.push(patch); + } + + expect(patches).to.deep.equal([ + { + data: { + second: { theNumber: 2 }, + }, + hasNext: true, + }, + { + label: 'defer-label', + path: [], + data: { + first: { + theNumber: 1, + }, + }, + hasNext: false, + }, + ]); + }); }); diff --git a/src/execution/__tests__/sync-test.js b/src/execution/__tests__/sync-test.js index 184a259b69..2619331734 100644 --- a/src/execution/__tests__/sync-test.js +++ b/src/execution/__tests__/sync-test.js @@ -111,6 +111,24 @@ describe('Execute: synchronously when possible', () => { }); }).to.throw('GraphQL execution failed to complete synchronously.'); }); + + it('throws if encountering async iterable execution', () => { + const doc = ` + query Example { + ...deferFrag @defer(label: "deferLabel") + } + fragment deferFrag on Query { + syncField + } + `; + expect(() => { + executeSync({ + schema, + document: parse(doc), + rootValue: 'rootValue', + }); + }).to.throw('GraphQL execution failed to complete synchronously.'); + }); }); describe('graphqlSync', () => { diff --git a/src/execution/execute.d.ts b/src/execution/execute.d.ts index a20db8c224..0f7fc5936e 100644 --- a/src/execution/execute.d.ts +++ b/src/execution/execute.d.ts @@ -44,6 +44,7 @@ export interface ExecutionContext { * * - `errors` is included when any errors occurred as a non-empty array. * - `data` is the result of a successful execution of the query. + * - `hasNext` is true if a future payload is expected. * - `extensions` is reserved for adding non-standard properties. */ export interface ExecutionResult< @@ -53,6 +54,7 @@ export interface ExecutionResult< errors?: ReadonlyArray; // TS_SPECIFIC: TData. Motivation: https://github.com/graphql/graphql-js/pull/2490#issuecomment-639154229 data?: TData | null; + hasNext?: boolean; extensions?: TExtensions; } @@ -66,6 +68,42 @@ export interface FormattedExecutionResult< extensions?: TExtensions; } +/** + * The result of an asynchronous GraphQL patch. + * + * - `errors` is included when any errors occurred as a non-empty array. + * - `data` is the result of the additional asynchronous data. + * - `path` is the location of data. + * - `hasNext` is true if a future payload is expected. + * - `label` is the label provided to @defer or @stream. + * - `extensions` is reserved for adding non-standard properties. + */ +export interface ExecutionPatchResult< + TData = { [key: string]: any }, + TExtensions = { [key: string]: any } +> { + errors?: ReadonlyArray; + data?: TData | null; + path?: ReadonlyArray; + label?: string; + hasNext: boolean; + extensions?: TExtensions; +} + +export interface FormattedExecutionPatchResult< + TData = { [key: string]: any }, + TExtensions = { [key: string]: any } +> { + errors?: ReadonlyArray; + data?: TData | null; + path?: ReadonlyArray; + label?: string; + hasNext: boolean; + extensions?: TExtensions; +} + +export type AsyncExecutionResult = ExecutionResult | ExecutionPatchResult; + export interface ExecutionArgs { schema: GraphQLSchema; document: DocumentNode; @@ -89,7 +127,11 @@ export interface ExecutionArgs { * * Accepts either an object with named arguments, or individual arguments. */ -export function execute(args: ExecutionArgs): PromiseOrValue; +export function execute( + args: ExecutionArgs, +): PromiseOrValue< + ExecutionResult | AsyncIterableIterator +>; export function execute( schema: GraphQLSchema, document: DocumentNode, @@ -99,7 +141,9 @@ export function execute( operationName?: Maybe, fieldResolver?: Maybe>, typeResolver?: Maybe>, -): PromiseOrValue; +): PromiseOrValue< + ExecutionResult | AsyncIterableIterator +>; /** * Also implements the "Evaluating requests" section of the GraphQL specification. diff --git a/src/execution/execute.js b/src/execution/execute.js index 193fc2afbc..4523b1df7c 100644 --- a/src/execution/execute.js +++ b/src/execution/execute.js @@ -52,6 +52,7 @@ import { import { GraphQLIncludeDirective, GraphQLSkipDirective, + GraphQLDeferDirective, } from '../type/directives'; import { isNamedType, @@ -107,6 +108,7 @@ export type ExecutionContext = {| fieldResolver: GraphQLFieldResolver, typeResolver: GraphQLTypeResolver, errors: Array, + dispatcher: Dispatcher, |}; /** @@ -114,20 +116,53 @@ export type ExecutionContext = {| * * - `errors` is included when any errors occurred as a non-empty array. * - `data` is the result of a successful execution of the query. + * - `hasNext` is true if a future payload is expected. * - `extensions` is reserved for adding non-standard properties. */ export type ExecutionResult = {| errors?: $ReadOnlyArray, data?: ObjMap | null, + hasNext?: boolean, extensions?: ObjMap, |}; export type FormattedExecutionResult = {| errors?: $ReadOnlyArray, data?: ObjMap | null, + hasNext?: boolean, extensions?: ObjMap, |}; +/** + * The result of an asynchronous GraphQL patch. + * + * - `errors` is included when any errors occurred as a non-empty array. + * - `data` is the result of the additional asynchronous data. + * - `path` is the location of data. + * - `label` is the label provided to @defer or @stream. + * - `hasNext` is true if a future payload is expected. + * - `extensions` is reserved for adding non-standard properties. + */ +export type ExecutionPatchResult = {| + errors?: $ReadOnlyArray, + data?: ObjMap | mixed | null, + path?: $ReadOnlyArray, + label?: string, + hasNext: boolean, + extensions?: ObjMap, +|}; + +export type FormattedExecutionPatchResult = {| + errors?: $ReadOnlyArray, + data?: ObjMap | mixed | null, + path?: $ReadOnlyArray, + label?: string, + hasNext: boolean, + extensions?: ObjMap, +|}; + +export type AsyncExecutionResult = ExecutionResult | ExecutionPatchResult; + export type ExecutionArgs = {| schema: GraphQLSchema, document: DocumentNode, @@ -139,6 +174,12 @@ export type ExecutionArgs = {| typeResolver?: ?GraphQLTypeResolver, |}; +export type FieldsAndPatches = { + fields: ObjMap>, + patches: Array<{| label?: string, fields: ObjMap> |}>, + ... +}; + /** * Implements the "Evaluating requests" section of the GraphQL specification. * @@ -201,14 +242,17 @@ export function executeSync(args: ExecutionArgs): ExecutionResult { const result = executeImpl(args); // Assert that the execution was synchronous. - if (isPromise(result)) { + if (isPromise(result) || isAsyncIterable(result)) { throw new Error('GraphQL execution failed to complete synchronously.'); } - return result; + // Note: Flow can't refine isAsyncIterable, so explicit casts are used. + return ((result: any): ExecutionResult); } -function executeImpl(args: ExecutionArgs): PromiseOrValue { +function executeImpl( + args: ExecutionArgs, +): PromiseOrValue> { const { schema, document, @@ -259,13 +303,21 @@ function executeImpl(args: ExecutionArgs): PromiseOrValue { function buildResponse( exeContext: ExecutionContext, data: PromiseOrValue | null>, -): PromiseOrValue { +): PromiseOrValue> { if (isPromise(data)) { return data.then((resolved) => buildResponse(exeContext, resolved)); } - return exeContext.errors.length === 0 - ? { data } - : { errors: exeContext.errors, data }; + + const initialResult = + exeContext.errors.length === 0 + ? { data } + : { errors: exeContext.errors, data }; + + if (exeContext.dispatcher.hasSubsequentPayloads()) { + return exeContext.dispatcher.get(initialResult); + } + + return initialResult; } /** @@ -363,6 +415,7 @@ export function buildExecutionContext( variableValues: coercedVariableValues.coerced, fieldResolver: fieldResolver ?? defaultFieldResolver, typeResolver: typeResolver ?? defaultTypeResolver, + dispatcher: new Dispatcher(), errors: [], }; } @@ -370,17 +423,18 @@ export function buildExecutionContext( /** * Implements the "Evaluating operations" section of the spec. */ -function executeOperation( +export function executeOperation( exeContext: ExecutionContext, operation: OperationDefinitionNode, rootValue: mixed, ): PromiseOrValue | null> { const type = getOperationRootType(exeContext.schema, operation); - const fields = collectFields( + const { fields, patches } = collectFields( exeContext, type, operation.selectionSet, Object.create(null), + [], Object.create(null), ); @@ -390,10 +444,33 @@ function executeOperation( // at which point we still log the error and null the parent field, which // in this case is the entire response. try { - const result = - operation.operation === 'mutation' - ? executeFieldsSerially(exeContext, type, rootValue, path, fields) - : executeFields(exeContext, type, rootValue, path, fields); + let result; + + if (operation.operation === 'mutation') { + result = executeFieldsSerially(exeContext, type, rootValue, path, fields); + } else { + result = executeFields( + exeContext, + type, + rootValue, + path, + fields, + exeContext.errors, + ); + } + + for (const patch of patches) { + const { label, fields: patchFields } = patch; + const errors = []; + + exeContext.dispatcher.addFields( + label, + path, + executeFields(exeContext, type, rootValue, path, patchFields, errors), + errors, + ); + } + if (isPromise(result)) { return result.then(undefined, (error) => { exeContext.errors.push(error); @@ -429,6 +506,7 @@ function executeFieldsSerially( sourceValue, fieldNodes, fieldPath, + exeContext.errors, ); if (result === undefined) { return results; @@ -456,6 +534,7 @@ function executeFields( sourceValue: mixed, path: Path | void, fields: ObjMap>, + errors: Array, ): PromiseOrValue> { const results = Object.create(null); let containsPromise = false; @@ -469,6 +548,7 @@ function executeFields( sourceValue, fieldNodes, fieldPath, + errors, ); if (result !== undefined) { @@ -505,8 +585,9 @@ export function collectFields( runtimeType: GraphQLObjectType, selectionSet: SelectionSetNode, fields: ObjMap>, + patches: Array<{| label?: string, fields: ObjMap> |}>, visitedFragmentNames: ObjMap, -): ObjMap> { +): FieldsAndPatches { for (const selection of selectionSet.selections) { switch (selection.kind) { case Kind.FIELD: { @@ -527,20 +608,47 @@ export function collectFields( ) { continue; } - collectFields( - exeContext, - runtimeType, - selection.selectionSet, - fields, - visitedFragmentNames, - ); + + const defer = getDeferValues(exeContext, selection); + + if (defer) { + const { fields: patchFields } = collectFields( + exeContext, + runtimeType, + selection.selectionSet, + Object.create(null), + patches, + visitedFragmentNames, + ); + patches.push({ + label: defer.label, + fields: patchFields, + }); + } else { + collectFields( + exeContext, + runtimeType, + selection.selectionSet, + fields, + patches, + visitedFragmentNames, + ); + } break; } case Kind.FRAGMENT_SPREAD: { const fragName = selection.name.value; + + if (!shouldIncludeNode(exeContext, selection)) { + continue; + } + + const defer = getDeferValues(exeContext, selection); + if ( - visitedFragmentNames[fragName] || - !shouldIncludeNode(exeContext, selection) + visitedFragmentNames[fragName] && + // Cannot continue in this case because fields must be recollected for patch + !defer ) { continue; } @@ -552,18 +660,36 @@ export function collectFields( ) { continue; } - collectFields( - exeContext, - runtimeType, - fragment.selectionSet, - fields, - visitedFragmentNames, - ); + + if (defer) { + const { fields: patchFields } = collectFields( + exeContext, + runtimeType, + fragment.selectionSet, + Object.create(null), + patches, + visitedFragmentNames, + ); + patches.push({ + label: defer.label, + fields: patchFields, + }); + } else { + collectFields( + exeContext, + runtimeType, + fragment.selectionSet, + fields, + patches, + visitedFragmentNames, + ); + } + break; } } } - return fields; + return { fields, patches }; } /** @@ -594,6 +720,34 @@ function shouldIncludeNode( return true; } +/** + * Returns an object containing the @defer arguments if a field should be + * deferred based on the experimental flag, defer directive present and + * not disabled by the "if" argument. + */ +function getDeferValues( + exeContext: ExecutionContext, + node: FragmentSpreadNode | InlineFragmentNode, +): void | {| label?: string |} { + const defer = getDirectiveValues( + GraphQLDeferDirective, + node, + exeContext.variableValues, + ); + + if (!defer) { + return; + } + + if (defer.if === false) { + return; + } + + return { + label: typeof defer.label === 'string' ? defer.label : undefined, + }; +} + /** * Determines if a fragment is applicable to the given type. */ @@ -635,6 +789,7 @@ function resolveField( source: mixed, fieldNodes: $ReadOnlyArray, path: Path, + errors: Array, ): PromiseOrValue { const fieldNode = fieldNodes[0]; const fieldName = fieldNode.name.value; @@ -676,7 +831,15 @@ function resolveField( let completed; if (isPromise(result)) { completed = result.then((resolved) => - completeValue(exeContext, returnType, fieldNodes, info, path, resolved), + completeValue( + exeContext, + returnType, + fieldNodes, + info, + path, + resolved, + errors, + ), ); } else { completed = completeValue( @@ -686,6 +849,7 @@ function resolveField( info, path, result, + errors, ); } @@ -694,13 +858,13 @@ function resolveField( // to take a second callback for the error case. return completed.then(undefined, (rawError) => { const error = locatedError(rawError, fieldNodes, pathToArray(path)); - return handleFieldError(error, returnType, exeContext); + return handleFieldError(error, returnType, errors); }); } return completed; } catch (rawError) { const error = locatedError(rawError, fieldNodes, pathToArray(path)); - return handleFieldError(error, returnType, exeContext); + return handleFieldError(error, returnType, errors); } } @@ -733,8 +897,8 @@ export function buildResolveInfo( function handleFieldError( error: GraphQLError, returnType: GraphQLOutputType, - exeContext: ExecutionContext, -): null { + errors: Array, +) { // If the field type is non-nullable, then it is resolved without any // protection from errors, however it still properly locates the error. if (isNonNullType(returnType)) { @@ -743,7 +907,7 @@ function handleFieldError( // Otherwise, error protection is applied, logging the error and resolving // a null value for this field if one is encountered. - exeContext.errors.push(error); + errors.push(error); return null; } @@ -775,6 +939,7 @@ function completeValue( info: GraphQLResolveInfo, path: Path, result: mixed, + errors: Array, ): PromiseOrValue { // If result is an Error, throw a located error. if (result instanceof Error) { @@ -791,6 +956,7 @@ function completeValue( info, path, result, + errors, ); if (completed === null) { throw new Error( @@ -814,6 +980,7 @@ function completeValue( info, path, result, + errors, ); } @@ -833,6 +1000,7 @@ function completeValue( info, path, result, + errors, ); } @@ -846,6 +1014,7 @@ function completeValue( info, path, result, + errors, ); } @@ -868,6 +1037,7 @@ function completeAsyncIteratorValue( info: GraphQLResolveInfo, path: Path, iterator: AsyncIterator, + errors: Array, ): Promise<$ReadOnlyArray> { let containsPromise = false; return new Promise((resolve) => { @@ -888,6 +1058,7 @@ function completeAsyncIteratorValue( info, fieldPath, value, + errors, ); if (isPromise(completedItem)) { containsPromise = true; @@ -900,7 +1071,7 @@ function completeAsyncIteratorValue( fieldNodes, pathToArray(fieldPath), ); - handleFieldError(error, itemType, exeContext); + handleFieldError(error, itemType, errors); resolve(completedResults); return; } @@ -914,7 +1085,7 @@ function completeAsyncIteratorValue( fieldNodes, pathToArray(fieldPath), ); - handleFieldError(error, itemType, exeContext); + handleFieldError(error, itemType, errors); resolve(completedResults); }, ); @@ -936,6 +1107,7 @@ function completeListValue( info: GraphQLResolveInfo, path: Path, result: mixed, + errors: Array, ): PromiseOrValue<$ReadOnlyArray> { const itemType = returnType.ofType; @@ -949,6 +1121,7 @@ function completeListValue( info, path, iterator, + errors, ); } @@ -976,6 +1149,7 @@ function completeListValue( info, itemPath, resolved, + errors, ), ); } else { @@ -986,6 +1160,7 @@ function completeListValue( info, itemPath, item, + errors, ); } @@ -999,13 +1174,13 @@ function completeListValue( fieldNodes, pathToArray(itemPath), ); - return handleFieldError(error, itemType, exeContext); + return handleFieldError(error, itemType, errors); }); } return completedItem; } catch (rawError) { const error = locatedError(rawError, fieldNodes, pathToArray(itemPath)); - return handleFieldError(error, itemType, exeContext); + return handleFieldError(error, itemType, errors); } }); @@ -1038,6 +1213,7 @@ function completeAbstractValue( info: GraphQLResolveInfo, path: Path, result: mixed, + errors: Array, ): PromiseOrValue> { const resolveTypeFn = returnType.resolveType ?? exeContext.typeResolver; const contextValue = exeContext.contextValue; @@ -1059,6 +1235,7 @@ function completeAbstractValue( info, path, result, + errors, ), ); } @@ -1077,6 +1254,7 @@ function completeAbstractValue( info, path, result, + errors, ); } @@ -1142,6 +1320,7 @@ function completeObjectValue( info: GraphQLResolveInfo, path: Path, result: mixed, + errors: Array, ): PromiseOrValue> { // If there is an isTypeOf predicate function, call it with the // current result. If isTypeOf returns false, then raise an error rather @@ -1160,6 +1339,7 @@ function completeObjectValue( fieldNodes, path, result, + errors, ); }); } @@ -1175,6 +1355,7 @@ function completeObjectValue( fieldNodes, path, result, + errors, ); } @@ -1195,10 +1376,43 @@ function collectAndExecuteSubfields( fieldNodes: $ReadOnlyArray, path: Path, result: mixed, + errors: Array, ): PromiseOrValue> { // Collect sub-fields to execute to complete this value. - const subFieldNodes = collectSubfields(exeContext, returnType, fieldNodes); - return executeFields(exeContext, returnType, result, path, subFieldNodes); + const { fields: subFieldNodes, patches: subPatches } = collectSubfields( + exeContext, + returnType, + fieldNodes, + ); + + const subFields = executeFields( + exeContext, + returnType, + result, + path, + subFieldNodes, + errors, + ); + + for (const subPatch of subPatches) { + const { label, fields: subPatchFieldNodes } = subPatch; + const subPatchErrors = []; + exeContext.dispatcher.addFields( + label, + path, + executeFields( + exeContext, + returnType, + result, + path, + subPatchFieldNodes, + subPatchErrors, + ), + subPatchErrors, + ); + } + + return subFields; } /** @@ -1211,21 +1425,28 @@ function _collectSubfields( exeContext: ExecutionContext, returnType: GraphQLObjectType, fieldNodes: $ReadOnlyArray, -): ObjMap> { - let subFieldNodes = Object.create(null); +): FieldsAndPatches { + const subFieldNodes = Object.create(null); const visitedFragmentNames = Object.create(null); + const subPatches = []; + let subFieldsAndPatches = { + fields: subFieldNodes, + patches: subPatches, + }; + for (const node of fieldNodes) { if (node.selectionSet) { - subFieldNodes = collectFields( + subFieldsAndPatches = collectFields( exeContext, returnType, node.selectionSet, subFieldNodes, + subPatches, visitedFragmentNames, ); } } - return subFieldNodes; + return subFieldsAndPatches; } /** @@ -1329,3 +1550,124 @@ export function getFieldDef( } return parentType.getFields()[fieldName]; } + +/** + * Same as ExecutionPatchResult, but without hasNext + */ +type DispatcherResult = {| + errors?: $ReadOnlyArray, + data?: ObjMap | mixed | null, + path: $ReadOnlyArray, + label?: string, + extensions?: ObjMap, +|}; + +/** + * Dispatcher keeps track of subsequent payloads that need to be delivered + * to the client. After initial execution, returns an async iteratable of + * all the AsyncExecutionResults as they are resolved. + */ +export class Dispatcher { + _subsequentPayloads: Array>>; + _initialResult: ?ExecutionResult; + _hasReturnedInitialResult: boolean; + + constructor() { + this._subsequentPayloads = []; + this._hasReturnedInitialResult = false; + } + + hasSubsequentPayloads() { + return this._subsequentPayloads.length !== 0; + } + + addFields( + label?: string, + path?: Path, + promiseOrData: PromiseOrValue | mixed>, + errors: Array, + ): void { + this._subsequentPayloads.push( + Promise.resolve(promiseOrData).then((data) => ({ + value: createPatchResult(data, label, path, errors), + done: false, + })), + ); + } + + _race(): Promise> { + return new Promise((resolve) => { + this._subsequentPayloads.forEach((promise) => { + promise.then(() => { + // resolve with actual promise, not resolved value of promise so we can remove it from this._subsequentPayloads + resolve({ promise }); + }); + }); + }) + .then(({ promise }) => { + this._subsequentPayloads.splice( + this._subsequentPayloads.indexOf(promise), + 1, + ); + return promise; + }) + .then(({ value }) => { + const returnValue: ExecutionPatchResult = { + ...value, + hasNext: this._subsequentPayloads.length > 0, + }; + return { + value: returnValue, + done: false, + }; + }); + } + + _next(): Promise> { + if (!this._hasReturnedInitialResult) { + this._hasReturnedInitialResult = true; + return Promise.resolve({ + value: { + ...this._initialResult, + hasNext: true, + }, + done: false, + }); + } else if (this._subsequentPayloads.length === 0) { + return Promise.resolve({ value: undefined, done: true }); + } + return this._race(); + } + + get(initialResult: ExecutionResult): AsyncIterable { + this._initialResult = initialResult; + return ({ + [SYMBOL_ASYNC_ITERATOR]() { + return this; + }, + next: () => this._next(), + }: any); + } +} + +function createPatchResult( + data: ObjMap | mixed | null, + label?: string, + path?: Path, + errors?: $ReadOnlyArray, +): DispatcherResult { + const value: DispatcherResult = { + data, + path: path ? pathToArray(path) : [], + }; + + if (label != null) { + value.label = label; + } + + if (errors && errors.length > 0) { + value.errors = errors; + } + + return value; +} diff --git a/src/execution/index.d.ts b/src/execution/index.d.ts index d70ba3aaa5..9546be515f 100644 --- a/src/execution/index.d.ts +++ b/src/execution/index.d.ts @@ -8,6 +8,9 @@ export { ExecutionArgs, ExecutionResult, FormattedExecutionResult, + ExecutionPatchResult, + FormattedExecutionPatchResult, + AsyncExecutionResult, } from './execute'; export { getDirectiveValues } from './values'; diff --git a/src/execution/index.js b/src/execution/index.js index 5ae0706ec9..c5114a609c 100644 --- a/src/execution/index.js +++ b/src/execution/index.js @@ -11,6 +11,9 @@ export type { ExecutionArgs, ExecutionResult, FormattedExecutionResult, + ExecutionPatchResult, + FormattedExecutionPatchResult, + AsyncExecutionResult, } from './execute'; export { getDirectiveValues } from './values'; diff --git a/src/graphql.d.ts b/src/graphql.d.ts index 8ba8ef72c8..464260a0cb 100644 --- a/src/graphql.d.ts +++ b/src/graphql.d.ts @@ -3,7 +3,7 @@ import { Maybe } from './jsutils/Maybe'; import { Source } from './language/source'; import { GraphQLSchema } from './type/schema'; import { GraphQLFieldResolver, GraphQLTypeResolver } from './type/definition'; -import { ExecutionResult } from './execution/execute'; +import { ExecutionResult, AsyncExecutionResult } from './execution/execute'; /** * This is the primary entry point function for fulfilling GraphQL operations @@ -51,7 +51,9 @@ export interface GraphQLArgs { typeResolver?: Maybe>; } -export function graphql(args: GraphQLArgs): Promise; +export function graphql( + args: GraphQLArgs, +): Promise>; export function graphql( schema: GraphQLSchema, source: Source | string, @@ -61,7 +63,7 @@ export function graphql( operationName?: Maybe, fieldResolver?: Maybe>, typeResolver?: Maybe>, -): Promise; +): Promise>; /** * The graphqlSync function also fulfills GraphQL operations by parsing, diff --git a/src/graphql.js b/src/graphql.js index da9428086d..a64a88409f 100644 --- a/src/graphql.js +++ b/src/graphql.js @@ -13,7 +13,10 @@ import type { import type { GraphQLSchema } from './type/schema'; import { validateSchema } from './type/validate'; -import type { ExecutionResult } from './execution/execute'; +import type { + ExecutionResult, + AsyncExecutionResult, +} from './execution/execute'; import { execute } from './execution/execute'; /** @@ -65,7 +68,10 @@ export type GraphQLArgs = {| fieldResolver?: ?GraphQLFieldResolver, typeResolver?: ?GraphQLTypeResolver, |}; -declare function graphql(GraphQLArgs, ..._: []): Promise; +declare function graphql( + GraphQLArgs, + ..._: [] +): PromiseOrValue>; /* eslint-disable no-redeclare */ declare function graphql( schema: GraphQLSchema, @@ -76,7 +82,7 @@ declare function graphql( operationName?: ?string, fieldResolver?: ?GraphQLFieldResolver, typeResolver?: ?GraphQLTypeResolver, -): Promise; +): PromiseOrValue>; export function graphql( argsOrSchema, source, @@ -160,7 +166,9 @@ export function graphqlSync( return result; } -function graphqlImpl(args: GraphQLArgs): PromiseOrValue { +function graphqlImpl( + args: GraphQLArgs, +): PromiseOrValue> { const { schema, source, diff --git a/src/index.d.ts b/src/index.d.ts index 0776078b8b..3fef221735 100644 --- a/src/index.d.ts +++ b/src/index.d.ts @@ -300,6 +300,9 @@ export { ExecutionArgs, ExecutionResult, FormattedExecutionResult, + ExecutionPatchResult, + FormattedExecutionPatchResult, + AsyncExecutionResult, } from './execution/index'; export { diff --git a/src/index.js b/src/index.js index 9319e99f73..38120184e8 100644 --- a/src/index.js +++ b/src/index.js @@ -291,6 +291,9 @@ export type { ExecutionArgs, ExecutionResult, FormattedExecutionResult, + ExecutionPatchResult, + FormattedExecutionPatchResult, + AsyncExecutionResult, } from './execution/index'; export { subscribe, createSourceEventStream } from './subscription/index'; diff --git a/src/subscription/subscribe.js b/src/subscription/subscribe.js index 3a20d23ab1..ce931ef802 100644 --- a/src/subscription/subscribe.js +++ b/src/subscription/subscribe.js @@ -140,8 +140,8 @@ function subscribeImpl( // the GraphQL specification. The `execute` function provides the // "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the // "ExecuteQuery" algorithm, for which `execute` is also used. - const mapSourceToResponse = (payload) => - execute({ + const mapSourceToResponse = (payload) => { + const executionResult = execute({ schema, document, rootValue: payload, @@ -150,6 +150,14 @@ function subscribeImpl( operationName, fieldResolver, }); + /* istanbul ignore if - TODO: implement support for defer/stream in subscriptions */ + if (isAsyncIterable(executionResult)) { + throw new Error( + 'TODO: implement support for defer/stream in subscriptions', + ); + } + return executionResult; + }; // Resolve the Source Stream, then map every source value to a // ExecutionResult value as described above. @@ -233,11 +241,12 @@ function executeSubscription( ): Promise> { const { schema, operation, variableValues, rootValue } = exeContext; const type = getOperationRootType(schema, operation); - const fields = collectFields( + const { fields } = collectFields( exeContext, type, operation.selectionSet, Object.create(null), + [], Object.create(null), ); const responseNames = Object.keys(fields); From ba5221365e33b3e71584b06d30961433185c48dc Mon Sep 17 00:00:00 2001 From: Rob Richard Date: Wed, 14 Oct 2020 17:53:14 -0400 Subject: [PATCH 4/6] Add @stream directive to specified directives --- src/__tests__/starWarsIntrospection-test.js | 1 + src/index.js | 1 + src/type/__tests__/introspection-test.js | 48 +++++++++++++++++++ src/type/__tests__/schema-test.js | 1 + src/type/directives.d.ts | 5 ++ src/type/directives.js | 27 ++++++++++- src/type/index.d.ts | 1 + src/type/index.js | 1 + .../__tests__/buildASTSchema-test.js | 14 ++++-- .../__tests__/buildClientSchema-test.js | 1 - src/utilities/__tests__/extendSchema-test.js | 4 +- .../__tests__/findBreakingChanges-test.js | 2 + src/utilities/__tests__/printSchema-test.js | 26 ++++++++++ .../__tests__/KnownTypeNamesRule-test.js | 6 +-- 14 files changed, 123 insertions(+), 15 deletions(-) diff --git a/src/__tests__/starWarsIntrospection-test.js b/src/__tests__/starWarsIntrospection-test.js index d637787c4a..7afd6207b0 100644 --- a/src/__tests__/starWarsIntrospection-test.js +++ b/src/__tests__/starWarsIntrospection-test.js @@ -37,6 +37,7 @@ describe('Star Wars Introspection Tests', () => { { name: 'Droid' }, { name: 'Query' }, { name: 'Boolean' }, + { name: 'Int' }, { name: '__Schema' }, { name: '__Type' }, { name: '__TypeKind' }, diff --git a/src/index.js b/src/index.js index 38120184e8..55d0071b7a 100644 --- a/src/index.js +++ b/src/index.js @@ -54,6 +54,7 @@ export { GraphQLIncludeDirective, GraphQLSkipDirective, GraphQLDeferDirective, + GraphQLStreamDirective, GraphQLDeprecatedDirective, GraphQLSpecifiedByDirective, // "Enum" of Type Kinds diff --git a/src/type/__tests__/introspection-test.js b/src/type/__tests__/introspection-test.js index 252df62fd1..e05ab32469 100644 --- a/src/type/__tests__/introspection-test.js +++ b/src/type/__tests__/introspection-test.js @@ -76,6 +76,16 @@ describe('Introspection', () => { enumValues: null, possibleTypes: null, }, + { + kind: 'SCALAR', + name: 'Int', + specifiedByUrl: null, + fields: null, + inputFields: null, + interfaces: null, + enumValues: null, + possibleTypes: null, + }, { kind: 'OBJECT', name: '__Schema', @@ -961,6 +971,44 @@ describe('Introspection', () => { }, ], }, + { + name: 'stream', + isRepeatable: false, + locations: ['FIELD'], + args: [ + { + defaultValue: null, + name: 'if', + type: { + kind: 'SCALAR', + name: 'Boolean', + ofType: null, + }, + }, + { + defaultValue: null, + name: 'label', + type: { + kind: 'SCALAR', + name: 'String', + ofType: null, + }, + }, + { + defaultValue: null, + name: 'initialCount', + type: { + kind: 'NON_NULL', + name: null, + ofType: { + kind: 'SCALAR', + name: 'Int', + ofType: null, + }, + }, + }, + ], + }, { name: 'deprecated', isRepeatable: false, diff --git a/src/type/__tests__/schema-test.js b/src/type/__tests__/schema-test.js index 1d8817e3c6..e9fa8d6f16 100644 --- a/src/type/__tests__/schema-test.js +++ b/src/type/__tests__/schema-test.js @@ -295,6 +295,7 @@ describe('Type System: Schema', () => { 'ASub', 'Boolean', 'String', + 'Int', '__Schema', '__Type', '__TypeKind', diff --git a/src/type/directives.d.ts b/src/type/directives.d.ts index 6b987df49b..618d25b53e 100644 --- a/src/type/directives.d.ts +++ b/src/type/directives.d.ts @@ -78,6 +78,11 @@ export const GraphQLSkipDirective: GraphQLDirective; */ export const GraphQLDeferDirective: GraphQLDirective; +/** + * Used to conditionally stream list fields. + */ +export const GraphQLStreamDirective: GraphQLDirective; + /** * Used to provide a URL for specifying the behavior of custom scalar definitions. */ diff --git a/src/type/directives.js b/src/type/directives.js index a500a7a88a..fcafde86d1 100644 --- a/src/type/directives.js +++ b/src/type/directives.js @@ -17,7 +17,7 @@ import type { GraphQLArgument, GraphQLFieldConfigArgumentMap, } from './definition'; -import { GraphQLString, GraphQLBoolean } from './scalars'; +import { GraphQLString, GraphQLBoolean, GraphQLInt } from './scalars'; import { argsToArgsConfig, GraphQLNonNull } from './definition'; /** @@ -191,6 +191,30 @@ export const GraphQLDeferDirective = new GraphQLDirective({ }, }); +/** + * Used to conditionally stream list fields. + */ +export const GraphQLStreamDirective = new GraphQLDirective({ + name: 'stream', + description: + 'Directs the executor to stream plural fields when the `if` argument is true or undefined.', + locations: [DirectiveLocation.FIELD], + args: { + if: { + type: GraphQLBoolean, + description: 'Stream when true or undefined.', + }, + label: { + type: GraphQLString, + description: 'Unique name', + }, + initialCount: { + type: new GraphQLNonNull(GraphQLInt), + description: 'Number of items to return immediately', + }, + }, +}); + /** * Constant string used for default reason for a deprecation. */ @@ -240,6 +264,7 @@ export const specifiedDirectives = Object.freeze([ GraphQLIncludeDirective, GraphQLSkipDirective, GraphQLDeferDirective, + GraphQLStreamDirective, GraphQLDeprecatedDirective, GraphQLSpecifiedByDirective, ]); diff --git a/src/type/index.d.ts b/src/type/index.d.ts index d57144a310..1e995094e1 100644 --- a/src/type/index.d.ts +++ b/src/type/index.d.ts @@ -126,6 +126,7 @@ export { GraphQLIncludeDirective, GraphQLSkipDirective, GraphQLDeferDirective, + GraphQLStreamDirective, GraphQLDeprecatedDirective, GraphQLSpecifiedByDirective, // Constant Deprecation Reason diff --git a/src/type/index.js b/src/type/index.js index 9cea8a7580..85895f1772 100644 --- a/src/type/index.js +++ b/src/type/index.js @@ -77,6 +77,7 @@ export { GraphQLIncludeDirective, GraphQLSkipDirective, GraphQLDeferDirective, + GraphQLStreamDirective, GraphQLDeprecatedDirective, GraphQLSpecifiedByDirective, // Constant Deprecation Reason diff --git a/src/utilities/__tests__/buildASTSchema-test.js b/src/utilities/__tests__/buildASTSchema-test.js index c70c39d676..18df872a19 100644 --- a/src/utilities/__tests__/buildASTSchema-test.js +++ b/src/utilities/__tests__/buildASTSchema-test.js @@ -21,6 +21,7 @@ import { GraphQLDeprecatedDirective, GraphQLSpecifiedByDirective, GraphQLDeferDirective, + GraphQLStreamDirective, } from '../../type/directives'; import { GraphQLID, @@ -162,8 +163,7 @@ describe('Schema Builder', () => { it('include standard type only if it is used', () => { const schema = buildSchema('type Query'); - // String and Boolean are always included through introspection types - expect(schema.getType('Int')).to.equal(undefined); + // String, Boolean, and Int are always included through introspection types expect(schema.getType('Float')).to.equal(undefined); expect(schema.getType('ID')).to.equal(undefined); }); @@ -255,10 +255,11 @@ describe('Schema Builder', () => { it('Maintains specified directives', () => { const schema = buildSchema('type Query'); - expect(schema.getDirectives()).to.have.lengthOf(5); + expect(schema.getDirectives()).to.have.lengthOf(6); expect(schema.getDirective('skip')).to.equal(GraphQLSkipDirective); expect(schema.getDirective('include')).to.equal(GraphQLIncludeDirective); expect(schema.getDirective('defer')).to.equal(GraphQLDeferDirective); + expect(schema.getDirective('stream')).to.equal(GraphQLStreamDirective); expect(schema.getDirective('deprecated')).to.equal( GraphQLDeprecatedDirective, ); @@ -274,9 +275,10 @@ describe('Schema Builder', () => { directive @deprecated on FIELD_DEFINITION directive @specifiedBy on FIELD_DEFINITION directive @defer on FRAGMENT_SPREAD + directive @stream on FIELD `); - expect(schema.getDirectives()).to.have.lengthOf(5); + expect(schema.getDirectives()).to.have.lengthOf(6); expect(schema.getDirective('skip')).to.not.equal(GraphQLSkipDirective); expect(schema.getDirective('include')).to.not.equal( GraphQLIncludeDirective, @@ -288,6 +290,7 @@ describe('Schema Builder', () => { GraphQLSpecifiedByDirective, ); expect(schema.getDirective('defer')).to.not.equal(GraphQLDeferDirective); + expect(schema.getDirective('stream')).to.not.equal(GraphQLStreamDirective); }); it('Adding directives maintains specified directives', () => { @@ -295,10 +298,11 @@ describe('Schema Builder', () => { directive @foo(arg: Int) on FIELD `); - expect(schema.getDirectives()).to.have.lengthOf(6); + expect(schema.getDirectives()).to.have.lengthOf(7); expect(schema.getDirective('skip')).to.not.equal(undefined); expect(schema.getDirective('include')).to.not.equal(undefined); expect(schema.getDirective('defer')).to.not.equal(undefined); + expect(schema.getDirective('stream')).to.not.equal(undefined); expect(schema.getDirective('deprecated')).to.not.equal(undefined); expect(schema.getDirective('specifiedBy')).to.not.equal(undefined); }); diff --git a/src/utilities/__tests__/buildClientSchema-test.js b/src/utilities/__tests__/buildClientSchema-test.js index 9a0e83de26..7c2df69fe6 100644 --- a/src/utilities/__tests__/buildClientSchema-test.js +++ b/src/utilities/__tests__/buildClientSchema-test.js @@ -155,7 +155,6 @@ describe('Type System: build schema from introspection', () => { const introspection = introspectionFromSchema(schema); const clientSchema = buildClientSchema(introspection); - expect(clientSchema.getType('Int')).to.equal(undefined); expect(clientSchema.getType('Float')).to.equal(undefined); expect(clientSchema.getType('ID')).to.equal(undefined); }); diff --git a/src/utilities/__tests__/extendSchema-test.js b/src/utilities/__tests__/extendSchema-test.js index e898363a7c..3304ba409f 100644 --- a/src/utilities/__tests__/extendSchema-test.js +++ b/src/utilities/__tests__/extendSchema-test.js @@ -199,8 +199,7 @@ describe('extendSchema', () => { it('extends objects with standard type fields', () => { const schema = buildSchema('type Query'); - // String and Boolean are always included through introspection types - expect(schema.getType('Int')).to.equal(undefined); + // String, Boolean, and Int are always included through introspection types expect(schema.getType('Float')).to.equal(undefined); expect(schema.getType('String')).to.equal(GraphQLString); expect(schema.getType('Boolean')).to.equal(GraphQLBoolean); @@ -214,7 +213,6 @@ describe('extendSchema', () => { const extendedSchema = extendSchema(schema, extendAST); expect(validateSchema(extendedSchema)).to.deep.equal([]); - expect(extendedSchema.getType('Int')).to.equal(undefined); expect(extendedSchema.getType('Float')).to.equal(undefined); expect(extendedSchema.getType('String')).to.equal(GraphQLString); expect(extendedSchema.getType('Boolean')).to.equal(GraphQLBoolean); diff --git a/src/utilities/__tests__/findBreakingChanges-test.js b/src/utilities/__tests__/findBreakingChanges-test.js index 461d5d0c8c..754b42e0c7 100644 --- a/src/utilities/__tests__/findBreakingChanges-test.js +++ b/src/utilities/__tests__/findBreakingChanges-test.js @@ -5,6 +5,7 @@ import { GraphQLSchema } from '../../type/schema'; import { GraphQLSkipDirective, GraphQLDeferDirective, + GraphQLStreamDirective, GraphQLIncludeDirective, GraphQLSpecifiedByDirective, GraphQLDeprecatedDirective, @@ -804,6 +805,7 @@ describe('findBreakingChanges', () => { GraphQLIncludeDirective, GraphQLSpecifiedByDirective, GraphQLDeferDirective, + GraphQLStreamDirective, ], }); diff --git a/src/utilities/__tests__/printSchema-test.js b/src/utilities/__tests__/printSchema-test.js index 2c1f1c759c..3db8a3f7a9 100644 --- a/src/utilities/__tests__/printSchema-test.js +++ b/src/utilities/__tests__/printSchema-test.js @@ -638,6 +638,20 @@ describe('Type System Printer', () => { label: String ) on FRAGMENT_SPREAD | INLINE_FRAGMENT + """ + Directs the executor to stream plural fields when the \`if\` argument is true or undefined. + """ + directive @stream( + """Stream when true or undefined.""" + if: Boolean + + """Unique name""" + label: String + + """Number of items to return immediately""" + initialCount: Int! + ) on FIELD + """Marks an element of a GraphQL schema as no longer supported.""" directive @deprecated( """ @@ -872,6 +886,18 @@ describe('Type System Printer', () => { label: String ) on FRAGMENT_SPREAD | INLINE_FRAGMENT + # Directs the executor to stream plural fields when the \`if\` argument is true or undefined. + directive @stream( + # Stream when true or undefined. + if: Boolean + + # Unique name + label: String + + # Number of items to return immediately + initialCount: Int! + ) on FIELD + # Marks an element of a GraphQL schema as no longer supported. directive @deprecated( # Explains why this element was deprecated, usually also including a suggestion for how to access supported similar data. Formatted using the Markdown syntax, as specified by [CommonMark](https://commonmark.org/). diff --git a/src/validation/__tests__/KnownTypeNamesRule-test.js b/src/validation/__tests__/KnownTypeNamesRule-test.js index f56ef4ceab..f0533113b3 100644 --- a/src/validation/__tests__/KnownTypeNamesRule-test.js +++ b/src/validation/__tests__/KnownTypeNamesRule-test.js @@ -81,7 +81,7 @@ describe('Validate: Known type names', () => { it('references to standard scalars that are missing in schema', () => { const schema = buildSchema('type Query { foo: String }'); const query = ` - query ($id: ID, $float: Float, $int: Int) { + query ($id: ID, $float: Float) { __typename } `; @@ -94,10 +94,6 @@ describe('Validate: Known type names', () => { message: 'Unknown type "Float".', locations: [{ line: 2, column: 31 }], }, - { - message: 'Unknown type "Int".', - locations: [{ line: 2, column: 44 }], - }, ]); }); From a758ca746edfd06ba3155c56f0a8924f69b79c74 Mon Sep 17 00:00:00 2001 From: Rob Richard Date: Wed, 14 Oct 2020 19:09:04 -0400 Subject: [PATCH 5/6] Implement support for @stream directive --- src/execution/__tests__/stream-test.js | 629 ++++++++++++++++++ src/execution/execute.js | 218 +++++- .../OverlappingFieldsCanBeMergedRule-test.js | 117 ++++ .../rules/OverlappingFieldsCanBeMergedRule.js | 60 ++ 4 files changed, 1021 insertions(+), 3 deletions(-) create mode 100644 src/execution/__tests__/stream-test.js diff --git a/src/execution/__tests__/stream-test.js b/src/execution/__tests__/stream-test.js new file mode 100644 index 0000000000..8be71a0333 --- /dev/null +++ b/src/execution/__tests__/stream-test.js @@ -0,0 +1,629 @@ +import { expect } from 'chai'; +import { describe, it } from 'mocha'; + +import isAsyncIterable from '../../jsutils/isAsyncIterable'; +import { parse } from '../../language/parser'; + +import { GraphQLID, GraphQLString } from '../../type/scalars'; +import { GraphQLSchema } from '../../type/schema'; +import { GraphQLObjectType, GraphQLList } from '../../type/definition'; + +import { execute } from '../execute'; + +const friendType = new GraphQLObjectType({ + fields: { + id: { type: GraphQLID }, + name: { type: GraphQLString }, + asyncName: { + type: GraphQLString, + resolve(rootValue) { + return Promise.resolve(rootValue.name); + }, + }, + }, + name: 'Friend', +}); + +const friends = [ + { name: 'Luke', id: 1 }, + { name: 'Han', id: 2 }, + { name: 'Leia', id: 3 }, +]; + +const query = new GraphQLObjectType({ + fields: { + scalarList: { + type: new GraphQLList(GraphQLString), + resolve: () => ['apple', 'banana', 'coconut'], + }, + asyncList: { + type: new GraphQLList(friendType), + resolve: () => friends.map((f) => Promise.resolve(f)), + }, + asyncListError: { + type: new GraphQLList(friendType), + resolve: () => + friends.map((f, i) => { + if (i === 1) { + return Promise.reject(new Error('bad')); + } + return Promise.resolve(f); + }), + }, + asyncIterableList: { + type: new GraphQLList(friendType), + async *resolve() { + for (const friend of friends) { + yield friend; + } + }, + }, + asyncIterableError: { + type: new GraphQLList(friendType), + async *resolve() { + yield friends[0]; + throw new Error('bad'); + }, + }, + asyncIterableInvalid: { + type: new GraphQLList(GraphQLString), + async *resolve() { + yield friends[0].name; + yield {}; + }, + }, + asyncIterableListDelayedClose: { + type: new GraphQLList(friendType), + async *resolve() { + for (const friend of friends) { + yield friend; + } + await new Promise((r) => setTimeout(r, 1)); + }, + }, + }, + name: 'Query', +}); + +async function complete(document) { + const schema = new GraphQLSchema({ query }); + + const result = await execute(schema, document, {}); + + if (isAsyncIterable(result)) { + const results = []; + for await (const patch of result) { + results.push(patch); + } + return results; + } + return result; +} + +describe('Execute: stream directive', () => { + it('Can stream a list field', async () => { + const document = parse('{ scalarList @stream(initialCount: 0) }'); + const result = await complete(document); + + expect(result).to.deep.equal([ + { + data: { + scalarList: [], + }, + hasNext: true, + }, + { + data: 'apple', + path: ['scalarList', 0], + hasNext: true, + }, + { + data: 'banana', + path: ['scalarList', 1], + hasNext: true, + }, + { + data: 'coconut', + path: ['scalarList', 2], + hasNext: false, + }, + ]); + }); + it('Returns label from stream directive', async () => { + const document = parse( + '{ scalarList @stream(initialCount: 1, label: "scalar-stream") }', + ); + const result = await complete(document); + + expect(result).to.deep.equal([ + { + data: { + scalarList: ['apple'], + }, + hasNext: true, + }, + { + data: 'banana', + path: ['scalarList', 1], + label: 'scalar-stream', + hasNext: true, + }, + { + data: 'coconut', + path: ['scalarList', 2], + label: 'scalar-stream', + hasNext: false, + }, + ]); + }); + it('Can disable @stream using if argument', async () => { + const document = parse( + '{ scalarList @stream(initialCount: 0, if: false) }', + ); + const result = await complete(document); + + expect(result).to.deep.equal({ + data: { scalarList: ['apple', 'banana', 'coconut'] }, + }); + }); + it('Can stream a field that returns a list of promises', async () => { + const document = parse(` + query { + asyncList @stream(initialCount: 2) { + name + id + } + } + `); + const result = await complete(document); + expect(result).to.deep.equal([ + { + data: { + asyncList: [ + { + name: 'Luke', + id: '1', + }, + { + name: 'Han', + id: '2', + }, + ], + }, + hasNext: true, + }, + { + data: { + name: 'Leia', + id: '3', + }, + path: ['asyncList', 2], + hasNext: false, + }, + ]); + }); + it('Handles rejections in a field that returns a list of promises before initialCount is reached', async () => { + const document = parse(` + query { + asyncListError @stream(initialCount: 2) { + name + id + } + } + `); + const result = await complete(document); + expect(result).to.deep.equal([ + { + errors: [ + { + message: 'bad', + locations: [ + { + line: 3, + column: 9, + }, + ], + path: ['asyncListError', 1], + }, + ], + data: { + asyncListError: [ + { + name: 'Luke', + id: '1', + }, + null, + ], + }, + hasNext: true, + }, + { + data: { + name: 'Leia', + id: '3', + }, + path: ['asyncListError', 2], + hasNext: false, + }, + ]); + }); + it('Handles rejections in a field that returns a list of promises after initialCount is reached', async () => { + const document = parse(` + query { + asyncListError @stream(initialCount: 1) { + name + id + } + } + `); + const result = await complete(document); + expect(result).to.deep.equal([ + { + data: { + asyncListError: [ + { + name: 'Luke', + id: '1', + }, + ], + }, + hasNext: true, + }, + { + data: null, + path: ['asyncListError', 1], + errors: [ + { + message: 'bad', + locations: [ + { + line: 3, + column: 9, + }, + ], + path: ['asyncListError', 1], + }, + ], + hasNext: true, + }, + { + data: { + name: 'Leia', + id: '3', + }, + path: ['asyncListError', 2], + hasNext: false, + }, + ]); + }); + it('Can stream a field that returns an async iterable', async () => { + const document = parse(` + query { + asyncIterableList @stream(initialCount: 2) { + name + id + } + } + `); + const result = await complete(document); + expect(result).to.deep.equal([ + { + data: { + asyncIterableList: [ + { + name: 'Luke', + id: '1', + }, + { + name: 'Han', + id: '2', + }, + ], + }, + hasNext: true, + }, + { + data: { + name: 'Leia', + id: '3', + }, + path: ['asyncIterableList', 2], + hasNext: true, + }, + { + hasNext: false, + }, + ]); + }); + it('Handles error thrown in async iterable before initialCount is reached', async () => { + const document = parse(` + query { + asyncIterableError @stream(initialCount: 2) { + name + id + } + } + `); + const result = await complete(document); + expect(result).to.deep.equal({ + errors: [ + { + message: 'bad', + locations: [ + { + line: 3, + column: 9, + }, + ], + path: ['asyncIterableError', 1], + }, + ], + data: { + asyncIterableError: [ + { + name: 'Luke', + id: '1', + }, + null, + ], + }, + }); + }); + it('Handles error thrown in async iterable after initialCount is reached', async () => { + const document = parse(` + query { + asyncIterableError @stream(initialCount: 1) { + name + id + } + } + `); + const result = await complete(document); + expect(result).to.deep.equal([ + { + data: { + asyncIterableError: [ + { + name: 'Luke', + id: '1', + }, + ], + }, + hasNext: true, + }, + { + data: null, + path: ['asyncIterableError', 1], + errors: [ + { + message: 'bad', + locations: [ + { + line: 3, + column: 9, + }, + ], + path: ['asyncIterableError', 1], + }, + ], + hasNext: false, + }, + ]); + }); + it('Handles errors thrown by completeValue after initialCount is reached', async () => { + const document = parse(` + query { + asyncIterableInvalid @stream(initialCount: 1) + } + `); + const result = await complete(document); + expect(result).to.deep.equal([ + { + data: { + asyncIterableInvalid: ['Luke'], + }, + hasNext: true, + }, + { + data: null, + path: ['asyncIterableInvalid', 1], + errors: [ + { + message: 'String cannot represent value: {}', + locations: [ + { + line: 3, + column: 9, + }, + ], + path: ['asyncIterableInvalid', 1], + }, + ], + hasNext: true, + }, + { + hasNext: false, + }, + ]); + }); + + it('Handles promises returned by completeValue after initialCount is reached', async () => { + const document = parse(` + query { + asyncIterableList @stream(initialCount: 1) { + name + asyncName + } + } + `); + const result = await complete(document); + expect(result).to.deep.equal([ + { + data: { + asyncIterableList: [ + { + name: 'Luke', + asyncName: 'Luke', + }, + ], + }, + hasNext: true, + }, + { + data: { + name: 'Han', + asyncName: 'Han', + }, + path: ['asyncIterableList', 1], + hasNext: true, + }, + { + data: { + name: 'Leia', + asyncName: 'Leia', + }, + path: ['asyncIterableList', 2], + hasNext: true, + }, + { + hasNext: false, + }, + ]); + }); + + it('Can @defer fields that are resolved after async iterable is complete', async () => { + const document = parse(` + query { + asyncIterableList @stream(initialCount: 1, label:"stream-label") { + ...NameFragment @defer(label: "DeferName") @defer(label: "DeferName") + id + } + } + fragment NameFragment on Friend { + name + } + `); + const result = await complete(document); + expect(result).to.deep.equal([ + { + data: { + asyncIterableList: [ + { + id: '1', + }, + ], + }, + hasNext: true, + }, + { + data: { + name: 'Luke', + }, + path: ['asyncIterableList', 0], + label: 'DeferName', + hasNext: true, + }, + { + data: { + id: '2', + }, + path: ['asyncIterableList', 1], + label: 'stream-label', + hasNext: true, + }, + { + data: { + id: '3', + }, + path: ['asyncIterableList', 2], + label: 'stream-label', + hasNext: true, + }, + { + data: { + name: 'Han', + }, + path: ['asyncIterableList', 1], + label: 'DeferName', + hasNext: true, + }, + { + data: { + name: 'Leia', + }, + path: ['asyncIterableList', 2], + label: 'DeferName', + hasNext: false, + }, + ]); + }); + it('Can @defer fields that are resolved before async iterable is complete', async () => { + const document = parse(` + query { + asyncIterableListDelayedClose @stream(initialCount: 1, label:"stream-label") { + ...NameFragment @defer(label: "DeferName") @defer(label: "DeferName") + id + } + } + fragment NameFragment on Friend { + name + } + `); + const result = await complete(document); + expect(result).to.deep.equal([ + { + data: { + asyncIterableListDelayedClose: [ + { + id: '1', + }, + ], + }, + hasNext: true, + }, + { + data: { + name: 'Luke', + }, + path: ['asyncIterableListDelayedClose', 0], + label: 'DeferName', + hasNext: true, + }, + { + data: { + id: '2', + }, + path: ['asyncIterableListDelayedClose', 1], + label: 'stream-label', + hasNext: true, + }, + { + data: { + id: '3', + }, + path: ['asyncIterableListDelayedClose', 2], + label: 'stream-label', + hasNext: true, + }, + { + data: { + name: 'Han', + }, + path: ['asyncIterableListDelayedClose', 1], + label: 'DeferName', + hasNext: true, + }, + { + data: { + name: 'Leia', + }, + path: ['asyncIterableListDelayedClose', 2], + label: 'DeferName', + hasNext: true, + }, + { + hasNext: false, + }, + ]); + }); +}); diff --git a/src/execution/execute.js b/src/execution/execute.js index 4523b1df7c..20ef6c7980 100644 --- a/src/execution/execute.js +++ b/src/execution/execute.js @@ -53,6 +53,7 @@ import { GraphQLIncludeDirective, GraphQLSkipDirective, GraphQLDeferDirective, + GraphQLStreamDirective, } from '../type/directives'; import { isNamedType, @@ -748,6 +749,42 @@ function getDeferValues( }; } +/** + * Returns an object containing the @stream arguments if a field should be + * streamed based on the experimental flag, stream directive present and + * not disabled by the "if" argument. + */ +function getStreamValues( + exeContext: ExecutionContext, + fieldNodes: $ReadOnlyArray, +): void | {| + initialCount?: number, + label?: string, +|} { + // validation only allows equivalent streams on multiple fields, so it is + // safe to only check the first fieldNode for the stream directive + const stream = getDirectiveValues( + GraphQLStreamDirective, + fieldNodes[0], + exeContext.variableValues, + ); + + if (!stream) { + return; + } + + if (stream.if === false) { + return; + } + + return { + initialCount: + // istanbul ignore next (initialCount is required number argument) + typeof stream.initialCount === 'number' ? stream.initialCount : undefined, + label: typeof stream.label === 'string' ? stream.label : undefined, + }; +} + /** * Determines if a fragment is applicable to the given type. */ @@ -1040,6 +1077,7 @@ function completeAsyncIteratorValue( errors: Array, ): Promise<$ReadOnlyArray> { let containsPromise = false; + const stream = getStreamValues(exeContext, fieldNodes); return new Promise((resolve) => { function next(index, completedResults) { const fieldPath = addPath(path, index, undefined); @@ -1076,7 +1114,26 @@ function completeAsyncIteratorValue( return; } - next(index + 1, completedResults); + const newIndex = index + 1; + if ( + stream && + typeof stream.initialCount === 'number' && + newIndex >= stream.initialCount + ) { + exeContext.dispatcher.addAsyncIteratorValue( + stream.label, + newIndex, + path, + iterator, + exeContext, + fieldNodes, + info, + itemType, + ); + resolve(completedResults); + return; + } + next(newIndex, completedResults); }, (rawError) => { completedResults.push(null); @@ -1131,6 +1188,8 @@ function completeListValue( ); } + const stream = getStreamValues(exeContext, fieldNodes); + // This is specified as a simple map, however we're optimizing the path // where the list contains no Promises by avoiding creating another Promise. let containsPromise = false; @@ -1140,6 +1199,23 @@ function completeListValue( const itemPath = addPath(path, index, undefined); try { let completedItem; + + if ( + stream && + typeof stream.initialCount === 'number' && + index >= stream.initialCount + ) { + exeContext.dispatcher.addValue( + stream.label, + itemPath, + item, + exeContext, + fieldNodes, + info, + itemType, + ); + return; + } if (isPromise(item)) { completedItem = item.then((resolved) => completeValue( @@ -1182,7 +1258,7 @@ function completeListValue( const error = locatedError(rawError, fieldNodes, pathToArray(itemPath)); return handleFieldError(error, itemType, errors); } - }); + }).filter((val) => val !== undefined); return containsPromise ? Promise.all(completedResults) : completedResults; } @@ -1595,6 +1671,129 @@ export class Dispatcher { ); } + addValue( + label?: string, + path: Path, + promiseOrData: PromiseOrValue | mixed>, + exeContext: ExecutionContext, + fieldNodes: $ReadOnlyArray, + info: GraphQLResolveInfo, + itemType: GraphQLOutputType, + ): void { + const errors = []; + this._subsequentPayloads.push( + Promise.resolve(promiseOrData) + .then((resolved) => + completeValue( + exeContext, + itemType, + fieldNodes, + info, + path, + resolved, + errors, + ), + ) + // Note: we don't rely on a `catch` method, but we do expect "thenable" + // to take a second callback for the error case. + .then(undefined, (rawError) => { + const error = locatedError(rawError, fieldNodes, pathToArray(path)); + return handleFieldError(error, itemType, errors); + }) + .then((data) => ({ + value: createPatchResult(data, label, path, errors), + done: false, + })), + ); + } + + addAsyncIteratorValue( + label?: string, + initialIndex: number, + path?: Path, + iterator: AsyncIterator, + exeContext: ExecutionContext, + fieldNodes: $ReadOnlyArray, + info: GraphQLResolveInfo, + itemType: GraphQLOutputType, + ): void { + const subsequentPayloads = this._subsequentPayloads; + function next(index) { + const fieldPath = addPath(path, index); + const patchErrors = []; + subsequentPayloads.push( + iterator.next().then( + ({ value: data, done }) => { + if (done) { + return { value: undefined, done: true }; + } + + // eslint-disable-next-line node/callback-return + next(index + 1); + + try { + const completedItem = completeValue( + exeContext, + itemType, + fieldNodes, + info, + fieldPath, + data, + patchErrors, + ); + + if (isPromise(completedItem)) { + return completedItem.then((resolveItem) => ({ + value: createPatchResult( + resolveItem, + label, + fieldPath, + patchErrors, + ), + done: false, + })); + } + + return { + value: createPatchResult( + completedItem, + label, + fieldPath, + patchErrors, + ), + done: false, + }; + } catch (rawError) { + const error = locatedError( + rawError, + fieldNodes, + pathToArray(fieldPath), + ); + handleFieldError(error, itemType, patchErrors); + return { + value: createPatchResult(null, label, fieldPath, patchErrors), + done: false, + }; + } + }, + (rawError) => { + const error = locatedError( + rawError, + fieldNodes, + pathToArray(fieldPath), + ); + handleFieldError(error, itemType, patchErrors); + return { + value: createPatchResult(null, label, fieldPath, patchErrors), + done: false, + }; + }, + ), + ); + } + next(initialIndex); + } + _race(): Promise> { return new Promise((resolve) => { this._subsequentPayloads.forEach((promise) => { @@ -1611,7 +1810,20 @@ export class Dispatcher { ); return promise; }) - .then(({ value }) => { + .then(({ value, done }) => { + if (done && this._subsequentPayloads.length === 0) { + // async iterable resolver just finished and no more pending payloads + return { + value: { + hasNext: false, + }, + done: false, + }; + } else if (done) { + // async iterable resolver just finished but there are pending payloads + // return the next one + return this._race(); + } const returnValue: ExecutionPatchResult = { ...value, hasNext: this._subsequentPayloads.length > 0, diff --git a/src/validation/__tests__/OverlappingFieldsCanBeMergedRule-test.js b/src/validation/__tests__/OverlappingFieldsCanBeMergedRule-test.js index 080f859b89..2a1982180a 100644 --- a/src/validation/__tests__/OverlappingFieldsCanBeMergedRule-test.js +++ b/src/validation/__tests__/OverlappingFieldsCanBeMergedRule-test.js @@ -98,6 +98,123 @@ describe('Validate: Overlapping fields can be merged', () => { `); }); + it('Same stream directives supported', () => { + expectValid(` + fragment differentDirectivesWithDifferentAliases on Dog { + name @stream(label: "streamLabel", initialCount: 1) + name @stream(label: "streamLabel", initialCount: 1) + } + `); + }); + + it('different stream directive label', () => { + expectErrors(` + fragment conflictingArgs on Dog { + name @stream(label: "streamLabel", initialCount: 1) + name @stream(label: "anotherLabel", initialCount: 1) + } + `).to.deep.equal([ + { + message: + 'Fields "name" conflict because they have differing stream directives. Use different aliases on the fields to fetch both if this was intentional.', + locations: [ + { line: 3, column: 9 }, + { line: 4, column: 9 }, + ], + }, + ]); + }); + + it('different stream directive initialCount', () => { + expectErrors(` + fragment conflictingArgs on Dog { + name @stream(label: "streamLabel", initialCount: 1) + name @stream(label: "streamLabel", initialCount: 2) + } + `).to.deep.equal([ + { + message: + 'Fields "name" conflict because they have differing stream directives. Use different aliases on the fields to fetch both if this was intentional.', + locations: [ + { line: 3, column: 9 }, + { line: 4, column: 9 }, + ], + }, + ]); + }); + + it('different stream directive first missing args', () => { + expectErrors(` + fragment conflictingArgs on Dog { + name @stream + name @stream(label: "streamLabel", initialCount: 1) + } + `).to.deep.equal([ + { + message: + 'Fields "name" conflict because they have differing stream directives. Use different aliases on the fields to fetch both if this was intentional.', + locations: [ + { line: 3, column: 9 }, + { line: 4, column: 9 }, + ], + }, + ]); + }); + + it('different stream directive second missing args', () => { + expectErrors(` + fragment conflictingArgs on Dog { + name @stream(label: "streamLabel", initialCount: 1) + name @stream + } + `).to.deep.equal([ + { + message: + 'Fields "name" conflict because they have differing stream directives. Use different aliases on the fields to fetch both if this was intentional.', + locations: [ + { line: 3, column: 9 }, + { line: 4, column: 9 }, + ], + }, + ]); + }); + + it('mix of stream and no stream', () => { + expectErrors(` + fragment conflictingArgs on Dog { + name @stream + name + } + `).to.deep.equal([ + { + message: + 'Fields "name" conflict because they have differing stream directives. Use different aliases on the fields to fetch both if this was intentional.', + locations: [ + { line: 3, column: 9 }, + { line: 4, column: 9 }, + ], + }, + ]); + }); + + it('different stream directive both missing args', () => { + expectErrors(` + fragment conflictingArgs on Dog { + name @stream + name @stream + } + `).to.deep.equal([ + { + message: + 'Fields "name" conflict because they have differing stream directives. Use different aliases on the fields to fetch both if this was intentional.', + locations: [ + { line: 3, column: 9 }, + { line: 4, column: 9 }, + ], + }, + ]); + }); + it('Same aliases with different field targets', () => { expectErrors(` fragment sameAliasesWithDifferentFieldTargets on Dog { diff --git a/src/validation/rules/OverlappingFieldsCanBeMergedRule.js b/src/validation/rules/OverlappingFieldsCanBeMergedRule.js index 2d79dd098f..542aceee97 100644 --- a/src/validation/rules/OverlappingFieldsCanBeMergedRule.js +++ b/src/validation/rules/OverlappingFieldsCanBeMergedRule.js @@ -13,6 +13,7 @@ import type { FieldNode, ArgumentNode, FragmentDefinitionNode, + DirectiveNode, } from '../../language/ast'; import { Kind } from '../../language/kinds'; import { print } from '../../language/printer'; @@ -584,6 +585,18 @@ function findConflict( [node2], ]; } + + // istanbul ignore next (See: 'https://github.com/graphql/graphql-js/issues/2203') + const directives1 = node1.directives ?? []; + // istanbul ignore next (See: 'https://github.com/graphql/graphql-js/issues/2203') + const directives2 = node2.directives ?? []; + if (!sameStreams(directives1, directives2)) { + return [ + [responseName, 'they have differing stream directives'], + [node1], + [node2], + ]; + } } // The return type for each field. @@ -642,6 +655,53 @@ function sameArguments( }); } +function sameDirectiveArgument( + directive1: DirectiveNode, + directive2: DirectiveNode, + argumentName: string, +): boolean { + /* istanbul ignore next (See https://github.com/graphql/graphql-js/issues/2203) */ + const args1 = directive1.arguments || []; + const arg1 = find(args1, (argument) => argument.name.value === argumentName); + if (!arg1) { + return false; + } + + /* istanbul ignore next (See https://github.com/graphql/graphql-js/issues/2203) */ + const args2 = directive2.arguments || []; + const arg2 = find(args2, (argument) => argument.name.value === argumentName); + if (!arg2) { + return false; + } + return sameValue(arg1.value, arg2.value); +} + +function getStreamDirective( + directives: $ReadOnlyArray, +): ?DirectiveNode { + return find(directives, (directive) => directive.name.value === 'stream'); +} + +function sameStreams( + directives1: $ReadOnlyArray, + directives2: $ReadOnlyArray, +): boolean { + const stream1 = getStreamDirective(directives1); + const stream2 = getStreamDirective(directives2); + if (!stream1 && !stream2) { + // both fields do not have streams + return true; + } else if (stream1 && stream2) { + // check if both fields have equivalent streams + return ( + sameDirectiveArgument(stream1, stream2, 'initialCount') && + sameDirectiveArgument(stream1, stream2, 'label') + ); + } + // fields have a mix of stream and no stream + return false; +} + function sameValue(value1: ValueNode, value2: ValueNode): boolean { return print(value1) === print(value2); } From 54cb43c27850d1e4944029009653892fdbffc752 Mon Sep 17 00:00:00 2001 From: Rob Richard Date: Fri, 23 Oct 2020 10:57:45 -0400 Subject: [PATCH 6/6] add defer/stream support for subscriptions (#7) --- .../__tests__/flattenAsyncIterator-test.js | 135 ++++++++++++++++ src/subscription/__tests__/subscribe-test.js | 147 ++++++++++++++++++ src/subscription/flattenAsyncIterator.js | 49 ++++++ src/subscription/subscribe.js | 23 ++- 4 files changed, 340 insertions(+), 14 deletions(-) create mode 100644 src/subscription/__tests__/flattenAsyncIterator-test.js create mode 100644 src/subscription/flattenAsyncIterator.js diff --git a/src/subscription/__tests__/flattenAsyncIterator-test.js b/src/subscription/__tests__/flattenAsyncIterator-test.js new file mode 100644 index 0000000000..db17f6dc3c --- /dev/null +++ b/src/subscription/__tests__/flattenAsyncIterator-test.js @@ -0,0 +1,135 @@ +import { expect } from 'chai'; +import { describe, it } from 'mocha'; + +import flattenAsyncIterator from '../flattenAsyncIterator'; + +describe('flattenAsyncIterator', () => { + it('does not modify an already flat async generator', async () => { + async function* source() { + yield 1; + yield 2; + yield 3; + } + + const result = flattenAsyncIterator(source()); + + expect(await result.next()).to.deep.equal({ value: 1, done: false }); + expect(await result.next()).to.deep.equal({ value: 2, done: false }); + expect(await result.next()).to.deep.equal({ value: 3, done: false }); + expect(await result.next()).to.deep.equal({ + value: undefined, + done: true, + }); + }); + + it('does not modify an already flat async iterator', async () => { + const items = [1, 2, 3]; + + const iterator: any = { + [Symbol.asyncIterator]() { + return this; + }, + next() { + return Promise.resolve({ + done: items.length === 0, + value: items.shift(), + }); + }, + }; + + const result = flattenAsyncIterator(iterator); + + expect(await result.next()).to.deep.equal({ value: 1, done: false }); + expect(await result.next()).to.deep.equal({ value: 2, done: false }); + expect(await result.next()).to.deep.equal({ value: 3, done: false }); + expect(await result.next()).to.deep.equal({ + value: undefined, + done: true, + }); + }); + + it('flatten nested async generators', async () => { + async function* source() { + yield 1; + yield 2; + yield (async function* (): AsyncGenerator { + yield 2.1; + yield 2.2; + })(); + yield 3; + } + + const doubles = flattenAsyncIterator(source()); + + const result = []; + for await (const x of doubles) { + result.push(x); + } + expect(result).to.deep.equal([1, 2, 2.1, 2.2, 3]); + }); + + it('allows returning early from a nested async generator', async () => { + async function* source() { + yield 1; + yield 2; + yield (async function* (): AsyncGenerator { + yield 2.1; + // istanbul ignore next (Shouldn't be reached) + yield 2.2; + })(); + // istanbul ignore next (Shouldn't be reached) + yield 3; + } + + const doubles = flattenAsyncIterator(source()); + + expect(await doubles.next()).to.deep.equal({ value: 1, done: false }); + expect(await doubles.next()).to.deep.equal({ value: 2, done: false }); + expect(await doubles.next()).to.deep.equal({ value: 2.1, done: false }); + + // Early return + expect(await doubles.return()).to.deep.equal({ + value: undefined, + done: true, + }); + + // Subsequent next calls + expect(await doubles.next()).to.deep.equal({ + value: undefined, + done: true, + }); + expect(await doubles.next()).to.deep.equal({ + value: undefined, + done: true, + }); + }); + + it('allows throwing errors from a nested async generator', async () => { + async function* source() { + yield 1; + yield 2; + yield (async function* (): AsyncGenerator { + yield 2.1; + // istanbul ignore next (Shouldn't be reached) + yield 2.2; + })(); + // istanbul ignore next (Shouldn't be reached) + yield 3; + } + + const doubles = flattenAsyncIterator(source()); + + expect(await doubles.next()).to.deep.equal({ value: 1, done: false }); + expect(await doubles.next()).to.deep.equal({ value: 2, done: false }); + expect(await doubles.next()).to.deep.equal({ value: 2.1, done: false }); + + // Throw error + let caughtError; + try { + await doubles.throw('ouch'); + } catch (e) { + caughtError = e; + } + expect(caughtError).to.equal('ouch'); + }); +}); diff --git a/src/subscription/__tests__/subscribe-test.js b/src/subscription/__tests__/subscribe-test.js index 5df245c3e7..b7414ec1d2 100644 --- a/src/subscription/__tests__/subscribe-test.js +++ b/src/subscription/__tests__/subscribe-test.js @@ -668,6 +668,153 @@ describe('Subscription Publish Phase', () => { }); }); + it('produces additional payloads for subscriptions with @defer', async () => { + const pubsub = new SimplePubSub(); + const subscription = await createSubscription( + pubsub, + emailSchema, + parse(` + subscription ($priority: Int = 0) { + importantEmail(priority: $priority) { + email { + from + subject + } + ... @defer { + inbox { + unread + total + } + } + } + } + `), + ); + invariant(isAsyncIterable(subscription)); + // Wait for the next subscription payload. + const payload = subscription.next(); + + // A new email arrives! + expect( + pubsub.emit({ + from: 'yuzhi@graphql.org', + subject: 'Alright', + message: 'Tests are good', + unread: true, + }), + ).to.equal(true); + + // The previously waited on payload now has a value. + expect(await payload).to.deep.equal({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: 'yuzhi@graphql.org', + subject: 'Alright', + }, + }, + }, + hasNext: true, + }, + }); + + // Wait for the next payload from @defer + expect(await subscription.next()).to.deep.equal({ + done: false, + value: { + data: { + inbox: { + unread: 1, + total: 2, + }, + }, + path: ['importantEmail'], + hasNext: false, + }, + }); + + // Another new email arrives, after all incrementally delivered payloads are received. + expect( + pubsub.emit({ + from: 'hyo@graphql.org', + subject: 'Tools', + message: 'I <3 making things', + unread: true, + }), + ).to.equal(true); + + // The next waited on payload will have a value. + expect(await subscription.next()).to.deep.equal({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: 'hyo@graphql.org', + subject: 'Tools', + }, + }, + }, + hasNext: true, + }, + }); + + // Another new email arrives, before the incrementally delivered payloads from the last email was received. + expect( + pubsub.emit({ + from: 'adam@graphql.org', + subject: 'Important', + message: 'Read me please', + unread: true, + }), + ).to.equal(true); + + // Deferred payload from previous event is received. + expect(await subscription.next()).to.deep.equal({ + done: false, + value: { + data: { + inbox: { + unread: 2, + total: 3, + }, + }, + path: ['importantEmail'], + hasNext: false, + }, + }); + + // Next payload from last event + expect(await subscription.next()).to.deep.equal({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: 'adam@graphql.org', + subject: 'Important', + }, + }, + }, + hasNext: true, + }, + }); + + // The client disconnects before the deferred payload is consumed. + expect(await subscription.return()).to.deep.equal({ + done: true, + value: undefined, + }); + + // Awaiting a subscription after closing it results in completed results. + expect(await subscription.next()).to.deep.equal({ + done: true, + value: undefined, + }); + }); + it('produces a payload when there are multiple events', async () => { const pubsub = new SimplePubSub(); const subscription = await createSubscription(pubsub); diff --git a/src/subscription/flattenAsyncIterator.js b/src/subscription/flattenAsyncIterator.js new file mode 100644 index 0000000000..cb38efb739 --- /dev/null +++ b/src/subscription/flattenAsyncIterator.js @@ -0,0 +1,49 @@ +import { SYMBOL_ASYNC_ITERATOR } from '../polyfills/symbols'; + +import isAsyncIterable from '../jsutils/isAsyncIterable'; + +/** + * Given an AsyncIterable that could potentially yield other async iterators, + * flatten all yielded results into a single AsyncIterable + */ +export default function flattenAsyncIterator( + iterable: AsyncGenerator | T, void, void>, +): AsyncGenerator { + // $FlowFixMe[prop-missing] + const iteratorMethod = iterable[SYMBOL_ASYNC_ITERATOR]; + const iterator: any = iteratorMethod.call(iterable); + let iteratorStack: Array> = [iterator]; + + function next(): Promise> { + const currentIterator = iteratorStack[0]; + if (!currentIterator) { + return Promise.resolve({ value: undefined, done: true }); + } + return currentIterator.next().then((result) => { + if (result.done) { + iteratorStack.shift(); + return next(); + } else if (isAsyncIterable(result.value)) { + const childIteratorMethod = result.value[SYMBOL_ASYNC_ITERATOR]; + const childIterator: any = childIteratorMethod.call(result.value); + iteratorStack.unshift(childIterator); + return next(); + } + return result; + }); + } + return ({ + next, + return() { + iteratorStack = []; + return iterator.return(); + }, + throw(error?: mixed): Promise> { + iteratorStack = []; + return iterator.throw(error); + }, + [SYMBOL_ASYNC_ITERATOR]() { + return this; + }, + }: $FlowFixMe); +} diff --git a/src/subscription/subscribe.js b/src/subscription/subscribe.js index ce931ef802..8386413e3d 100644 --- a/src/subscription/subscribe.js +++ b/src/subscription/subscribe.js @@ -24,6 +24,7 @@ import type { GraphQLFieldResolver } from '../type/definition'; import { getOperationRootType } from '../utilities/getOperationRootType'; import mapAsyncIterator from './mapAsyncIterator'; +import flattenAsyncIterator from './flattenAsyncIterator'; export type SubscriptionArgs = {| schema: GraphQLSchema, @@ -140,8 +141,8 @@ function subscribeImpl( // the GraphQL specification. The `execute` function provides the // "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the // "ExecuteQuery" algorithm, for which `execute` is also used. - const mapSourceToResponse = (payload) => { - const executionResult = execute({ + const mapSourceToResponse = (payload) => + execute({ schema, document, rootValue: payload, @@ -150,24 +151,18 @@ function subscribeImpl( operationName, fieldResolver, }); - /* istanbul ignore if - TODO: implement support for defer/stream in subscriptions */ - if (isAsyncIterable(executionResult)) { - throw new Error( - 'TODO: implement support for defer/stream in subscriptions', - ); - } - return executionResult; - }; // Resolve the Source Stream, then map every source value to a // ExecutionResult value as described above. return sourcePromise.then((resultOrStream) => // Note: Flow can't refine isAsyncIterable, so explicit casts are used. isAsyncIterable(resultOrStream) - ? mapAsyncIterator( - resultOrStream, - mapSourceToResponse, - reportGraphQLError, + ? flattenAsyncIterator( + mapAsyncIterator( + resultOrStream, + mapSourceToResponse, + reportGraphQLError, + ), ) : ((resultOrStream: any): ExecutionResult), );