Skip to content

Commit

Permalink
fix(race): concurrent next calls with defer/stream (#2975)
Browse files Browse the repository at this point in the history
* fix(race): concurrent next calls

* refactor test

* use invariant

* disable eslint error

* fix
  • Loading branch information
yaacovCR authored Jun 2, 2021
1 parent cd69998 commit c393ab3
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 37 deletions.
68 changes: 68 additions & 0 deletions src/execution/__tests__/stream-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,22 @@ async function complete(document: DocumentNode) {
return result;
}

async function completeAsync(document: DocumentNode, numCalls: number) {
const schema = new GraphQLSchema({ query });

const result = await execute({ schema, document, rootValue: {} });

invariant(isAsyncIterable(result));

const iterator = result[Symbol.asyncIterator]();

const promises = [];
for (let i = 0; i < numCalls; i++) {
promises.push(iterator.next());
}
return Promise.all(promises);
}

describe('Execute: stream directive', () => {
it('Can stream a list field', async () => {
const document = parse('{ scalarList @stream(initialCount: 1) }');
Expand Down Expand Up @@ -437,6 +453,58 @@ describe('Execute: stream directive', () => {
},
]);
});
it('Can stream a field that returns an async iterable', async () => {
const document = parse(`
query {
asyncIterableList @stream(initialCount: 2) {
name
id
}
}
`);
const result = await completeAsync(document, 4);
expect(result).to.deep.equal([
{
done: false,
value: {
data: {
asyncIterableList: [
{
name: 'Luke',
id: '1',
},
{
name: 'Han',
id: '2',
},
],
},
hasNext: true,
},
},
{
done: false,
value: {
data: {
name: 'Leia',
id: '3',
},
path: ['asyncIterableList', 2],
hasNext: true,
},
},
{
done: false,
value: {
hasNext: false,
},
},
{
done: true,
value: undefined,
},
]);
});
it('Handles error thrown in async iterable before initialCount is reached', async () => {
const document = parse(`
query {
Expand Down
89 changes: 52 additions & 37 deletions src/execution/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1784,47 +1784,62 @@ export class Dispatcher {
done: false,
});
}
return new Promise<{
promise: Promise<IteratorResult<DispatcherResult, void>>;
}>((resolve) => {
return new Promise((resolve) => {
let resolved = false;
this._subsequentPayloads.forEach((promise) => {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
promise.then(() => {
// resolve with actual promise, not resolved value of promise so we can remove it from this._subsequentPayloads
resolve({ promise });
});
});
})
.then(({ promise }) => {
this._subsequentPayloads.splice(
this._subsequentPayloads.indexOf(promise),
1,
);
return promise;
})
.then(({ value, done }) => {
if (done && this._subsequentPayloads.length === 0) {
// async iterable resolver just finished and no more pending payloads
return {
value: {
hasNext: false,
},
done: false,
promise.then((payload) => {
if (resolved) {
return;
}

resolved = true;

if (this._subsequentPayloads.length === 0) {
// a different call to next has exhausted all payloads
resolve({ value: undefined, done: true });
return;
}

const index = this._subsequentPayloads.indexOf(promise);

if (index === -1) {
// a different call to next has consumed this payload
resolve(this._race());
return;
}

this._subsequentPayloads.splice(index, 1);

const { value, done } = payload;

if (done && this._subsequentPayloads.length === 0) {
// async iterable resolver just finished and no more pending payloads
resolve({
value: {
hasNext: false,
},
done: false,
});
return;
} else if (done) {
// async iterable resolver just finished but there are pending payloads
// return the next one
resolve(this._race());
return;
}

const returnValue: ExecutionPatchResult = {
...value,
hasNext: this._subsequentPayloads.length > 0,
};
} else if (done) {
// async iterable resolver just finished but there are pending payloads
// return the next one
return this._race();
}
const returnValue: ExecutionPatchResult = {
...value,
hasNext: this._subsequentPayloads.length > 0,
};
return {
value: returnValue,
done: false,
};
resolve({
value: returnValue,
done: false,
});
});
});
});
}

_next(): Promise<IteratorResult<AsyncExecutionResult, void>> {
Expand Down

0 comments on commit c393ab3

Please sign in to comment.