diff --git a/packages/batch-execute/package.json b/packages/batch-execute/package.json index f5f7b138319..2092f2ffd27 100644 --- a/packages/batch-execute/package.json +++ b/packages/batch-execute/package.json @@ -22,6 +22,7 @@ "input": "./src/index.ts" }, "dependencies": { + "@graphql-tools/pubsub": "^7.0.0", "@graphql-tools/utils": "^7.0.0", "dataloader": "2.0.0", "is-promise": "4.0.0", diff --git a/packages/batch-execute/src/splitResult.ts b/packages/batch-execute/src/splitResult.ts index d33e8d49140..94dfef32e2b 100644 --- a/packages/batch-execute/src/splitResult.ts +++ b/packages/batch-execute/src/splitResult.ts @@ -5,6 +5,7 @@ import { ExecutionResult, GraphQLError } from 'graphql'; import isPromise from 'is-promise'; import { AsyncExecutionResult, isAsyncIterable, relocatedError } from '@graphql-tools/utils'; +import { InMemoryChannel } from '@graphql-tools/pubsub'; import { parseKey } from './prefix'; @@ -16,8 +17,8 @@ export function splitResult( numResults: number ): Array< | ExecutionResult - | AsyncIterableIterator - | Promise> + | AsyncIterableIterator + | Promise> > { if (isPromise(mergedResult)) { const result = mergedResult.then(r => splitExecutionResultOrAsyncIterableIterator(r, numResults)); @@ -32,13 +33,31 @@ export function splitResult( return splitExecutionResultOrAsyncIterableIterator(mergedResult, numResults); } +async function iterate( + mergedResult: AsyncIterableIterator, + channel: InMemoryChannel +): Promise { + for await (const asyncResult of mergedResult) { + channel.publish(asyncResult); + } +} + export function splitExecutionResultOrAsyncIterableIterator( - mergedResult: ExecutionResult | AsyncIterableIterator, + mergedResult: ExecutionResult | AsyncIterableIterator, numResults: number -): Array> { +): Array> { if (isAsyncIterable(mergedResult)) { - // TODO: add implementation - return undefined; + const channel = new InMemoryChannel(); + + const asyncIterables: Array> = []; + for (let i = 0; i < numResults; i++) { + // TODO: add filter and map functionality + asyncIterables.push(channel.subscribe()); + } + + setImmediate(() => iterate(mergedResult, channel)); + + return asyncIterables; } return splitExecutionResult(mergedResult, numResults);