Skip to content

Commit

Permalink
[v1] fix(runtime): cleanup transport executors per schema change (#7215)
Browse files Browse the repository at this point in the history
* fix(runtime): cleanup transport executors per schema change

* Fix

* Dispose transports on global shutdow

* Subscription cleanup

* Handle asynciterable results in execute as well

* Fix duplicate call
  • Loading branch information
ardatan authored Jul 5, 2024
1 parent f1591c2 commit eefbfbe
Show file tree
Hide file tree
Showing 8 changed files with 230 additions and 38 deletions.
7 changes: 7 additions & 0 deletions .changeset/nice-kangaroos-laugh.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@graphql-mesh/fusion-runtime': patch
'@graphql-mesh/serve-runtime': patch
---

Cleanup created transport executors per schema change
Previously they were cleaned up only on server close, which could lead to memory leaks in case of schema changes.
16 changes: 14 additions & 2 deletions packages/fusion/runtime/src/unifiedGraphManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,10 @@ export class UnifiedGraphManager<TContext> {
private onSubgraphExecuteHooks: OnSubgraphExecuteHook[];
private currentTimeout: NodeJS.Timeout | undefined;
private inContextSDK;
private initialUnifiedGraph$: MaybePromise<void>;
private initialUnifiedGraph$: MaybePromise<true>;
private disposableStack = new AsyncDisposableStack();
private _transportEntryMap: Record<string, TransportEntry>;
private _transportExecutorStack: AsyncDisposableStack;
constructor(private opts: UnifiedGraphManagerOptions<TContext>) {
this.handleUnifiedGraph = opts.handleUnifiedGraph || handleFederationSupergraph;
this.onSubgraphExecuteHooks = opts?.onSubgraphExecuteHooks || [];
Expand All @@ -82,9 +83,14 @@ export class UnifiedGraphManager<TContext> {
this.inContextSDK = undefined;
this.initialUnifiedGraph$ = undefined;
this.pausePolling();
return this._transportExecutorStack?.disposeAsync();
});
}

public onUnifiedGraphDispose(fn: () => MaybePromise<void>) {
this._transportExecutorStack?.defer(fn);
}

private pausePolling() {
if (this.currentTimeout) {
clearTimeout(this.currentTimeout);
Expand Down Expand Up @@ -127,6 +133,11 @@ export class UnifiedGraphManager<TContext> {
if (this.lastLoadedUnifiedGraph != null) {
this.opts.transportBaseContext?.logger?.debug('Unified Graph changed, updating...');
}
let cleanupJob$: Promise<true>;
if (this._transportExecutorStack) {
cleanupJob$ = this._transportExecutorStack.disposeAsync().then(() => true);
}
this._transportExecutorStack = new AsyncDisposableStack();
this.lastLoadedUnifiedGraph ||= loadedUnifiedGraph;
this.lastLoadedUnifiedGraph = loadedUnifiedGraph;
this.unifiedGraph = ensureSchema(loadedUnifiedGraph);
Expand Down Expand Up @@ -156,7 +167,7 @@ export class UnifiedGraphManager<TContext> {
}
return subgraph.schema;
},
disposableStack: this.disposableStack,
transportExecutorStack: this._transportExecutorStack,
});
if (this.opts.additionalResolvers || additionalResolvers.length) {
this.inContextSDK = getInContextSDK(
Expand All @@ -169,6 +180,7 @@ export class UnifiedGraphManager<TContext> {
}
this.continuePolling();
this._transportEntryMap = transportEntryMap;
return cleanupJob$ || true;
},
);
}
Expand Down
10 changes: 5 additions & 5 deletions packages/fusion/runtime/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,15 @@ export function createTransportGetter(transports: TransportsOption): TransportGe
export function getTransportExecutor(
transportGetter: TransportGetter,
transportContext: TransportExecutorFactoryOpts,
disposableStack: AsyncDisposableStack,
transportExecutorStack: AsyncDisposableStack,
): MaybePromise<Executor> {
const transportKind = transportContext.transportEntry?.kind || '';
const subgraphName = transportContext.subgraphName || '';
transportContext.logger?.info(`Loading transport ${transportKind} for subgraph ${subgraphName}`);
return mapMaybePromise(transportGetter(transportKind), transport =>
mapMaybePromise(transport.getSubgraphExecutor(transportContext), executor => {
if (isDisposable(executor)) {
disposableStack.use(executor);
transportExecutorStack.use(executor);
}
return executor;
}),
Expand All @@ -91,15 +91,15 @@ export function getOnSubgraphExecute({
transportBaseContext,
transportEntryMap,
getSubgraphSchema,
disposableStack,
transportExecutorStack,
transports = createDefaultTransportsOption(transportBaseContext?.logger),
}: {
onSubgraphExecuteHooks: OnSubgraphExecuteHook[];
transports?: TransportsOption;
transportBaseContext?: TransportBaseContext;
transportEntryMap?: Record<string, TransportEntry>;
getSubgraphSchema(subgraphName: string): GraphQLSchema;
disposableStack: AsyncDisposableStack;
transportExecutorStack: AsyncDisposableStack;
}) {
const subgraphExecutorMap = new Map<string, Executor>();
const transportGetter = createTransportGetter(transports);
Expand Down Expand Up @@ -135,7 +135,7 @@ export function getOnSubgraphExecute({
},
subgraphName,
},
disposableStack,
transportExecutorStack,
),
executor_ => {
// Wraps the transport executor with hooks
Expand Down
94 changes: 77 additions & 17 deletions packages/fusion/runtime/tests/polling.test.ts
Original file line number Diff line number Diff line change
@@ -1,43 +1,103 @@
import { buildSchema } from 'graphql';
import { buildSchema, GraphQLSchema, parse } from 'graphql';
import { createSchema } from 'graphql-yoga';
import { composeSubgraphs, getUnifiedGraphGracefully } from '@graphql-mesh/fusion-composition';
import { createDefaultExecutor, type DisposableExecutor } from '@graphql-mesh/transport-common';
import { normalizedExecutor } from '@graphql-tools/executor';
import { isAsyncIterable } from '@graphql-tools/utils';
import { UnifiedGraphManager } from '../src/unifiedGraphManager';

describe('Polling', () => {
it('polls the schema in a certain interval', async () => {
jest.useFakeTimers();
const pollingInterval = 35_000;
const unifiedGraphFetcher = () =>
getUnifiedGraphGracefully([
{
name: 'Test',
schema: buildSchema(/* GraphQL */ `
let schema: GraphQLSchema;
const unifiedGraphFetcher = () => {
const time = new Date().toISOString();
schema = createSchema({
typeDefs: /* GraphQL */ `
"""
Fetched on ${new Date().toISOString()}
Fetched on ${time}
"""
type Query {
test: String
time: String
}
`),
`,
resolvers: {
Query: {
time() {
return time;
},
},
},
});
return getUnifiedGraphGracefully([
{
name: 'Test',
schema,
},
]);
await using manager = new UnifiedGraphManager({
};
const disposeFn = jest.fn();
const manager = new UnifiedGraphManager({
getUnifiedGraph: unifiedGraphFetcher,
polling: pollingInterval,
transports() {
return {
getSubgraphExecutor() {
const executor: DisposableExecutor = createDefaultExecutor(schema);
executor[Symbol.asyncDispose] = disposeFn;
return executor;
},
};
},
});
async function getFetchedTime() {
async function getFetchedTimeOnComment() {
const schema = await manager.getUnifiedGraph();
const queryType = schema.getQueryType();
const lastFetchedDateStr = queryType.description.match(/Fetched on (.*)/)[1];
const lastFetchedDate = new Date(lastFetchedDateStr);
return lastFetchedDate;
}
const firstDate = await getFetchedTime();
async function getFetchedTimeFromResolvers() {
const schema = await manager.getUnifiedGraph();
const result = await normalizedExecutor({
schema,
document: parse(/* GraphQL */ `
query {
time
}
`),
});
if (isAsyncIterable(result)) {
throw new Error('Unexpected async iterable');
}
return new Date(result.data.time);
}
async function compareTimes() {
const timeFromComment = await getFetchedTimeOnComment();
const timeFromResolvers = await getFetchedTimeFromResolvers();
expect(timeFromComment).toEqual(timeFromResolvers);
}
await compareTimes();
const firstDate = await getFetchedTimeOnComment();
jest.advanceTimersByTime(pollingInterval);
const secondDate = await getFetchedTime();
expect(secondDate.getTime() - firstDate.getTime()).toBeGreaterThanOrEqual(pollingInterval);
await compareTimes();
const secondDate = await getFetchedTimeOnComment();
const diffBetweenFirstAndSecond = secondDate.getTime() - firstDate.getTime();
expect(diffBetweenFirstAndSecond).toBeGreaterThanOrEqual(pollingInterval);
jest.advanceTimersByTime(pollingInterval);
const thirdDate = await getFetchedTime();
expect(thirdDate.getTime() - secondDate.getTime()).toBeGreaterThanOrEqual(pollingInterval);
expect(thirdDate.getTime() - firstDate.getTime()).toBeGreaterThanOrEqual(pollingInterval * 2);
await compareTimes();
const thirdDate = await getFetchedTimeOnComment();
const diffBetweenSecondAndThird = thirdDate.getTime() - secondDate.getTime();
expect(diffBetweenSecondAndThird).toBeGreaterThanOrEqual(pollingInterval);
const diffBetweenFirstAndThird = thirdDate.getTime() - firstDate.getTime();
expect(diffBetweenFirstAndThird).toBeGreaterThanOrEqual(pollingInterval * 2);

// Check if transport executor is disposed per schema change
expect(disposeFn).toHaveBeenCalledTimes(2);

await manager[Symbol.asyncDispose]();
// Check if transport executor is disposed on global shutdown
expect(disposeFn).toHaveBeenCalledTimes(3);
});
});
47 changes: 46 additions & 1 deletion packages/serve-runtime/src/createServeRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@ import AsyncDisposableStack from 'disposablestack/AsyncDisposableStack';
import type { GraphQLSchema } from 'graphql';
import { parse } from 'graphql';
import {
createGraphQLError,
createYoga,
isAsyncIterable,
Repeater,
useReadinessCheck,
type FetchAPI,
type LandingPageRenderer,
type Plugin,
type YogaServerInstance,
} from 'graphql-yoga';
import type { GraphiQLOptionsOrFactory } from 'graphql-yoga/typings/plugins/use-graphiql.js';
import type { AsyncIterableIteratorOrValue } from '@envelop/core';
import { createSupergraphSDLFetcher } from '@graphql-hive/client';
import { process } from '@graphql-mesh/cross-helpers';
import type {
Expand All @@ -36,7 +39,7 @@ import {
wrapFetchWithHooks,
} from '@graphql-mesh/utils';
import { useExecutor } from '@graphql-tools/executor-yoga';
import type { MaybePromise } from '@graphql-tools/utils';
import type { ExecutionResult, MaybePromise } from '@graphql-tools/utils';
import { getProxyExecutor } from './getProxyExecutor.js';
import { handleUnifiedGraphConfig } from './handleUnifiedGraphConfig.js';
import landingPageHtml from './landing-page-html.js';
Expand Down Expand Up @@ -82,6 +85,7 @@ export function createServeRuntime<TContext extends Record<string, any> = Record

let unifiedGraph: GraphQLSchema;
let schemaInvalidator: () => void;
let onUnifiedGraphDispose: (callback: () => MaybePromise<void>) => void;
let schemaFetcher: () => MaybePromise<GraphQLSchema>;
let contextBuilder: <T>(context: T) => MaybePromise<T>;
let readinessChecker: () => MaybePromise<boolean>;
Expand Down Expand Up @@ -119,6 +123,7 @@ export function createServeRuntime<TContext extends Record<string, any> = Record
const endpoint = config.proxy.endpoint || '#';
return `<section class="supergraph-information"><h3>Proxy (<a href="${endpoint}">${endpoint}</a>): ${unifiedGraph ? 'Loaded ✅' : 'Not yet ❌'}</h3></section>`;
};
onUnifiedGraphDispose = callback => disposableStack.defer(callback);
} else {
let unifiedGraphFetcher: UnifiedGraphManagerOptions<unknown>['getUnifiedGraph'];

Expand Down Expand Up @@ -183,6 +188,7 @@ export function createServeRuntime<TContext extends Record<string, any> = Record
onDelegateHooks,
onSubgraphExecuteHooks,
});
onUnifiedGraphDispose = callback => unifiedGraphManager.onUnifiedGraphDispose(callback);
schemaFetcher = () => unifiedGraphManager.getUnifiedGraph();
readinessChecker = () =>
mapMaybePromise(unifiedGraphManager.getUnifiedGraph(), schema => !!schema);
Expand Down Expand Up @@ -242,6 +248,31 @@ export function createServeRuntime<TContext extends Record<string, any> = Record
check: readinessChecker,
});

function handleSubscriptionTerminationOnUnifiedGraphChange(
result: AsyncIterableIteratorOrValue<ExecutionResult>,
setResult: (result: AsyncIterableIteratorOrValue<ExecutionResult>) => void,
) {
if (isAsyncIterable(result) && result.return) {
const subTerminateRepeater = new Repeater(function repeaterExecutor(_push, stop) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
stop.then(() => {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
result.return!();
});
onUnifiedGraphDispose(() => {
stop(
createGraphQLError('subscription has been closed due to a schema reload', {
extensions: {
code: 'SUBSCRIPTION_SCHEMA_RELOAD',
},
}),
);
});
});
setResult(Repeater.race([result, subTerminateRepeater]));
}
}

const defaultMeshPlugin: MeshServePlugin = {
onFetch({ setFetchFn }) {
setFetchFn(fetchAPI.fetch);
Expand All @@ -267,6 +298,20 @@ export function createServeRuntime<TContext extends Record<string, any> = Record
}
}
},
onExecute() {
return {
onExecuteDone({ result, setResult }) {
handleSubscriptionTerminationOnUnifiedGraphChange(result, setResult);
},
};
},
onSubscribe() {
return {
onSubscribeResult({ result, setResult }) {
handleSubscriptionTerminationOnUnifiedGraphChange(result, setResult);
},
};
},
};

let graphiqlOptionsOrFactory: GraphiQLOptionsOrFactory<unknown> | false;
Expand Down
2 changes: 1 addition & 1 deletion packages/serve-runtime/src/getProxyExecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export function getProxyExecutor<TContext>({
}),
transportBaseContext: configContext,
getSubgraphSchema: getSchema,
disposableStack,
transportExecutorStack: disposableStack,
});
return function proxyExecutor(executionRequest) {
return onSubgraphExecute(subgraphName, executionRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,9 @@ exports[`Serve Runtime Defaults falls back to "./supergraph.graphql" by default:
exports[`Serve Runtime Hive CDN respects env vars: hive-cdn 1`] = `
"type Query {
foo: String
}
type Subscription {
pull: String
}"
`;
Loading

0 comments on commit eefbfbe

Please sign in to comment.