diff --git a/.changeset/fifty-carrots-pretend.md b/.changeset/fifty-carrots-pretend.md
new file mode 100644
index 00000000000..5001357f28e
--- /dev/null
+++ b/.changeset/fifty-carrots-pretend.md
@@ -0,0 +1,11 @@
+---
+'@graphql-tools/url-loader': major
+---
+
+BREAKING CHANGE
+- Remove `handleSDLAsync` and `handleSDLSync`; use `handleSDL` instead
+- Remove `useSSEForSubscription` and `useWebSocketLegacyProtocol`; use `subscriptionProtocol` instead
+- If introspection source is different than endpoint, use `endpoint` for remote execution source
+- Default HTTP Executor is renamed to `buildHTTPExecutor` with a new signature
+- `build*Subscriber` methods are renamed to `buildWSLegacyExecutor`, `buildWSExecutor` and `buildSSEExecutor` with new signatures
+- `getFetch` no longer takes `async` flag
diff --git a/packages/loaders/url/package.json b/packages/loaders/url/package.json
index f45850cc44c..20b3bbd6fb2 100644
--- a/packages/loaders/url/package.json
+++ b/packages/loaders/url/package.json
@@ -46,6 +46,7 @@
"@graphql-tools/utils": "^7.9.0",
"@graphql-tools/wrap": "^7.0.4",
"@microsoft/fetch-event-source": "2.0.1",
+ "@n1ru4l/graphql-live-query": "0.7.1",
"@types/websocket": "1.0.2",
"abort-controller": "3.0.0",
"cross-fetch": "3.1.4",
@@ -60,7 +61,8 @@
"sync-fetch": "0.3.0",
"tslib": "~2.3.0",
"valid-url": "1.0.9",
- "ws": "7.5.1"
+ "ws": "7.5.1",
+ "value-or-promise": "1.0.10"
},
"publishConfig": {
"access": "public",
diff --git a/packages/loaders/url/src/index.ts b/packages/loaders/url/src/index.ts
index 6424b0bd23c..ef3cc4dd3ab 100644
--- a/packages/loaders/url/src/index.ts
+++ b/packages/loaders/url/src/index.ts
@@ -1,6 +1,6 @@
/* eslint-disable no-case-declarations */
///
-import { print, IntrospectionOptions, Kind, GraphQLError } from 'graphql';
+import { print, IntrospectionOptions, GraphQLError, buildASTSchema, buildSchema, getOperationAST } from 'graphql';
import {
AsyncExecutor,
@@ -16,11 +16,9 @@ import {
mapAsyncIterator,
withCancel,
parseGraphQLSDL,
- Maybe,
} from '@graphql-tools/utils';
import { isWebUri } from 'valid-url';
import { fetch as crossFetch } from 'cross-fetch';
-import { SubschemaConfig } from '@graphql-tools/delegate';
import { introspectSchema, wrapSchema } from '@graphql-tools/wrap';
import { ClientOptions, createClient } from 'graphql-ws';
import WebSocket from 'isomorphic-ws';
@@ -33,6 +31,8 @@ import { ConnectionParamsOptions, SubscriptionClient as LegacySubscriptionClient
import AbortController from 'abort-controller';
import { meros } from 'meros';
import _ from 'lodash';
+import { ValueOrPromise } from 'value-or-promise';
+import { isLiveQueryOperationDefinitionNode } from '@n1ru4l/graphql-live-query';
export type AsyncFetchFn = typeof import('cross-fetch').fetch;
export type SyncFetchFn = (input: RequestInfo, init?: RequestInit) => SyncResponse;
@@ -42,15 +42,6 @@ export type SyncResponse = Omit & {
};
export type FetchFn = AsyncFetchFn | SyncFetchFn;
-type BuildExecutorOptions = {
- pointer: string;
- fetch: TFetchFn;
- extraHeaders?: HeadersConfig;
- defaultMethod: 'GET' | 'POST';
- useGETForQueries?: Maybe;
- multipart?: Maybe;
-};
-
// TODO: Should the types here be changed to T extends Record ?
export type AsyncImportFn = (moduleName: string) => PromiseLike;
// TODO: Should the types here be changed to T extends Record ?
@@ -80,6 +71,18 @@ interface ExecutionExtensions {
headers?: HeadersConfig;
}
+export enum SubscriptionProtocol {
+ WS = 'WS',
+ /**
+ * Use legacy web socket protocol `graphql-ws` instead of the more current standard `graphql-transport-ws`
+ */
+ LEGACY_WS = 'LEGACY_WS',
+ /**
+ * Use SSE for subscription instead of WebSocket
+ */
+ SSE = 'SSE',
+}
+
/**
* Additional options for loading from a URL
*/
@@ -110,14 +113,6 @@ export interface LoadFromUrlOptions extends SingleFileOptions, Partial {
return !!isWebUri(pointer);
}
- async createFormDataFromVariables({
+ createFormDataFromVariables({
query,
variables,
operationName,
@@ -178,7 +181,7 @@ export class UrlLoader implements DocumentLoader {
prev[currIndex] = curr;
return prev;
}, {});
- const uploads: any = new Map(Array.from(files.keys()).map((u, i) => [i, u]));
+ const uploads: Map = new Map(Array.from(files.keys()).map((u, i) => [i, u]));
const form = new FormData();
form.append(
'operations',
@@ -190,32 +193,35 @@ export class UrlLoader implements DocumentLoader {
})
);
form.append('map', JSON.stringify(map));
- await Promise.all(
- Array.from(uploads.entries()).map(async (params: unknown) => {
- let [i, u] = params as any;
- if (isPromise(u)) {
- u = await u;
- }
- if (u?.promise) {
- const upload = await u.promise;
- const stream = upload.createReadStream();
- form.append(i.toString(), stream, {
- filename: upload.filename,
- contentType: upload.mimetype,
- } as any);
- } else {
- form.append(
- i.toString(),
- u as any,
- {
- filename: 'name' in u ? u['name'] : i,
- contentType: u.type,
- } as any
- );
- }
- })
- );
- return form;
+ return ValueOrPromise.all(
+ Array.from(uploads.entries()).map(params =>
+ new ValueOrPromise(() => {
+ const [i, u$] = params as any;
+ return new ValueOrPromise(() => u$).then(u => [i, u]).resolve();
+ }).then(([i, u]) => {
+ if (u?.promise) {
+ return u.promise.then((upload: any) => {
+ const stream = upload.createReadStream();
+ form.append(i.toString(), stream, {
+ filename: upload.filename,
+ contentType: upload.mimetype,
+ } as any);
+ });
+ } else {
+ form.append(
+ i.toString(),
+ u as any,
+ {
+ filename: 'name' in u ? u['name'] : i,
+ contentType: u.type,
+ } as any
+ );
+ }
+ })
+ )
+ )
+ .then(() => form)
+ .resolve();
}
prepareGETUrl({
@@ -256,17 +262,25 @@ export class UrlLoader implements DocumentLoader {
return finalUrl;
}
- buildExecutor(options: BuildExecutorOptions): SyncExecutor;
- buildExecutor(options: BuildExecutorOptions): AsyncExecutor;
- buildExecutor({
- pointer,
- fetch,
- extraHeaders,
- defaultMethod,
- useGETForQueries,
- multipart,
- }: BuildExecutorOptions): Executor {
- const HTTP_URL = switchProtocols(pointer, {
+ buildHTTPExecutor(
+ endpoint: string,
+ fetch: SyncFetchFn,
+ options?: LoadFromUrlOptions
+ ): SyncExecutor;
+
+ buildHTTPExecutor(
+ endpoint: string,
+ fetch: AsyncFetchFn,
+ options?: LoadFromUrlOptions
+ ): AsyncExecutor;
+
+ buildHTTPExecutor(
+ endpoint: string,
+ fetch: FetchFn,
+ options?: LoadFromUrlOptions
+ ): Executor {
+ const defaultMethod = this.getDefaultMethodFromOptions(options?.method, 'POST');
+ const HTTP_URL = switchProtocols(endpoint, {
wss: 'https',
ws: 'http',
});
@@ -278,104 +292,112 @@ export class UrlLoader implements DocumentLoader {
}: ExecutionParams) => {
const controller = new AbortController();
let method = defaultMethod;
- if (useGETForQueries) {
- method = 'GET';
- for (const definition of document.definitions) {
- if (definition.kind === Kind.OPERATION_DEFINITION) {
- if (definition.operation !== 'query') {
- method = defaultMethod;
- }
- }
+ if (options?.useGETForQueries) {
+ const operationAst = getOperationAST(document, operationName);
+ if (operationAst?.operation === 'query') {
+ method = 'GET';
+ } else {
+ method = defaultMethod;
}
}
- const headers = Object.assign({}, extraHeaders, extensions?.headers || {});
+ const headers = Object.assign({}, options?.headers, extensions?.headers || {});
- let fetchResult: SyncResponse | Promise;
- const query = print(document);
- switch (method) {
- case 'GET':
- const finalUrl = this.prepareGETUrl({ baseUrl: pointer, query, variables, operationName, extensions });
- fetchResult = fetch(finalUrl, {
- method: 'GET',
- credentials: 'include',
- headers: {
- accept: 'application/json',
- ...headers,
- },
- });
- break;
- case 'POST':
- if (multipart) {
- fetchResult = this.createFormDataFromVariables({ query, variables, operationName, extensions }).then(form =>
- (fetch as AsyncFetchFn)(HTTP_URL, {
+ return new ValueOrPromise(() => {
+ const query = print(document);
+ switch (method) {
+ case 'GET':
+ const finalUrl = this.prepareGETUrl({ baseUrl: endpoint, query, variables, operationName, extensions });
+ return fetch(finalUrl, {
+ method: 'GET',
+ credentials: 'include',
+ headers: {
+ accept: 'application/json',
+ ...headers,
+ },
+ });
+ case 'POST':
+ if (options?.multipart) {
+ return new ValueOrPromise(() =>
+ this.createFormDataFromVariables({ query, variables, operationName, extensions })
+ )
+ .then(
+ form =>
+ fetch(HTTP_URL, {
+ method: 'POST',
+ credentials: 'include',
+ body: form as any,
+ headers: {
+ accept: 'application/json',
+ ...headers,
+ },
+ signal: controller.signal,
+ }) as any
+ )
+ .resolve();
+ } else {
+ return fetch(HTTP_URL, {
method: 'POST',
credentials: 'include',
- body: form as any,
+ body: JSON.stringify({
+ query,
+ variables,
+ operationName,
+ extensions,
+ }),
headers: {
- accept: 'application/json',
+ accept: 'application/json, multipart/mixed',
+ 'content-type': 'application/json',
...headers,
},
signal: controller.signal,
- })
- );
- } else {
- fetchResult = fetch(HTTP_URL, {
- method: 'POST',
- credentials: 'include',
- body: JSON.stringify({
- query,
- variables,
- operationName,
- extensions,
- }),
- headers: {
- accept: 'application/json, multipart/mixed',
- 'content-type': 'application/json',
- ...headers,
- },
- signal: controller.signal,
- });
- }
- break;
- }
- if (isPromise(fetchResult)) {
- return fetchResult.then(async res => {
+ });
+ }
+ }
+ })
+ .then((fetchResult: Response) => {
const response: ExecutionResult = {};
- const maybeStream = await meros(res);
- if (isAsyncIterable(maybeStream)) {
- return withCancel(
- mapAsyncIterator(maybeStream, part => {
- if (part.json) {
- const chunk = part.body;
- if (chunk.path) {
- if (chunk.data) {
- const path: Array = ['data'];
- _.merge(response, _.set({}, path.concat(chunk.path), chunk.data));
- }
-
- if (chunk.errors) {
- response.errors = (response.errors || []).concat(chunk.errors);
- }
- } else {
- if (chunk.data) {
- response.data = chunk.data;
+ const contentType = fetchResult.headers.get
+ ? fetchResult.headers.get('content-type')
+ : fetchResult['content-type'];
+ if (contentType?.includes('multipart/mixed')) {
+ return meros(fetchResult).then(maybeStream => {
+ if (isAsyncIterable(maybeStream)) {
+ return withCancel(
+ mapAsyncIterator(maybeStream, part => {
+ if (part.json) {
+ const chunk = part.body;
+ if (chunk.path) {
+ if (chunk.data) {
+ const path: Array = ['data'];
+ _.merge(response, _.set({}, path.concat(chunk.path), chunk.data));
+ }
+
+ if (chunk.errors) {
+ response.errors = (response.errors || []).concat(chunk.errors);
+ }
+ } else {
+ if (chunk.data) {
+ response.data = chunk.data;
+ }
+ if (chunk.errors) {
+ response.errors = chunk.errors;
+ }
+ }
+ return response;
}
- if (chunk.errors) {
- response.errors = chunk.errors;
- }
- }
- return response;
- }
- }),
- () => controller.abort()
- );
- } else {
- return maybeStream.json();
+ }),
+ () => controller.abort()
+ );
+ } else {
+ return maybeStream.json();
+ }
+ });
}
- });
- }
- return fetchResult.json();
+
+ return fetchResult.json();
+ })
+ .resolve();
};
return executor;
@@ -447,18 +469,17 @@ export class UrlLoader implements DocumentLoader {
}
buildSSEExecutor(
- pointer: string,
- extraHeaders: HeadersConfig | undefined,
+ endpoint: string,
fetch: AsyncFetchFn,
- options: Maybe
+ options?: Omit
): AsyncExecutor {
return async ({ document, variables, extensions }) => {
const controller = new AbortController();
const query = print(document);
- const finalUrl = this.prepareGETUrl({ baseUrl: pointer, query, variables });
+ const finalUrl = this.prepareGETUrl({ baseUrl: endpoint, query, variables });
return observableToAsyncIterable({
subscribe: observer => {
- const headers = Object.assign({}, extraHeaders || {}, extensions?.headers || {});
+ const headers = Object.assign({}, options?.headers || {}, extensions?.headers || {});
fetchEventSource(finalUrl, {
credentials: 'include',
headers,
@@ -489,7 +510,7 @@ export class UrlLoader implements DocumentLoader {
},
fetch,
signal: controller.signal,
- ...options,
+ ...(options?.eventSourceOptions || {}),
});
return {
unsubscribe: () => controller.abort(),
@@ -499,18 +520,13 @@ export class UrlLoader implements DocumentLoader {
};
}
- getFetch(
- customFetch: LoadFromUrlOptions['customFetch'],
- importFn: AsyncImportFn,
- async: true
- ): PromiseLike;
+ getFetch(customFetch: LoadFromUrlOptions['customFetch'], importFn: AsyncImportFn): PromiseLike;
- getFetch(customFetch: LoadFromUrlOptions['customFetch'], importFn: SyncImportFn, async: false): SyncFetchFn;
+ getFetch(customFetch: LoadFromUrlOptions['customFetch'], importFn: SyncImportFn): SyncFetchFn;
getFetch(
customFetch: LoadFromUrlOptions['customFetch'],
- importFn: SyncImportFn | AsyncImportFn,
- async: boolean
+ importFn: SyncImportFn | AsyncImportFn
): SyncFetchFn | PromiseLike {
if (customFetch) {
if (typeof customFetch === 'string') {
@@ -525,7 +541,7 @@ export class UrlLoader implements DocumentLoader {
return customFetch as any;
}
}
- return async ? (typeof fetch === 'undefined' ? crossFetch : fetch) : syncFetch;
+ return importFn === asyncImport ? (typeof fetch === 'undefined' ? crossFetch : fetch) : syncFetch;
}
private getDefaultMethodFromOptions(method: LoadFromUrlOptions['method'], defaultMethod: 'GET' | 'POST') {
@@ -535,13 +551,13 @@ export class UrlLoader implements DocumentLoader {
return defaultMethod;
}
- getWebSocketImpl(options: LoadFromUrlOptions, importFn: AsyncImportFn): PromiseLike;
+ getWebSocketImpl(importFn: AsyncImportFn, options?: LoadFromUrlOptions): PromiseLike;
- getWebSocketImpl(options: LoadFromUrlOptions, importFn: SyncImportFn): typeof WebSocket;
+ getWebSocketImpl(importFn: SyncImportFn, options?: LoadFromUrlOptions): typeof WebSocket;
getWebSocketImpl(
- options: LoadFromUrlOptions,
- importFn: SyncImportFn | AsyncImportFn
+ importFn: SyncImportFn | AsyncImportFn,
+ options?: LoadFromUrlOptions
): typeof WebSocket | PromiseLike {
if (typeof options?.webSocketImpl === 'string') {
const [moduleName, webSocketImplName] = options.webSocketImpl.split('#');
@@ -552,133 +568,146 @@ export class UrlLoader implements DocumentLoader {
return webSocketImplName ? (importedModule as Record)[webSocketImplName] : importedModule;
}
} else {
- const websocketImpl = options.webSocketImpl || WebSocket;
+ const websocketImpl = options?.webSocketImpl || WebSocket;
return websocketImpl;
}
}
- async getExecutorAsync(pointer: SchemaPointerSingle, options: LoadFromUrlOptions = {}): Promise {
- const fetch = await this.getFetch(options.customFetch, asyncImport, true);
- const defaultMethod = this.getDefaultMethodFromOptions(options.method, 'POST');
-
- const httpExecutor = this.buildExecutor({
- pointer,
- fetch,
- extraHeaders: options.headers,
- defaultMethod,
- useGETForQueries: options.useGETForQueries,
- multipart: options.multipart,
- });
-
- let subscriptionExecutor: AsyncExecutor;
-
- const subscriptionsEndpoint = options.subscriptionsEndpoint || pointer;
- if (options.useSSEForSubscription) {
- subscriptionExecutor = this.buildSSEExecutor(
- subscriptionsEndpoint,
- options.headers,
- fetch,
- options.eventSourceOptions
- );
+ async buildSubscriptionExecutor(
+ subscriptionsEndpoint: string,
+ fetch: AsyncFetchFn,
+ options?: Omit
+ ): Promise {
+ if (options?.subscriptionsProtocol === SubscriptionProtocol.SSE) {
+ return this.buildSSEExecutor(subscriptionsEndpoint, fetch, options);
} else {
- const webSocketImpl = await this.getWebSocketImpl(options, asyncImport);
- const connectionParams = () => ({ headers: options.headers });
- if (options.useWebSocketLegacyProtocol) {
- subscriptionExecutor = this.buildWSLegacyExecutor(subscriptionsEndpoint, webSocketImpl, connectionParams);
+ const webSocketImpl = await this.getWebSocketImpl(asyncImport, options);
+ const connectionParams = () => ({ headers: options?.headers });
+ if (options?.subscriptionsProtocol === SubscriptionProtocol.LEGACY_WS) {
+ return this.buildWSLegacyExecutor(subscriptionsEndpoint, webSocketImpl, connectionParams);
} else {
- subscriptionExecutor = this.buildWSExecutor(subscriptionsEndpoint, webSocketImpl, connectionParams);
+ return this.buildWSExecutor(subscriptionsEndpoint, webSocketImpl, connectionParams);
}
}
+ }
+
+ async getExecutorAsync(endpoint: string, options?: Omit): Promise {
+ const fetch = await this.getFetch(options?.customFetch, asyncImport);
+ const httpExecutor = this.buildHTTPExecutor(endpoint, fetch, options);
+ const subscriptionsEndpoint = options?.subscriptionsEndpoint || endpoint;
+ const subscriptionExecutor = await this.buildSubscriptionExecutor(subscriptionsEndpoint, fetch, options);
return params => {
- if (params.info?.operation.operation === 'subscription') {
+ const operationAst = getOperationAST(params.document, params.operationName);
+ if (!operationAst) {
+ throw new Error(`No valid operations found: ${params.operationName || ''}`);
+ }
+ if (
+ operationAst.operation === 'subscription' ||
+ isLiveQueryOperationDefinitionNode(operationAst, params.variables as Record)
+ ) {
return subscriptionExecutor(params);
}
return httpExecutor(params);
};
}
- getExecutorSync(pointer: SchemaPointerSingle, options: LoadFromUrlOptions): SyncExecutor {
- const fetch = this.getFetch(options?.customFetch, syncImport, false);
- const defaultMethod = this.getDefaultMethodFromOptions(options?.method, 'POST');
-
- const executor = this.buildExecutor({
- pointer,
- fetch,
- extraHeaders: options.headers,
- defaultMethod,
- useGETForQueries: options.useGETForQueries,
- });
+ getExecutorSync(endpoint: string, options: Omit): SyncExecutor {
+ const fetch = this.getFetch(options?.customFetch, syncImport);
+ const executor = this.buildHTTPExecutor(endpoint, fetch, options);
return executor;
}
- async getSubschemaConfigAsync(pointer: SchemaPointerSingle, options: LoadFromUrlOptions): Promise {
- const executor = await this.getExecutorAsync(pointer, options);
- return {
- schema: await introspectSchema(executor, undefined, options as IntrospectionOptions),
- executor,
- };
- }
-
- getSubschemaConfigSync(pointer: SchemaPointerSingle, options: LoadFromUrlOptions): SubschemaConfig {
- const executor = this.getExecutorSync(pointer, options);
- return {
- schema: introspectSchema(executor, undefined, options as IntrospectionOptions),
- executor,
- };
- }
-
- async handleSDLAsync(pointer: SchemaPointerSingle, options: LoadFromUrlOptions): Promise