From 768f69e540d772df1c0dff632db90459ec24c5c0 Mon Sep 17 00:00:00 2001 From: Rob Richard Date: Wed, 20 Apr 2022 16:35:21 -0400 Subject: [PATCH] Return underlying AsyncIterators when execute result is returned (#2843) # Conflicts: # src/execution/execute.ts --- src/execution/__tests__/stream-test.ts | 203 +++++++++++++++++++++++++ src/execution/execute.ts | 28 +++- 2 files changed, 225 insertions(+), 6 deletions(-) diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index 35c12af9d8d..a90ed2fd258 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -2,6 +2,7 @@ import { describe, it } from 'mocha'; import { expectJSON } from '../../__testUtils__/expectJSON'; +import { invariant } from '../../jsutils/invariant'; import { isAsyncIterable } from '../../jsutils/isAsyncIterable'; import type { DocumentNode } from '../../language/ast'; @@ -134,6 +135,37 @@ const query = new GraphQLObjectType({ yield await Promise.resolve({ string: friends[1].name }); }, }, + asyncIterableListDelayed: { + type: new GraphQLList(friendType), + async *resolve() { + for (const friend of friends) { + // pause an additional ms before yielding to allow time + // for tests to return or throw before next value is processed. + // eslint-disable-next-line no-await-in-loop + await new Promise((r) => setTimeout(r, 1)); + yield friend; /* c8 ignore start */ + // Not reachable, early return + } + } /* c8 ignore stop */, + }, + asyncIterableListNoReturn: { + type: new GraphQLList(friendType), + resolve() { + let i = 0; + return { + [Symbol.asyncIterator]: () => ({ + async next() { + const friend = friends[i++]; + if (friend) { + await new Promise((r) => setTimeout(r, 1)); + return { value: friend, done: false }; + } + return { value: undefined, done: true }; + }, + }), + }; + }, + }, asyncIterableListDelayedClose: { type: new GraphQLList(friendType), async *resolve() { @@ -1080,4 +1112,175 @@ describe('Execute: stream directive', () => { }, ]); }); + it('Returns underlying async iterables when dispatcher is returned', async () => { + const document = parse(` + query { + asyncIterableListDelayed @stream(initialCount: 1) { + name + id + } + } + `); + const schema = new GraphQLSchema({ query }); + + const executeResult = await execute({ schema, document, rootValue: {} }); + invariant(isAsyncIterable(executeResult)); + const iterator = executeResult[Symbol.asyncIterator](); + + const result1 = await iterator.next(); + expectJSON(result1).toDeepEqual({ + done: false, + value: { + data: { + asyncIterableListDelayed: [ + { + id: '1', + name: 'Luke', + }, + ], + }, + hasNext: true, + }, + }); + + const returnPromise = iterator.return(); + + // this result had started processing before return was called + const result2 = await iterator.next(); + expectJSON(result2).toDeepEqual({ + done: false, + value: { + data: { + id: '2', + name: 'Han', + }, + hasNext: true, + path: ['asyncIterableListDelayed', 1], + }, + }); + + // third result is not returned because async iterator has returned + const result3 = await iterator.next(); + expectJSON(result3).toDeepEqual({ + done: true, + value: undefined, + }); + await returnPromise; + }); + it('Can return async iterable when underlying iterable does not have a return method', async () => { + const document = parse(` + query { + asyncIterableListNoReturn @stream(initialCount: 1) { + name + id + } + } + `); + const schema = new GraphQLSchema({ query }); + + const executeResult = await execute({ schema, document, rootValue: {} }); + invariant(isAsyncIterable(executeResult)); + const iterator = executeResult[Symbol.asyncIterator](); + + const result1 = await iterator.next(); + expectJSON(result1).toDeepEqual({ + done: false, + value: { + data: { + asyncIterableListNoReturn: [ + { + id: '1', + name: 'Luke', + }, + ], + }, + hasNext: true, + }, + }); + + const returnPromise = iterator.return(); + + // this result had started processing before return was called + const result2 = await iterator.next(); + expectJSON(result2).toDeepEqual({ + done: false, + value: { + data: { + id: '2', + name: 'Han', + }, + hasNext: true, + path: ['asyncIterableListNoReturn', 1], + }, + }); + + // third result is not returned because async iterator has returned + const result3 = await iterator.next(); + expectJSON(result3).toDeepEqual({ + done: true, + value: undefined, + }); + await returnPromise; + }); + it('Returns underlying async iterables when dispatcher is thrown', async () => { + const document = parse(` + query { + asyncIterableListDelayed @stream(initialCount: 1) { + name + id + } + } + `); + const schema = new GraphQLSchema({ query }); + + const executeResult = await execute({ schema, document, rootValue: {} }); + invariant(isAsyncIterable(executeResult)); + const iterator = executeResult[Symbol.asyncIterator](); + + const result1 = await iterator.next(); + expectJSON(result1).toDeepEqual({ + done: false, + value: { + data: { + asyncIterableListDelayed: [ + { + id: '1', + name: 'Luke', + }, + ], + }, + hasNext: true, + }, + }); + + const throwPromise = iterator.throw(new Error('bad')); + + // this result had started processing before return was called + const result2 = await iterator.next(); + expectJSON(result2).toDeepEqual({ + done: false, + value: { + data: { + id: '2', + name: 'Han', + }, + hasNext: true, + path: ['asyncIterableListDelayed', 1], + }, + }); + + // third result is not returned because async iterator has returned + const result3 = await iterator.next(); + expectJSON(result3).toDeepEqual({ + done: true, + value: undefined, + }); + try { + await throwPromise; /* c8 ignore start */ + // Not reachable, always throws + /* c8 ignore stop */ + } catch (e) { + // ignore error + } + }); }); diff --git a/src/execution/execute.ts b/src/execution/execute.ts index b7f82e1db6a..d182ae43618 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -1554,6 +1554,7 @@ async function executeStreamIterator( label, path: fieldPath, parentContext, + iterator, }); const dataPromise = executeStreamIteratorItem( iterator, @@ -1581,6 +1582,7 @@ function yieldSubsequentPayloads( initialResult: ExecutionResult, ): AsyncGenerator { let _hasReturnedInitialResult = false; + let isDone = false; async function race(): Promise> { if (exeContext.subsequentPayloads.length === 0) { @@ -1651,17 +1653,31 @@ function yieldSubsequentPayloads( }, done: false, }); - } else if (exeContext.subsequentPayloads.length === 0) { + } else if (exeContext.subsequentPayloads.length === 0 || isDone) { return Promise.resolve({ value: undefined, done: true }); } return race(); }, - // TODO: implement return & throw - return: /* istanbul ignore next: will be covered in follow up */ () => - Promise.resolve({ value: undefined, done: true }), - throw: /* istanbul ignore next: will be covered in follow up */ ( + async return(): Promise> { + await Promise.all( + exeContext.subsequentPayloads.map((asyncPayloadRecord) => + asyncPayloadRecord.iterator?.return?.(), + ), + ); + isDone = true; + return { value: undefined, done: true }; + }, + async throw( error?: unknown, - ) => Promise.reject(error), + ): Promise> { + await Promise.all( + exeContext.subsequentPayloads.map((asyncPayloadRecord) => + asyncPayloadRecord.iterator?.return?.(), + ), + ); + isDone = true; + return Promise.reject(error); + }, }; }