-
Notifications
You must be signed in to change notification settings - Fork 257
/
RemoteGraphQLDataSource.ts
330 lines (292 loc) · 11.4 KB
/
RemoteGraphQLDataSource.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
import { isObject } from '../utilities/predicates';
import { GraphQLDataSource, GraphQLDataSourceProcessOptions, GraphQLDataSourceRequestKind } from './types';
import { createHash } from '@apollo/utils.createhash';
import { ResponsePath } from '@apollo/query-planner';
import { parseCacheControlHeader } from './parseCacheControlHeader';
import fetcher from 'make-fetch-happen';
import { Headers as NodeFetchHeaders, Request as NodeFetchRequest } from 'node-fetch';
import { Fetcher, FetcherRequestInit, FetcherResponse } from '@apollo/utils.fetcher';
import { GraphQLError, GraphQLErrorExtensions } from 'graphql';
import { GatewayCacheHint, GatewayCachePolicy, GatewayGraphQLRequest, GatewayGraphQLRequestContext, GatewayGraphQLResponse } from '@apollo/server-gateway-interface';
export class RemoteGraphQLDataSource<
TContext extends Record<string, any> = Record<string, any>,
> implements GraphQLDataSource<TContext>
{
fetcher: Fetcher;
constructor(
config?: Partial<RemoteGraphQLDataSource<TContext>> &
object &
ThisType<RemoteGraphQLDataSource<TContext>>,
) {
this.fetcher = fetcher.defaults({
// Allow an arbitrary number of sockets per subgraph. This is the default
// behavior of Node's http.Agent as well as the npm package agentkeepalive
// which wraps it, but is not the default behavior of make-fetch-happen
// which wraps agentkeepalive (that package sets this to 15 by default).
maxSockets: Infinity,
// although this is the default, we want to take extra care and be very
// explicity to ensure that mutations cannot be retried. please leave this
// intact.
retry: false,
});
if (config) {
return Object.assign(this, config);
}
}
url!: string;
/**
* Whether the downstream request should be made with automated persisted
* query (APQ) behavior enabled.
*
* @remarks When enabled, the request to the downstream service will first be
* attempted using a SHA-256 hash of the operation rather than including the
* operation itself. If the downstream server supports APQ and has this
* operation registered in its APQ storage, it will be able to complete the
* request without the entirety of the operation document being transmitted.
*
* In the event that the downstream service is unaware of the operation, it
* will respond with an `PersistedQueryNotFound` error and it will be resent
* with the full operation body for fulfillment.
*
* Generally speaking, when the downstream server is processing similar
* operations repeatedly, APQ can offer substantial network savings in terms
* of bytes transmitted over the wire between gateways and downstream servers.
*/
apq: boolean = false;
/**
* Should cache-control response headers from subgraphs affect the operation's
* cache policy? If it shouldn't, set this to false.
*/
honorSubgraphCacheControlHeader: boolean = true;
async process(
options: GraphQLDataSourceProcessOptions<TContext>,
): Promise<GatewayGraphQLResponse> {
const { request, context: originalContext } = options;
const pathInIncomingRequest =
options.kind === GraphQLDataSourceRequestKind.INCOMING_OPERATION
? options.pathInIncomingRequest
: undefined;
// Deal with a bit of a hairy situation in typings: when doing health checks
// and schema checks we always pass in `{}` as the context even though it's
// not really guaranteed to be a `TContext`, and then we pass it to various
// methods on this object. The reason this "works" is that the DataSourceMap
// and Service types aren't generic-ized on TContext at all (so `{}` is in
// practice always legal there)... ie, the genericness of this class is
// questionable in the first place.
const context = originalContext as TContext;
// Respect incoming http headers (eg, apollo-federation-include-trace).
const headers = new NodeFetchHeaders();
if (request.http?.headers) {
for (const [name, value] of request.http.headers) {
headers.append(name, value);
}
}
headers.set('Content-Type', 'application/json');
request.http = {
method: 'POST',
url: this.url,
headers,
};
if (this.willSendRequest) {
await this.willSendRequest(options);
}
if (!request.query) {
throw new Error('Missing query');
}
const { query, ...requestWithoutQuery } = request;
// Special handling of cache-control headers in response. Requires
// Apollo Server 3, so we check to make sure the method we want is
// there.
const overallCachePolicy =
this.honorSubgraphCacheControlHeader &&
options.kind === GraphQLDataSourceRequestKind.INCOMING_OPERATION &&
options.incomingRequestContext.overallCachePolicy &&
'restrict' in options.incomingRequestContext.overallCachePolicy
? options.incomingRequestContext.overallCachePolicy
: null;
if (this.apq) {
const apqHash = createHash('sha256').update(request.query).digest('hex');
// Take the original extensions and extend them with
// the necessary "extensions" for APQ handshaking.
requestWithoutQuery.extensions = {
...request.extensions,
persistedQuery: {
version: 1,
sha256Hash: apqHash,
},
};
const apqOptimisticResponse = await this.sendRequest(
requestWithoutQuery,
context,
);
// If we didn't receive notice to retry with APQ, then let's
// assume this is the best result we'll get and return it!
if (
!apqOptimisticResponse.errors ||
!apqOptimisticResponse.errors.find(
(error) => error.message === 'PersistedQueryNotFound',
)
) {
return this.respond({
response: apqOptimisticResponse,
request: requestWithoutQuery,
context,
overallCachePolicy,
pathInIncomingRequest
});
}
}
// If APQ was enabled, we'll run the same request again, but add in the
// previously omitted `query`. If APQ was NOT enabled, this is the first
// request (non-APQ, all the way).
const requestWithQuery: GatewayGraphQLRequest = {
query,
...requestWithoutQuery,
};
const response = await this.sendRequest(requestWithQuery, context);
return this.respond({
response,
request: requestWithQuery,
context,
overallCachePolicy,
pathInIncomingRequest
});
}
private async sendRequest(
request: GatewayGraphQLRequest,
context: TContext,
): Promise<GatewayGraphQLResponse> {
// This would represent an internal programming error since this shouldn't
// be possible in the way that this method is invoked right now.
if (!request.http) {
throw new Error("Internal error: Only 'http' requests are supported.");
}
// We don't want to serialize the `http` properties into the body that is
// being transmitted. Instead, we want those to be used to indicate what
// we're accessing (e.g. url) and what we access it with (e.g. headers).
const { http, ...requestWithoutHttp } = request;
const stringifiedRequestWithoutHttp = JSON.stringify(requestWithoutHttp);
const requestInit: FetcherRequestInit = {
method: http.method,
headers: Object.fromEntries(http.headers),
body: stringifiedRequestWithoutHttp,
};
// Note that we don't actually send this Request object to the fetcher; it
// is merely sent to methods on this object that might be overridden by users.
// We are careful to only send data to the overridable fetcher function that uses
// plain JS objects --- some fetch implementations don't know how to handle
// Request or Headers objects created by other fetch implementations.
const fetchRequest = new NodeFetchRequest(http.url, requestInit);
let fetchResponse: FetcherResponse | undefined;
try {
// Use our local `fetcher` to allow for fetch injection
// Use the fetcher's `Request` implementation for compatibility
fetchResponse = await this.fetcher(http.url, requestInit);
if (!fetchResponse.ok) {
throw await this.errorFromResponse(fetchResponse);
}
const body = await this.parseBody(fetchResponse, fetchRequest, context);
if (!isObject(body)) {
throw new Error(`Expected JSON response body, but received: ${body}`);
}
return {
...body,
http: fetchResponse,
};
} catch (error) {
this.didEncounterError(error, fetchRequest, fetchResponse, context);
throw error;
}
}
public willSendRequest?(
options: GraphQLDataSourceProcessOptions<TContext>,
): void | Promise<void>;
private async respond({
response,
request,
context,
overallCachePolicy,
pathInIncomingRequest
}: {
response: GatewayGraphQLResponse;
request: GatewayGraphQLRequest;
context: TContext;
overallCachePolicy: GatewayCachePolicy | null;
pathInIncomingRequest?: ResponsePath
}): Promise<GatewayGraphQLResponse> {
const processedResponse =
typeof this.didReceiveResponse === 'function'
? await this.didReceiveResponse({ response, request, context, pathInIncomingRequest })
: response;
if (overallCachePolicy) {
const parsed = parseCacheControlHeader(
response.http?.headers.get('cache-control'),
);
// If the subgraph does not specify a max-age, we assume its response (and
// thus the overall response) is uncacheable. (If you don't like this, you
// can tweak the `cache-control` header in your `didReceiveResponse`
// method.)
const hint: GatewayCacheHint = { maxAge: 0 };
const maxAge = parsed['max-age'];
if (typeof maxAge === 'string' && maxAge.match(/^[0-9]+$/)) {
hint.maxAge = +maxAge;
}
if (parsed['private'] === true) {
hint.scope = 'PRIVATE';
}
if (parsed['public'] === true) {
hint.scope = 'PUBLIC';
}
overallCachePolicy.restrict(hint);
}
return processedResponse;
}
public didReceiveResponse?(
requestContext: Required<
Pick<GatewayGraphQLRequestContext<TContext>, 'request' | 'response' | 'context'>
> & { pathInIncomingRequest?: ResponsePath }
): GatewayGraphQLResponse | Promise<GatewayGraphQLResponse>;
public didEncounterError(
error: Error,
_fetchRequest: NodeFetchRequest,
_fetchResponse?: FetcherResponse,
_context?: TContext,
) {
throw error;
}
public parseBody(
fetchResponse: FetcherResponse,
_fetchRequest?: NodeFetchRequest,
_context?: TContext,
): Promise<object | string> {
const contentType = fetchResponse.headers.get('Content-Type');
if (
contentType &&
(contentType.startsWith('application/json') ||
contentType.startsWith('application/graphql-response+json'))
) {
return fetchResponse.json();
} else {
return fetchResponse.text();
}
}
public async errorFromResponse(response: FetcherResponse) {
const body = await this.parseBody(response);
const extensions: GraphQLErrorExtensions = {
response: {
url: response.url,
status: response.status,
statusText: response.statusText,
body,
},
};
if (response.status === 401) {
extensions.code = 'UNAUTHENTICATED';
} else if (response.status === 403) {
extensions.code = 'FORBIDDEN';
}
return new GraphQLError(`${response.status}: ${response.statusText}`, {
extensions,
});
}
}