Skip to content

Commit

Permalink
[v1] close subscriptions on disposal and schema change with different…
Browse files Browse the repository at this point in the history
… codes (#7220)
  • Loading branch information
enisdenjo authored Jul 5, 2024
1 parent a2306d2 commit de7517e
Show file tree
Hide file tree
Showing 10 changed files with 369 additions and 161 deletions.
35 changes: 35 additions & 0 deletions .changeset/small-planets-push.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
---
'@graphql-mesh/serve-runtime': patch
---

Close subscriptions on disposal and schema change with different codes.

When the server gets disposed (on shutdown), all active subscriptions will complete emitting the following execution error:

```json
{
"errors": [
{
"extensions": {
"code": "SHUTTING_DOWN",
},
"message": "subscription has been closed because the server is shutting down",
},
],
}
```

However, when the server detects a schema change, all active subscriptions will complete emitting the following execution error:

```json
{
"errors": [
{
"extensions": {
"code": "SUBSCRIPTION_SCHEMA_RELOAD",
},
"message": "subscription has been closed due to a schema reload",
},
],
}
```
42 changes: 2 additions & 40 deletions packages/fusion/runtime/tests/polling.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { buildSchema, GraphQLSchema, parse } from 'graphql';
import { GraphQLSchema, parse } from 'graphql';
import { createSchema } from 'graphql-yoga';
import { composeSubgraphs, getUnifiedGraphGracefully } from '@graphql-mesh/fusion-composition';
import { createServeRuntime } from '@graphql-mesh/serve-runtime';
import { getUnifiedGraphGracefully } from '@graphql-mesh/fusion-composition';
import { createDefaultExecutor, type DisposableExecutor } from '@graphql-mesh/transport-common';
import { normalizedExecutor } from '@graphql-tools/executor';
import { isAsyncIterable } from '@graphql-tools/utils';
Expand Down Expand Up @@ -101,41 +100,4 @@ describe('Polling', () => {
// Check if transport executor is disposed on global shutdown
expect(disposeFn).toHaveBeenCalledTimes(3);
});

it('should invoke onSchemaChange hooks when schema changes', done => {
let onSchemaChangeCalls = 0;
const serve = createServeRuntime({
polling: 500,
supergraph() {
if (onSchemaChangeCalls > 0) {
// change schema after onSchemaChange was invoked
return /* GraphQL */ `
type Query {
hello: Int!
}
`;
}

return /* GraphQL */ `
type Query {
world: String!
}
`;
},
plugins: () => [
{
onSchemaChange() {
if (onSchemaChangeCalls > 0) {
// schema changed for the second time
done();
}
onSchemaChangeCalls++;
},
},
],
});

// trigger mesh
serve.fetch('http://mesh/graphql?query={__typename}');
});
});
1 change: 1 addition & 0 deletions packages/serve-runtime/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"graphql-yoga": "^5.6.0"
},
"devDependencies": {
"graphql-sse": "^2.5.3",
"html-minifier-terser": "7.2.0"
},
"publishConfig": {
Expand Down
43 changes: 4 additions & 39 deletions packages/serve-runtime/src/createServeRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ import type {
MeshServePlugin,
} from './types.js';
import { useChangingSchema } from './useChangingSchema.js';
import { useCompleteSubscriptionsOnSchemaChange } from './useCompleteSubscriptionsOnSchemaChange.js';
import { useCompleteSubscriptionsOnUnifiedGraphDispose } from './useCompleteSubscriptionsOnUnifiedGraphDispose.js';

export function createServeRuntime<TContext extends Record<string, any> = Record<string, any>>(
config: MeshServeConfig<TContext> = {},
Expand Down Expand Up @@ -257,31 +259,6 @@ export function createServeRuntime<TContext extends Record<string, any> = Record
check: readinessChecker,
});

