Skip to content

Commit

Permalink
retire in-house pubsub
Browse files Browse the repository at this point in the history
we don't need a pubsub to subscribe only to the first published valuee, we can use  an expectant store that returns a promise when the store does not yet contain the value, where all the relevant promises resolve on a future set.

this change means we can retire the in-house pubsub.

repeaters probably make in-house pubsub unnecessary anyway, and were not yet used elsewhere in the codebase

also somewhat relevant, see repeaterjs/repeater#67 (comment)
  • Loading branch information
yaacovCR committed Apr 12, 2021
1 parent a800d28 commit 9eb08e2
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 219 deletions.
1 change: 0 additions & 1 deletion packages/batch-execute/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
},
"dependencies": {
"@graphql-tools/utils": "^7.7.0",
"@graphql-tools/pubsub": "^7.0.0",
"dataloader": "2.0.0",
"is-promise": "4.0.0",
"tslib": "~2.2.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import { Push, Repeater } from '@repeaterjs/repeater';

import { Splitter } from './types';
type Splitter<T> = (item: T) => [number, T];

export function split<T>(asyncIterable: AsyncIterableIterator<T>, n: number, splitter: Splitter<IteratorResult<T>>) {
const iterator = asyncIterable[Symbol.asyncIterator]();
Expand Down
114 changes: 48 additions & 66 deletions packages/delegate/src/Receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ import {

import DataLoader from 'dataloader';

import { Repeater } from '@repeaterjs/repeater';
import { Repeater, Stop } from '@repeaterjs/repeater';

import { AsyncExecutionResult } from '@graphql-tools/utils';
import { InMemoryPubSub } from '@graphql-tools/pubsub';

import { DelegationContext, ExternalObject } from './types';
import { getReceiver, getSubschema, getUnpathedErrors, mergeExternalObjects } from './externalObjects';
import { resolveExternalValue } from './resolveExternalValue';
import { externalValueFromResult, externalValueFromPatchResult } from './externalValues';
import { ExpectantStore } from './expectantStore';

export class Receiver {
private readonly asyncIterable: AsyncIterable<AsyncExecutionResult>;
Expand All @@ -31,10 +31,10 @@ export class Receiver {
private readonly asyncSelectionSets: Record<string, SelectionSetNode>;
private readonly resultTransformer: (originalResult: ExecutionResult) => any;
private readonly initialResultDepth: number;
private readonly pubsub: InMemoryPubSub<ExternalObject>;
private deferredPatches: Record<string, Array<ExecutionPatchResult>>;
private streamedPatches: Record<string, Record<number, Array<ExecutionPatchResult>>>;
private cache: Record<string, any>;
private cache: ExpectantStore<ExternalObject>;
private stoppers: Array<Stop>;
private loaders: Record<string, DataLoader<GraphQLResolveInfo, any>>;
private infos: Record<string, Record<string, GraphQLResolveInfo>>;

Expand All @@ -54,11 +54,11 @@ export class Receiver {

this.resultTransformer = resultTransformer;
this.initialResultDepth = info ? responsePathAsArray(info.path).length - 1 : 0;
this.pubsub = new InMemoryPubSub();

this.deferredPatches = Object.create(null);
this.streamedPatches = Object.create(null);
this.cache = Object.create(null);
this.cache = new ExpectantStore();
this.stoppers = [];
this.loaders = Object.create(null);
this.infos = Object.create(null);
}
Expand All @@ -67,7 +67,7 @@ export class Receiver {
const asyncIterator = this.asyncIterable[Symbol.asyncIterator]();
const payload = await asyncIterator.next();
const initialResult = externalValueFromResult(this.resultTransformer(payload.value), this.delegationContext, this);
this.cache[this.fieldName] = initialResult;
this.cache.set(this.fieldName, initialResult);

this._iterate();

Expand Down Expand Up @@ -110,73 +110,52 @@ export class Receiver {
this.onNewInfo(pathKey, combinedInfo);
}

const parent = this.cache[parentKey];
if (parent !== undefined) {
const data = parent[responseKey];
if (data !== undefined) {
const unpathedErrors = getUnpathedErrors(parent);
const subschema = getSubschema(parent, responseKey);
const receiver = getReceiver(parent, subschema);
this.onNewExternalValue(
pathKey,
resolveExternalValue(data, unpathedErrors, subschema, this.context, combinedInfo, receiver),
isCompositeType(getNamedType(combinedInfo.returnType))
? {
kind: Kind.SELECTION_SET,
selections: [].concat(...combinedInfo.fieldNodes.map(fieldNode => fieldNode.selectionSet.selections)),
}
: undefined
);
}
const parent = await this.cache.request(parentKey);

const data = parent[responseKey];
if (data !== undefined) {
const unpathedErrors = getUnpathedErrors(parent);
const subschema = getSubschema(parent, responseKey);
const receiver = getReceiver(parent, subschema);
this.onNewExternalValue(
pathKey,
resolveExternalValue(data, unpathedErrors, subschema, this.context, combinedInfo, receiver),
isCompositeType(getNamedType(combinedInfo.returnType))
? {
kind: Kind.SELECTION_SET,
selections: [].concat(...combinedInfo.fieldNodes.map(fieldNode => fieldNode.selectionSet.selections)),
}
: undefined
);
}

if (fieldShouldStream(combinedInfo)) {
return infos.map(
() =>
new Repeater(async (push, stop) => {
let initialValues: Array<any> = this.cache[pathKey];
if (initialValues !== undefined) {
initialValues.forEach(async value => push(value));
} else {
const payload = await this.pubsub.subscribe(pathKey).next();
initialValues = payload.value;
if (initialValues === undefined) {
return;
}
}
const initialValues = ((await this.cache.request(pathKey)) as unknown) as Array<ExternalObject>;
initialValues.forEach(async value => push(value));

let index = initialValues.length;
while (true) {
const listMemberKey = `${pathKey}.${index++}`;

let listMember = this.cache[listMemberKey];
if (listMember !== undefined) {
await push(listMember);
continue;
}

const listMemberPayload = await this.pubsub.subscribe(listMemberKey).next();
listMember = listMemberPayload.value;
if (listMember === undefined) {
break;
}

await push(listMember);
}

stop();
let stopped = false;
stop.then(() => (stopped = true));

this.stoppers.push(stop);

const next = () => this.cache.request(`${pathKey}.${index++}`);

/* eslint-disable no-unmodified-loop-condition */
while (!stopped) {
await push(next());
}
/* eslint-disable no-unmodified-loop-condition */
})
);
}

const externalValue = this.cache[pathKey];
if (externalValue !== undefined) {
return new Array(infos.length).fill(externalValue);
}

const payload = await this.pubsub.subscribe(pathKey).next();

return new Array(infos.length).fill(payload.value);
const externalValue = await this.cache.request(pathKey);
return new Array(infos.length).fill(externalValue);
}

private async _iterate(): Promise<void> {
Expand Down Expand Up @@ -266,13 +245,15 @@ export class Receiver {
}

setTimeout(() => {
this.pubsub.close();
this.cache.clear();
this.stoppers.forEach(stop => stop());
});
}

private onNewExternalValue(pathKey: string, newExternalValue: any, selectionSet: SelectionSetNode): void {
const externalValue = this.cache[pathKey];
this.cache[pathKey] =
const externalValue = this.cache.get(pathKey);
this.cache.set(
pathKey,
externalValue === undefined
? newExternalValue
: mergeExternalObjects(
Expand All @@ -282,7 +263,8 @@ export class Receiver {
externalValue,
[newExternalValue],
[selectionSet]
);
)
);

const infosByParentKey = this.infos[pathKey];
if (infosByParentKey !== undefined) {
Expand All @@ -309,7 +291,7 @@ export class Receiver {
});
}

this.pubsub.publish(pathKey, newExternalValue);
this.cache.set(pathKey, newExternalValue);
}

private onNewInfo(pathKey: string, info: GraphQLResolveInfo): void {
Expand Down
53 changes: 53 additions & 0 deletions packages/delegate/src/expectantStore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
interface Settler<T> {
resolve(value: T): void;
reject(reason?: any): void;
}

export class ExpectantStore<T> {
protected settlers: Record<string, Set<Settler<T>>> = {};
protected cache: Record<string, T> = {};

set(key: string, value: T): void {
this.cache[key] = value;
const settlers = this.settlers[key];
if (settlers != null) {
for (const { resolve } of settlers) {
resolve(value);
}
settlers.clear();
delete this.settlers[key];
}
}

get(key: string): T {
return this.cache[key];
}

request(key: string): Promise<T> | T {
const value = this.cache[key];

if (value !== undefined) {
return value;
}

let settlers = this.settlers[key];
if (settlers != null) {
settlers = this.settlers[key] = new Set();
}

return new Promise((resolve, reject) => {
settlers.add({ resolve, reject });
});
}

clear(reason?: any): void {
for (const settlers of Object.values(this.settlers)) {
for (const { reject } of settlers) {
reject(reason);
}
}

this.settlers = {};
this.cache = {};
}
}
31 changes: 0 additions & 31 deletions packages/pubsub/package.json

This file was deleted.

48 changes: 0 additions & 48 deletions packages/pubsub/src/in-memory-channel.ts

This file was deleted.

Loading

0 comments on commit 9eb08e2

Please sign in to comment.