function handleSubscriptionTerminationOnUnifiedGraphChange(
result: AsyncIterableIteratorOrValue<ExecutionResult>,
setResult: (result: AsyncIterableIteratorOrValue<ExecutionResult>) => void,
) {
if (isAsyncIterable(result) && result.return) {
const subTerminateRepeater = new Repeater(function repeaterExecutor(_push, stop) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
stop.then(() => {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
result.return!();
});
onUnifiedGraphDispose(() => {
stop(
createGraphQLError('subscription has been closed due to a schema reload', {
extensions: {
code: 'SUBSCRIPTION_SCHEMA_RELOAD',
},
}),
);
});
});
setResult(Repeater.race([result, subTerminateRepeater]));
}
}

const defaultMeshPlugin: MeshServePlugin = {
onFetch({ setFetchFn }) {
setFetchFn(fetchAPI.fetch);
Expand All @@ -307,20 +284,6 @@ export function createServeRuntime<TContext extends Record<string, any> = Record
}
}
},
onExecute() {
return {
onExecuteDone({ result, setResult }) {
handleSubscriptionTerminationOnUnifiedGraphChange(result, setResult);
},
};
},
onSubscribe() {
return {
onSubscribeResult({ result, setResult }) {
handleSubscriptionTerminationOnUnifiedGraphChange(result, setResult);
},
};
},
};

let graphiqlOptionsOrFactory: GraphiQLOptionsOrFactory<unknown> | false;
Expand Down Expand Up @@ -391,6 +354,8 @@ export function createServeRuntime<TContext extends Record<string, any> = Record
readinessCheckPlugin,
registryPlugin,
useChangingSchema(getSchema, cb => (schemaChanged = cb)),
useCompleteSubscriptionsOnUnifiedGraphDispose(onUnifiedGraphDispose),
useCompleteSubscriptionsOnSchemaChange(),
...(config.plugins?.(configContext) || []),
],
// @ts-expect-error PromiseLike is not compatible with Promise
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { createGraphQLError, isAsyncIterable, Repeater } from 'graphql-yoga';
import type { MeshServePlugin } from './types';

export function useCompleteSubscriptionsOnSchemaChange(): MeshServePlugin {
const activeSubs: (() => void)[] = [];
return {
onSchemaChange() {
while (activeSubs.length) {
activeSubs.pop()?.();
}
},
onSubscribe() {
return {
onSubscribeResult({ result, setResult }) {
if (isAsyncIterable(result)) {
setResult(
Repeater.race([
result,
new Repeater((_push, stop) => {
function complete() {
stop(
createGraphQLError('subscription has been closed due to a schema reload', {
extensions: {
code: 'SUBSCRIPTION_SCHEMA_RELOAD',
},
}),
);
}
activeSubs.push(complete);

// eslint-disable-next-line @typescript-eslint/no-floating-promises
stop.then(() => {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
result.return?.();
activeSubs.splice(activeSubs.indexOf(complete), 1);
});
}),
]),
);
}
},
};
},
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { createGraphQLError, isAsyncIterable, Repeater } from 'graphql-yoga';
import type { MeshServePlugin } from './types';

export function useCompleteSubscriptionsOnUnifiedGraphDispose(
onDispose: (cb: () => void) => void,
): MeshServePlugin {
return {
onSubscribe() {
return {
onSubscribeResult({ result, setResult }) {
if (isAsyncIterable(result)) {
setResult(
Repeater.race([
result,
new Repeater((_push, stop) => {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
stop.then(() => result.return?.());
onDispose(() => {
stop(
createGraphQLError(
'subscription has been closed because the server is shutting down',
{
extensions: {
code: 'SHUTTING_DOWN',
},
},
),
);
});
}),
]),
);
}
},
};
},
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,5 @@ exports[`Serve Runtime Defaults falls back to "./supergraph.graphql" by default:
exports[`Serve Runtime Hive CDN respects env vars: hive-cdn 1`] = `
"type Query {
foo: String
}
type Subscription {
pull: String
}"
`;
Loading

0 comments on commit de7517e

Please sign in to comment.