From 9e58a630fcafcb538710fafb0cbb04c8903bc638 Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Tue, 22 Aug 2023 16:23:25 +0200 Subject: [PATCH 01/11] Revert "Revert "Supergraph coprocessor implementation (#3408)" (#3604)" This reverts commit b1a2310e008214561117adc5d2c5a5b4db3ba608. --- CHANGELOG.md | 33 +- ...nfiguration__tests__schema_generation.snap | 107 +++ .../{coprocessor.rs => coprocessor/mod.rs} | 40 +- .../src/plugins/coprocessor/supergraph.rs | 881 ++++++++++++++++++ .../test.rs} | 2 +- apollo-router/src/plugins/mod.rs | 2 - apollo-router/src/services/external.rs | 36 + docs/source/customizations/coprocessor.mdx | 25 +- 8 files changed, 1105 insertions(+), 21 deletions(-) rename apollo-router/src/plugins/{coprocessor.rs => coprocessor/mod.rs} (97%) create mode 100644 apollo-router/src/plugins/coprocessor/supergraph.rs rename apollo-router/src/plugins/{coprocessor_test.rs => coprocessor/test.rs} (99%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 24b5b86bb8..7751be5d97 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,27 @@ Note that the name of this metric may change in the future. By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/3513 +### Supergraph coprocessor implementation ([PR #3408](https://github.com/apollographql/router/pull/3408)) + +Coprocessors now support supergraph service interception. + +Supergraph request contains: +* query +* operation name +* variables +* extensions + +Supergraph reqponse contains: +* label +* data +* path +* errors +* extensions + +When using `@defer` or subscriptions a supergraph response may contain multiple GraphQL responses, and the coprocessor will be called for each. + +By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/3408 + ### Configure AWS SigV4 authentication for subgraph requests ([PR #3365](https://github.com/apollographql/router/pull/3365)) Secure your router to subgraph communication on AWS using [Signature Version 4](https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-authenticating-requests.html) (Sigv4)! @@ -87,7 +108,7 @@ By [@garypen](https://github.com/garypen) in https://github.com/apollographql/ro ### Spelling of `content_negociation` corrected to `content_negotiation` ([Issue #3204](https://github.com/apollographql/router/issues/3204)) -We had a bit of a French twist on one of our internal module names. We won't promise it won't happen again, but `content_negociation` is spelled as `content_negotiation` now. 😄 +We had a bit of a French twist on one of our internal module names. We won't promise it won't happen again, but `content_negociation` is spelled as `content_negotiation` now. 😄 Thank you for this contribution! @@ -1443,7 +1464,7 @@ By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/p ### Small gzip'd responses no longer cause a panic -A regression introduced in v1.17.0 — again related to compression — has been resolved. This occurred when small responses used invalid buffer management, causing a panic. +A regression introduced in v1.17.0 — again related to compression — has been resolved. This occurred when small responses used invalid buffer management, causing a panic. By [@dbanty](https://github.com/dbanty) in https://github.com/apollographql/router/pull/3047 @@ -1977,7 +1998,7 @@ By [@BrynCooke](https://github.com/BrynCooke) in https://github.com/apollographq ### Distributed caching: Don't send Redis' `CLIENT SETNAME` ([PR #2825](https://github.com/apollographql/router/pull/2825)) -We won't send [the `CLIENT SETNAME` command](https://redis.io/commands/client-setname/) to connected Redis servers. This resolves an incompatibility with some Redis-compatible servers since not all "Redis-compatible" offerings (like Google Memorystore) actually support _every_ Redis command. We weren't actually necessitating this feature, it was just a feature that could be enabled optionally on our Redis client. No Router functionality is impacted. +We won't send [the `CLIENT SETNAME` command](https://redis.io/commands/client-setname/) to connected Redis servers. This resolves an incompatibility with some Redis-compatible servers since not all "Redis-compatible" offerings (like Google Memorystore) actually support _every_ Redis command. We weren't actually necessitating this feature, it was just a feature that could be enabled optionally on our Redis client. No Router functionality is impacted. By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/2825 @@ -3147,7 +3168,7 @@ By [@Geal](https://github.com/geal) in https://github.com/apollographql/router/p ### Optimize header propagation plugin's regular expression matching ([PR #2392](https://github.com/apollographql/router/pull/2392)) -We've changed the header propagation plugins' behavior to reduce the chance of memory allocations occurring when applying regex-based header propagation rules. +We've changed the header propagation plugins' behavior to reduce the chance of memory allocations occurring when applying regex-based header propagation rules. By [@o0Ignition0o](https://github.com/o0Ignition0o) in https://github.com/apollographql/router/pull/2392 @@ -3197,7 +3218,7 @@ By [@Geal](https://github.com/geal) in https://github.com/apollographql/router/p Configuration changes will be [automatically migrated on load](https://www.apollographql.com/docs/router/configuration/overview#upgrading-your-router-configuration). However, you should update your source configuration files as these will become breaking changes in a future major release. -### Defer support graduates from preview ([Issue #2368](https://github.com/apollographql/router/issues/2368)) +### Defer support graduates from preview ([Issue #2368](https://github.com/apollographql/router/issues/2368)) We're pleased to announce that [`@defer` support](https://www.apollographql.com/docs/router/executing-operations/defer-support/) has been promoted to general availability in accordance with our [product launch stages](https://www.apollographql.com/docs/resources/product-launch-stages/). @@ -9136,4 +9157,4 @@ See our [release stages] for more information. But the lack of clarity goes back to not having kept track of everything thus far! We can _fix_ our processes to keep track of these things! :smile_cat: -# [0.1.0] - TBA \ No newline at end of file +# [0.1.0] - TBA diff --git a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap index 5d9bab1612..3cef4be4d2 100644 --- a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap +++ b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap @@ -770,6 +770,113 @@ expression: "&schema" }, "additionalProperties": false }, + "supergraph": { + "description": "The supergraph stage request/response configuration", + "default": { + "request": { + "headers": false, + "context": false, + "body": false, + "sdl": false, + "path": false, + "method": false + }, + "response": { + "headers": false, + "context": false, + "body": false, + "sdl": false, + "status_code": false + } + }, + "type": "object", + "properties": { + "request": { + "description": "The request configuration", + "default": { + "headers": false, + "context": false, + "body": false, + "sdl": false, + "path": false, + "method": false + }, + "type": "object", + "properties": { + "body": { + "description": "Send the body", + "default": false, + "type": "boolean" + }, + "context": { + "description": "Send the context", + "default": false, + "type": "boolean" + }, + "headers": { + "description": "Send the headers", + "default": false, + "type": "boolean" + }, + "method": { + "description": "Send the method", + "default": false, + "type": "boolean" + }, + "path": { + "description": "Send the path", + "default": false, + "type": "boolean" + }, + "sdl": { + "description": "Send the SDL", + "default": false, + "type": "boolean" + } + }, + "additionalProperties": false + }, + "response": { + "description": "What information is passed to a router request/response stage", + "default": { + "headers": false, + "context": false, + "body": false, + "sdl": false, + "status_code": false + }, + "type": "object", + "properties": { + "body": { + "description": "Send the body", + "default": false, + "type": "boolean" + }, + "context": { + "description": "Send the context", + "default": false, + "type": "boolean" + }, + "headers": { + "description": "Send the headers", + "default": false, + "type": "boolean" + }, + "sdl": { + "description": "Send the SDL", + "default": false, + "type": "boolean" + }, + "status_code": { + "description": "Send the HTTP status", + "default": false, + "type": "boolean" + } + }, + "additionalProperties": false + } + } + }, "timeout": { "description": "The timeout for external requests", "default": { diff --git a/apollo-router/src/plugins/coprocessor.rs b/apollo-router/src/plugins/coprocessor/mod.rs similarity index 97% rename from apollo-router/src/plugins/coprocessor.rs rename to apollo-router/src/plugins/coprocessor/mod.rs index fdad623cf8..356f42b0e2 100644 --- a/apollo-router/src/plugins/coprocessor.rs +++ b/apollo-router/src/plugins/coprocessor/mod.rs @@ -35,6 +35,7 @@ use crate::layers::ServiceBuilderExt; use crate::plugin::Plugin; use crate::plugin::PluginInit; use crate::register_plugin; +use crate::services; use crate::services::external::Control; use crate::services::external::Externalizable; use crate::services::external::PipelineStep; @@ -44,6 +45,11 @@ use crate::services::router; use crate::services::subgraph; use crate::tracer::TraceId; +#[cfg(test)] +mod test; + +mod supergraph; + pub(crate) const EXTERNAL_SPAN_NAME: &str = "external_plugin"; const POOL_IDLE_TIMEOUT_DURATION: Option = Some(Duration::from_secs(5)); @@ -86,6 +92,13 @@ impl Plugin for CoprocessorPlugin { self.router_service(service) } + fn supergraph_service( + &self, + service: services::supergraph::BoxService, + ) -> services::supergraph::BoxService { + self.supergraph_service(service) + } + fn subgraph_service(&self, name: &str, service: subgraph::BoxService) -> subgraph::BoxService { self.subgraph_service(name, service) } @@ -149,6 +162,18 @@ where ) } + fn supergraph_service( + &self, + service: services::supergraph::BoxService, + ) -> services::supergraph::BoxService { + self.configuration.supergraph.as_service( + self.http_client.clone(), + service, + self.configuration.url.clone(), + self.sdl.clone(), + ) + } + fn subgraph_service(&self, name: &str, service: subgraph::BoxService) -> subgraph::BoxService { self.configuration.subgraph.all.as_service( self.http_client.clone(), @@ -239,6 +264,9 @@ struct Conf { /// The router stage request/response configuration #[serde(default)] router: RouterStage, + /// The supergraph stage request/response configuration + #[serde(default)] + supergraph: supergraph::SupergraphStage, /// The subgraph stage request/response configuration #[serde(default)] subgraph: SubgraphStages, @@ -676,7 +704,7 @@ where // If first is None, or contains an error we return an error let opt_first: Option = first.and_then(|f| f.ok()); let bytes = match opt_first { - Some(b) => b.to_vec(), + Some(b) => b, None => { tracing::error!( "Coprocessor cannot convert body into future due to problem with first part" @@ -695,7 +723,7 @@ where .transpose()?; let body_to_send = response_config .body - .then(|| String::from_utf8(bytes.clone())) + .then(|| std::str::from_utf8(&bytes).map(|s| s.to_string())) .transpose()?; let status_to_send = response_config.status_code.then(|| parts.status.as_u16()); let context_to_send = response_config.context.then(|| response.context.clone()); @@ -857,7 +885,6 @@ where // First, extract the data we need from our request and prepare our // external call. Use our configuration to figure out which data to send. let (parts, body) = request.subgraph_request.into_parts(); - let bytes = Bytes::from(serde_json::to_vec(&body)?); let headers_to_send = request_config .headers @@ -866,7 +893,7 @@ where let body_to_send = request_config .body - .then(|| serde_json::from_slice::(&bytes)) + .then(|| serde_json::to_value(&body)) .transpose()?; let context_to_send = request_config.context.then(|| request.context.clone()); let uri = request_config.uri.then(|| parts.uri.to_string()); @@ -997,7 +1024,6 @@ where // external call. Use our configuration to figure out which data to send. let (parts, body) = response.response.into_parts(); - let bytes = Bytes::from(serde_json::to_vec(&body)?); let headers_to_send = response_config .headers @@ -1008,7 +1034,7 @@ where let body_to_send = response_config .body - .then(|| serde_json::from_slice::(&bytes)) + .then(|| serde_json::to_value(&body)) .transpose()?; let context_to_send = response_config.context.then(|| response.context.clone()); let service_name = response_config.service_name.then_some(service_name); @@ -1098,7 +1124,7 @@ fn validate_coprocessor_output( } /// Convert a HeaderMap into a HashMap -pub(super) fn externalize_header_map( +pub(crate) fn externalize_header_map( input: &HeaderMap, ) -> Result>, BoxError> { let mut output = HashMap::new(); diff --git a/apollo-router/src/plugins/coprocessor/supergraph.rs b/apollo-router/src/plugins/coprocessor/supergraph.rs new file mode 100644 index 0000000000..f6d0a06e18 --- /dev/null +++ b/apollo-router/src/plugins/coprocessor/supergraph.rs @@ -0,0 +1,881 @@ +use std::ops::ControlFlow; +use std::sync::Arc; + +use futures::future; +use futures::stream; +use schemars::JsonSchema; +use serde::Deserialize; +use serde::Serialize; +use tower::BoxError; +use tower::ServiceBuilder; +use tower_service::Service; + +use super::externalize_header_map; +use super::*; +use crate::graphql; +use crate::layers::async_checkpoint::AsyncCheckpointLayer; +use crate::layers::ServiceBuilderExt; +use crate::plugins::coprocessor::EXTERNAL_SPAN_NAME; +use crate::response; +use crate::services::supergraph; + +/// What information is passed to a router request/response stage +#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize, JsonSchema)] +#[serde(default, deny_unknown_fields)] +pub(super) struct SupergraphRequestConf { + /// Send the headers + pub(super) headers: bool, + /// Send the context + pub(super) context: bool, + /// Send the body + pub(super) body: bool, + /// Send the SDL + pub(super) sdl: bool, + /// Send the path + pub(super) path: bool, + /// Send the method + pub(super) method: bool, +} + +/// What information is passed to a router request/response stage +#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize, JsonSchema)] +#[serde(default, deny_unknown_fields)] +pub(super) struct SupergraphResponseConf { + /// Send the headers + pub(super) headers: bool, + /// Send the context + pub(super) context: bool, + /// Send the body + pub(super) body: bool, + /// Send the SDL + pub(super) sdl: bool, + /// Send the HTTP status + pub(super) status_code: bool, +} + +#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize, JsonSchema)] +#[serde(default)] +pub(super) struct SupergraphStage { + /// The request configuration + pub(super) request: SupergraphRequestConf, + // /// The response configuration + pub(super) response: SupergraphResponseConf, +} + +impl SupergraphStage { + pub(crate) fn as_service( + &self, + http_client: C, + service: supergraph::BoxService, + coprocessor_url: String, + sdl: Arc, + ) -> supergraph::BoxService + where + C: Service, Response = hyper::Response, Error = BoxError> + + Clone + + Send + + Sync + + 'static, + >>::Future: Send + 'static, + { + let request_layer = (self.request != Default::default()).then_some({ + let request_config = self.request.clone(); + let coprocessor_url = coprocessor_url.clone(); + let http_client = http_client.clone(); + let sdl = sdl.clone(); + + AsyncCheckpointLayer::new(move |request: supergraph::Request| { + let request_config = request_config.clone(); + let coprocessor_url = coprocessor_url.clone(); + let http_client = http_client.clone(); + let sdl = sdl.clone(); + + async move { + process_supergraph_request_stage( + http_client, + coprocessor_url, + sdl, + request, + request_config, + ) + .await + .map_err(|error| { + tracing::error!( + "external extensibility: supergraph request stage error: {error}" + ); + error + }) + } + }) + }); + + let response_layer = (self.response != Default::default()).then_some({ + let response_config = self.response.clone(); + + MapFutureLayer::new(move |fut| { + let coprocessor_url = coprocessor_url.clone(); + let sdl: Arc = sdl.clone(); + let http_client = http_client.clone(); + let response_config = response_config.clone(); + + async move { + let response: supergraph::Response = fut.await?; + + process_supergraph_response_stage( + http_client, + coprocessor_url, + sdl, + response, + response_config, + ) + .await + .map_err(|error| { + tracing::error!( + "external extensibility: router response stage error: {error}" + ); + error + }) + } + }) + }); + + fn external_service_span() -> impl Fn(&supergraph::Request) -> tracing::Span + Clone { + move |_request: &supergraph::Request| { + tracing::info_span!( + EXTERNAL_SPAN_NAME, + "external service" = stringify!(supergraph::Request), + "otel.kind" = "INTERNAL" + ) + } + } + + ServiceBuilder::new() + .instrument(external_service_span()) + .option_layer(request_layer) + .option_layer(response_layer) + .buffered() + .service(service) + .boxed() + } +} + +async fn process_supergraph_request_stage( + http_client: C, + coprocessor_url: String, + sdl: Arc, + mut request: supergraph::Request, + request_config: SupergraphRequestConf, +) -> Result, BoxError> +where + C: Service, Response = hyper::Response, Error = BoxError> + + Clone + + Send + + Sync + + 'static, + >>::Future: Send + 'static, +{ + // Call into our out of process processor with a body of our body + // First, extract the data we need from our request and prepare our + // external call. Use our configuration to figure out which data to send. + let (parts, body) = request.supergraph_request.into_parts(); + let bytes = Bytes::from(serde_json::to_vec(&body)?); + + let headers_to_send = request_config + .headers + .then(|| externalize_header_map(&parts.headers)) + .transpose()?; + + let body_to_send = request_config + .body + .then(|| serde_json::from_slice::(&bytes)) + .transpose()?; + let context_to_send = request_config.context.then(|| request.context.clone()); + + let payload = Externalizable::supergraph_builder() + .stage(PipelineStep::SupergraphRequest) + .control(Control::default()) + .and_id(TraceId::maybe_new().map(|id| id.to_string())) + .and_headers(headers_to_send) + .and_body(body_to_send) + .and_context(context_to_send) + .method(parts.method.to_string()) + .sdl(sdl.to_string()) + .build(); + + tracing::debug!(?payload, "externalized output"); + let guard = request.context.enter_active_request(); + let start = Instant::now(); + let co_processor_result = payload.call(http_client, &coprocessor_url).await; + let duration = start.elapsed().as_secs_f64(); + drop(guard); + tracing::info!( + histogram.apollo.router.operations.coprocessor.duration = duration, + coprocessor.stage = %PipelineStep::SupergraphRequest, + ); + + tracing::debug!(?co_processor_result, "co-processor returned"); + let co_processor_output = co_processor_result?; + validate_coprocessor_output(&co_processor_output, PipelineStep::SupergraphRequest)?; + // unwrap is safe here because validate_coprocessor_output made sure control is available + let control = co_processor_output.control.expect("validated above; qed"); + + // Thirdly, we need to interpret the control flow which may have been + // updated by our co-processor and decide if we should proceed or stop. + + if matches!(control, Control::Break(_)) { + // Ensure the code is a valid http status code + let code = control.get_http_status()?; + + let res = { + let graphql_response: crate::graphql::Response = + serde_json::from_value(co_processor_output.body.unwrap_or(serde_json::Value::Null)) + .unwrap_or_else(|error| { + crate::graphql::Response::builder() + .errors(vec![Error::builder() + .message(format!( + "couldn't deserialize coprocessor output body: {error}" + )) + .extension_code("EXTERNAL_DESERIALIZATION_ERROR") + .build()]) + .build() + }); + + let mut http_response = http::Response::builder() + .status(code) + .body(stream::once(future::ready(graphql_response)).boxed())?; + if let Some(headers) = co_processor_output.headers { + *http_response.headers_mut() = internalize_header_map(headers)?; + } + + let supergraph_response = supergraph::Response { + response: http_response, + context: request.context, + }; + + if let Some(context) = co_processor_output.context { + for (key, value) in context.try_into_iter()? { + supergraph_response + .context + .upsert_json_value(key, move |_current| value); + } + } + + supergraph_response + }; + return Ok(ControlFlow::Break(res)); + } + + // Finally, process our reply and act on the contents. Our processing logic is + // that we replace "bits" of our incoming request with the updated bits if they + // are present in our co_processor_output. + + let new_body: crate::graphql::Request = match co_processor_output.body { + Some(value) => serde_json::from_value(value)?, + None => body, + }; + + request.supergraph_request = http::Request::from_parts(parts, new_body); + + if let Some(context) = co_processor_output.context { + for (key, value) in context.try_into_iter()? { + request + .context + .upsert_json_value(key, move |_current| value); + } + } + + if let Some(headers) = co_processor_output.headers { + *request.supergraph_request.headers_mut() = internalize_header_map(headers)?; + } + + if let Some(uri) = co_processor_output.uri { + *request.supergraph_request.uri_mut() = uri.parse()?; + } + + Ok(ControlFlow::Continue(request)) +} + +async fn process_supergraph_response_stage( + http_client: C, + coprocessor_url: String, + sdl: Arc, + response: supergraph::Response, + response_config: SupergraphResponseConf, +) -> Result +where + C: Service, Response = hyper::Response, Error = BoxError> + + Clone + + Send + + Sync + + 'static, + >>::Future: Send + 'static, +{ + // split the response into parts + body + let (mut parts, body) = response.response.into_parts(); + + // we split the body (which is a stream) into first response + rest of responses, + // for which we will implement mapping later + let (first, rest): (Option, graphql::ResponseStream) = + body.into_future().await; + + // If first is None, we return an error + let first = first.ok_or_else(|| { + BoxError::from("Coprocessor cannot convert body into future due to problem with first part") + })?; + + // Now we process our first chunk of response + // Encode headers, body, status, context, sdl to create a payload + let headers_to_send = response_config + .headers + .then(|| externalize_header_map(&parts.headers)) + .transpose()?; + let body_to_send = response_config + .body + .then(|| serde_json::to_value(&first).expect("serialization will not fail")); + let status_to_send = response_config.status_code.then(|| parts.status.as_u16()); + let context_to_send = response_config.context.then(|| response.context.clone()); + let sdl_to_send = response_config.sdl.then(|| sdl.clone().to_string()); + + let payload = Externalizable::supergraph_builder() + .stage(PipelineStep::SupergraphResponse) + .and_id(TraceId::maybe_new().map(|id| id.to_string())) + .and_headers(headers_to_send) + .and_body(body_to_send) + .and_context(context_to_send) + .and_status_code(status_to_send) + .and_sdl(sdl_to_send.clone()) + .build(); + + // Second, call our co-processor and get a reply. + tracing::debug!(?payload, "externalized output"); + let guard = response.context.enter_active_request(); + let start = Instant::now(); + let co_processor_result = payload.call(http_client.clone(), &coprocessor_url).await; + let duration = start.elapsed().as_secs_f64(); + drop(guard); + tracing::info!( + histogram.apollo.router.operations.coprocessor.duration = duration, + coprocessor.stage = %PipelineStep::SupergraphResponse, + ); + + tracing::debug!(?co_processor_result, "co-processor returned"); + let co_processor_output = co_processor_result?; + + validate_coprocessor_output(&co_processor_output, PipelineStep::SupergraphResponse)?; + + // Third, process our reply and act on the contents. Our processing logic is + // that we replace "bits" of our incoming response with the updated bits if they + // are present in our co_processor_output. If they aren't present, just use the + // bits that we sent to the co_processor. + let new_body: crate::response::Response = match co_processor_output.body { + Some(value) => serde_json::from_value(value)?, + None => first, + }; + + if let Some(control) = co_processor_output.control { + parts.status = control.get_http_status()? + } + + if let Some(context) = co_processor_output.context { + for (key, value) in context.try_into_iter()? { + response + .context + .upsert_json_value(key, move |_current| value); + } + } + + if let Some(headers) = co_processor_output.headers { + parts.headers = internalize_header_map(headers)?; + } + + // Clone all the bits we need + let context = response.context.clone(); + let map_context = response.context.clone(); + + // Map the rest of our body to process subsequent chunks of response + let mapped_stream = rest + .then(move |deferred_response| { + let generator_client = http_client.clone(); + let generator_coprocessor_url = coprocessor_url.clone(); + let generator_map_context = map_context.clone(); + let generator_sdl_to_send = sdl_to_send.clone(); + + async move { + let body_to_send = response_config.body.then(|| { + serde_json::to_value(&deferred_response).expect("serialization will not fail") + }); + let context_to_send = response_config + .context + .then(|| generator_map_context.clone()); + + // Note: We deliberately DO NOT send headers or status_code even if the user has + // requested them. That's because they are meaningless on a deferred response and + // providing them will be a source of confusion. + let payload = Externalizable::router_builder() + .stage(PipelineStep::SupergraphResponse) + .and_id(TraceId::maybe_new().map(|id| id.to_string())) + .and_body(body_to_send) + .and_context(context_to_send) + .and_sdl(generator_sdl_to_send) + .build(); + + // Second, call our co-processor and get a reply. + tracing::debug!(?payload, "externalized output"); + let guard = generator_map_context.enter_active_request(); + let co_processor_result = payload + .call(generator_client, &generator_coprocessor_url) + .await; + drop(guard); + tracing::debug!(?co_processor_result, "co-processor returned"); + let co_processor_output = co_processor_result?; + + validate_coprocessor_output( + &co_processor_output, + PipelineStep::SupergraphResponse, + )?; + + // Third, process our reply and act on the contents. Our processing logic is + // that we replace "bits" of our incoming response with the updated bits if they + // are present in our co_processor_output. If they aren't present, just use the + // bits that we sent to the co_processor. + let new_deferred_response: crate::response::Response = + match co_processor_output.body { + Some(value) => serde_json::from_value(value)?, + None => deferred_response, + }; + + if let Some(context) = co_processor_output.context { + for (key, value) in context.try_into_iter()? { + generator_map_context.upsert_json_value(key, move |_current| value); + } + } + + // We return the deferred_response into our stream of response chunks + Ok(new_deferred_response) + } + }) + .map(|res: Result| match res { + Ok(response) => response, + Err(e) => { + tracing::error!("coprocessor error handling deferred supergraph response: {e}"); + response::Response::builder() + .error( + Error::builder() + .message("Internal error handling deferred response") + .extension_code("INTERNAL_ERROR") + .build(), + ) + .build() + } + }); + + // Create our response stream which consists of our first body chained with the + // rest of the responses in our mapped stream. + let stream = once(ready(new_body)).chain(mapped_stream).boxed(); + + // Finally, return a response which has a Body that wraps our stream of response chunks. + Ok(supergraph::Response { + context, + response: http::Response::from_parts(parts, stream), + }) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use futures::future::BoxFuture; + use http::StatusCode; + use hyper::Body; + use serde_json::json; + use tower::BoxError; + use tower::ServiceExt; + + use super::super::*; + use super::*; + use crate::plugin::test::MockHttpClientService; + use crate::plugin::test::MockSupergraphService; + use crate::services::supergraph; + + #[allow(clippy::type_complexity)] + pub(crate) fn mock_with_callback( + callback: fn( + hyper::Request, + ) -> BoxFuture<'static, Result, BoxError>>, + ) -> MockHttpClientService { + let mut mock_http_client = MockHttpClientService::new(); + mock_http_client.expect_clone().returning(move || { + let mut mock_http_client = MockHttpClientService::new(); + + mock_http_client.expect_clone().returning(move || { + let mut mock_http_client = MockHttpClientService::new(); + mock_http_client.expect_call().returning(callback); + mock_http_client + }); + mock_http_client + }); + + mock_http_client + } + + #[allow(clippy::type_complexity)] + fn mock_with_deferred_callback( + callback: fn( + hyper::Request, + ) -> BoxFuture<'static, Result, BoxError>>, + ) -> MockHttpClientService { + let mut mock_http_client = MockHttpClientService::new(); + mock_http_client.expect_clone().returning(move || { + let mut mock_http_client = MockHttpClientService::new(); + mock_http_client.expect_clone().returning(move || { + let mut mock_http_client = MockHttpClientService::new(); + mock_http_client.expect_clone().returning(move || { + let mut mock_http_client = MockHttpClientService::new(); + mock_http_client.expect_call().returning(callback); + mock_http_client + }); + mock_http_client + }); + mock_http_client + }); + + mock_http_client + } + + #[tokio::test] + async fn external_plugin_supergraph_request() { + let supergraph_stage = SupergraphStage { + request: SupergraphRequestConf { + headers: false, + context: false, + body: true, + sdl: false, + method: false, + path: false, + }, + response: Default::default(), + }; + + // This will never be called because we will fail at the coprocessor. + let mut mock_supergraph_service = MockSupergraphService::new(); + + mock_supergraph_service + .expect_call() + .returning(|req: supergraph::Request| { + // Let's assert that the subgraph request has been transformed as it should have. + assert_eq!( + req.supergraph_request.headers().get("cookie").unwrap(), + "tasty_cookie=strawberry" + ); + + assert_eq!( + req.context + .get::<&str, u8>("this-is-a-test-context") + .unwrap() + .unwrap(), + 42 + ); + + // The subgraph uri should have changed + assert_eq!( + Some("MyQuery"), + req.supergraph_request.body().operation_name.as_deref() + ); + + // The query should have changed + assert_eq!( + "query Long {\n me {\n name\n}\n}", + req.supergraph_request.body().query.as_ref().unwrap() + ); + + Ok(supergraph::Response::builder() + .data(json!({ "test": 1234_u32 })) + .errors(Vec::new()) + .extensions(crate::json_ext::Object::new()) + .context(req.context) + .build() + .unwrap()) + }); + + let mock_http_client = mock_with_callback(move |_: hyper::Request| { + Box::pin(async { + Ok(hyper::Response::builder() + .body(Body::from( + r##"{ + "version": 1, + "stage": "SupergraphRequest", + "control": "continue", + "headers": { + "cookie": [ + "tasty_cookie=strawberry" + ], + "content-type": [ + "application/json" + ], + "host": [ + "127.0.0.1:4000" + ], + "apollo-federation-include-trace": [ + "ftv1" + ], + "apollographql-client-name": [ + "manual" + ], + "accept": [ + "*/*" + ], + "user-agent": [ + "curl/7.79.1" + ], + "content-length": [ + "46" + ] + }, + "body": { + "query": "query Long {\n me {\n name\n}\n}", + "operationName": "MyQuery" + }, + "context": { + "entries": { + "accepts-json": false, + "accepts-wildcard": true, + "accepts-multipart": false, + "this-is-a-test-context": 42 + } + }, + "serviceName": "service name shouldn't change", + "uri": "http://thisurihaschanged" + }"##, + )) + .unwrap()) + }) + }); + + let service = supergraph_stage.as_service( + mock_http_client, + mock_supergraph_service.boxed(), + "http://test".to_string(), + Arc::new("".to_string()), + ); + + let request = supergraph::Request::fake_builder().build().unwrap(); + + assert_eq!( + serde_json_bytes::json!({ "test": 1234_u32 }), + service + .oneshot(request) + .await + .unwrap() + .response + .into_body() + .next() + .await + .unwrap() + .data + .unwrap() + ); + } + + #[tokio::test] + async fn external_plugin_supergraph_request_controlflow_break() { + let supergraph_stage = SupergraphStage { + request: SupergraphRequestConf { + headers: false, + context: false, + body: true, + sdl: false, + method: false, + path: false, + }, + response: Default::default(), + }; + + // This will never be called because we will fail at the coprocessor. + let mock_supergraph_service = MockSupergraphService::new(); + + let mock_http_client = mock_with_callback(move |_: hyper::Request| { + Box::pin(async { + Ok(hyper::Response::builder() + .body(Body::from( + r##"{ + "version": 1, + "stage": "SupergraphRequest", + "control": { + "break": 200 + }, + "body": { + "errors": [{ "message": "my error message" }] + }, + "context": { + "entries": { + "testKey": true + } + }, + "headers": { + "aheader": ["a value"] + } + }"##, + )) + .unwrap()) + }) + }); + + let service = supergraph_stage.as_service( + mock_http_client, + mock_supergraph_service.boxed(), + "http://test".to_string(), + Arc::new("".to_string()), + ); + + let request = supergraph::Request::fake_builder().build().unwrap(); + + let crate::services::supergraph::Response { + mut response, + context, + } = service.oneshot(request).await.unwrap(); + + assert!(context.get::<_, bool>("testKey").unwrap().unwrap()); + + let value = response.headers().get("aheader").unwrap(); + + assert_eq!("a value", value); + + assert_eq!( + "my error message", + response.body_mut().next().await.unwrap().errors[0] + .message + .as_str() + ); + } + + #[tokio::test] + async fn external_plugin_supergraph_response() { + let supergraph_stage = SupergraphStage { + response: SupergraphResponseConf { + headers: true, + context: true, + body: true, + sdl: true, + status_code: false, + }, + request: Default::default(), + }; + + let mut mock_supergraph_service = MockSupergraphService::new(); + + mock_supergraph_service + .expect_call() + .returning(|req: supergraph::Request| { + Ok(supergraph::Response::builder() + .data(json!({ "test": 1234_u32 })) + .errors(Vec::new()) + .extensions(crate::json_ext::Object::new()) + .context(req.context) + .build() + .unwrap()) + }); + + let mock_http_client = mock_with_deferred_callback(move |res: hyper::Request| { + Box::pin(async { + let deserialized_response: Externalizable = + serde_json::from_slice(&hyper::body::to_bytes(res.into_body()).await.unwrap()) + .unwrap(); + + assert_eq!(EXTERNALIZABLE_VERSION, deserialized_response.version); + assert_eq!( + PipelineStep::SupergraphResponse.to_string(), + deserialized_response.stage + ); + + assert_eq!( + json! {{"data":{ "test": 1234_u32 }}}, + deserialized_response.body.unwrap() + ); + + let input = json!( + { + "version": 1, + "stage": "SupergraphResponse", + "control": { + "break": 400 + }, + "id": "1b19c05fdafc521016df33148ad63c1b", + "headers": { + "cookie": [ + "tasty_cookie=strawberry" + ], + "content-type": [ + "application/json" + ], + "host": [ + "127.0.0.1:4000" + ], + "apollo-federation-include-trace": [ + "ftv1" + ], + "apollographql-client-name": [ + "manual" + ], + "accept": [ + "*/*" + ], + "user-agent": [ + "curl/7.79.1" + ], + "content-length": [ + "46" + ] + }, + "body": { + "data": { "test": 42 } + }, + "context": { + "entries": { + "accepts-json": false, + "accepts-wildcard": true, + "accepts-multipart": false, + "this-is-a-test-context": 42 + } + }, + "sdl": "the sdl shouldnt change" + }); + Ok(hyper::Response::builder() + .body(Body::from(serde_json::to_string(&input).unwrap())) + .unwrap()) + }) + }); + + let service = supergraph_stage.as_service( + mock_http_client, + mock_supergraph_service.boxed(), + "http://test".to_string(), + Arc::new("".to_string()), + ); + + let request = supergraph::Request::canned_builder().build().unwrap(); + + let mut res = service.oneshot(request).await.unwrap(); + + // Let's assert that the router request has been transformed as it should have. + assert_eq!(res.response.status(), StatusCode::BAD_REQUEST); + assert_eq!( + res.response.headers().get("cookie").unwrap(), + "tasty_cookie=strawberry" + ); + + assert_eq!( + res.context + .get::<&str, u8>("this-is-a-test-context") + .unwrap() + .unwrap(), + 42 + ); + + let body = res.response.body_mut().next().await.unwrap(); + // the body should have changed: + assert_eq!( + json!({ "data": { "test": 42_u32 } }), + serde_json::to_value(&body).unwrap(), + ); + } +} diff --git a/apollo-router/src/plugins/coprocessor_test.rs b/apollo-router/src/plugins/coprocessor/test.rs similarity index 99% rename from apollo-router/src/plugins/coprocessor_test.rs rename to apollo-router/src/plugins/coprocessor/test.rs index 297391af5e..05ee1650d7 100644 --- a/apollo-router/src/plugins/coprocessor_test.rs +++ b/apollo-router/src/plugins/coprocessor/test.rs @@ -17,7 +17,7 @@ mod tests { use tower::BoxError; use tower::ServiceExt; - use super::super::coprocessor::*; + use super::super::*; use crate::plugin::test::MockHttpClientService; use crate::plugin::test::MockRouterService; use crate::plugin::test::MockSubgraphService; diff --git a/apollo-router/src/plugins/mod.rs b/apollo-router/src/plugins/mod.rs index 2bd2270df1..c3c4ccc436 100644 --- a/apollo-router/src/plugins/mod.rs +++ b/apollo-router/src/plugins/mod.rs @@ -23,8 +23,6 @@ macro_rules! schemar_fn { pub(crate) mod authentication; mod authorization; mod coprocessor; -#[cfg(test)] -mod coprocessor_test; pub(crate) mod csrf; mod expose_query_plan; mod forbid_mutations; diff --git a/apollo-router/src/services/external.rs b/apollo-router/src/services/external.rs index 440f399d1d..2161ac053d 100644 --- a/apollo-router/src/services/external.rs +++ b/apollo-router/src/services/external.rs @@ -131,6 +131,42 @@ where } } + #[builder(visibility = "pub(crate)")] + /// This is the constructor (or builder) to use when constructing a Supergraph + /// `Externalizable`. + /// + fn supergraph_new( + stage: PipelineStep, + control: Option, + id: Option, + headers: Option>>, + body: Option, + context: Option, + status_code: Option, + method: Option, + sdl: Option, + ) -> Self { + assert!(matches!( + stage, + PipelineStep::SupergraphRequest | PipelineStep::SupergraphResponse + )); + Externalizable { + version: EXTERNALIZABLE_VERSION, + stage: stage.to_string(), + control, + id, + headers, + body, + context, + status_code, + sdl, + uri: None, + path: None, + method, + service_name: None, + } + } + #[builder(visibility = "pub(crate)")] /// This is the constructor (or builder) to use when constructing a Subgraph /// `Externalizable`. diff --git a/docs/source/customizations/coprocessor.mdx b/docs/source/customizations/coprocessor.mdx index 5842baf4c5..b09bae0974 100644 --- a/docs/source/customizations/coprocessor.mdx +++ b/docs/source/customizations/coprocessor.mdx @@ -37,17 +37,18 @@ flowchart TB; client --"1. Sends request"--> routerService; routerService -."2. Can send request
details to coprocessor
and receive modifications".-> coprocessing; routerService --"3"--> supergraphService; - supergraphService --"4"--> executionService; - executionService --"5"--> subgraphService; - subgraphService -."6. Can send request
details to coprocessor
and receive modifications".-> coprocessing; - subgraphService -- "7"--> subgraphs; + supergraphService --"4. Can send request
details to coprocessor
and receive modifications".-> coprocessing; + supergraphService --"5"--> executionService; + executionService --"6"--> subgraphService; + subgraphService -."7. Can send request
details to coprocessor
and receive modifications".-> coprocessing; + subgraphService -- "8"--> subgraphs; class client,subgraphs,coprocessing secondary; ``` > This diagram shows request execution proceeding "down" from a client, through the router, to individual subgraphs. Execution then proceeds back "up" to the client in the reverse order. -As shown in the diagram above, only the `RouterService` and `SubgraphService` of the [request-handling lifecycle](./rhai/#router-request-lifecycle) can send these POST requests (also called **coprocessor requests**). +As shown in the diagram above, the `RouterService`, `SupergraphService` and `SubgraphService` steps of the [request-handling lifecycle](./rhai/#router-request-lifecycle) can send these POST requests (also called **coprocessor requests**). Each supported service can send its coprocessor requests at two different **stages**: @@ -91,6 +92,20 @@ coprocessor: context: false sdl: false status_code: false + supergraph: # This coprocessor hooks into the `SupergraphService` + request: # By including this key, the `SupergraphService` sends a coprocessor request whenever it first receives a client request. + headers: true # These boolean properties indicate which request data to include in the coprocessor request. All are optional and false by default. + body: false + context: false + sdl: false + path: false + method: false + response: # By including this key, the `SupergraphService` sends a coprocessor request whenever it's about to send response data to a client (including incremental data via @defer). + headers: true + body: false + context: false + sdl: false + status_code: false subgraph: all: request: # By including this key, the `SubgraphService` sends a coprocessor request whenever it is about to make a request to a subgraph. From 1aba167847249049401676122f5c67819792af26 Mon Sep 17 00:00:00 2001 From: o0Ignition0o Date: Fri, 18 Aug 2023 13:18:54 +0200 Subject: [PATCH 02/11] supergraph coprocessor followup, with docs update and changelog update --- CHANGELOG.md | 2 +- apollo-router/src/plugins/coprocessor/supergraph.rs | 8 ++++---- docs/source/customizations/coprocessor.mdx | 1 - 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7751be5d97..152eadfd61 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,7 +38,7 @@ Supergraph request contains: * variables * extensions -Supergraph reqponse contains: +Supergraph response contains: * label * data * path diff --git a/apollo-router/src/plugins/coprocessor/supergraph.rs b/apollo-router/src/plugins/coprocessor/supergraph.rs index f6d0a06e18..277ac05253 100644 --- a/apollo-router/src/plugins/coprocessor/supergraph.rs +++ b/apollo-router/src/plugins/coprocessor/supergraph.rs @@ -31,8 +31,6 @@ pub(super) struct SupergraphRequestConf { pub(super) body: bool, /// Send the SDL pub(super) sdl: bool, - /// Send the path - pub(super) path: bool, /// Send the method pub(super) method: bool, } @@ -190,6 +188,8 @@ where .then(|| serde_json::from_slice::(&bytes)) .transpose()?; let context_to_send = request_config.context.then(|| request.context.clone()); + let sdl_to_send = request_config.sdl.then(|| sdl.clone().to_string()); + let method = request_config.method.then(|| parts.method.to_string()); let payload = Externalizable::supergraph_builder() .stage(PipelineStep::SupergraphRequest) @@ -198,8 +198,8 @@ where .and_headers(headers_to_send) .and_body(body_to_send) .and_context(context_to_send) - .method(parts.method.to_string()) - .sdl(sdl.to_string()) + .and_method(method) + .and_sdl(sdl_to_send) .build(); tracing::debug!(?payload, "externalized output"); diff --git a/docs/source/customizations/coprocessor.mdx b/docs/source/customizations/coprocessor.mdx index b09bae0974..1443d0ba05 100644 --- a/docs/source/customizations/coprocessor.mdx +++ b/docs/source/customizations/coprocessor.mdx @@ -98,7 +98,6 @@ coprocessor: body: false context: false sdl: false - path: false method: false response: # By including this key, the `SupergraphService` sends a coprocessor request whenever it's about to send response data to a client (including incremental data via @defer). headers: true From 1d74b8d2274cb76c5db98329f0c93e9d96b0c0b8 Mon Sep 17 00:00:00 2001 From: o0Ignition0o Date: Fri, 18 Aug 2023 13:23:09 +0200 Subject: [PATCH 03/11] remove path from supergraph service coprocessor tests --- apollo-router/src/plugins/coprocessor/supergraph.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/apollo-router/src/plugins/coprocessor/supergraph.rs b/apollo-router/src/plugins/coprocessor/supergraph.rs index 277ac05253..71e02b79e5 100644 --- a/apollo-router/src/plugins/coprocessor/supergraph.rs +++ b/apollo-router/src/plugins/coprocessor/supergraph.rs @@ -551,7 +551,6 @@ mod tests { body: true, sdl: false, method: false, - path: false, }, response: Default::default(), }; @@ -685,7 +684,6 @@ mod tests { body: true, sdl: false, method: false, - path: false, }, response: Default::default(), }; From 4233141757938593ea8ba2d2e44a492351acff6a Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Tue, 22 Aug 2023 17:20:26 +0200 Subject: [PATCH 04/11] changeset --- .changesets/feat_geal_supergraph_coprocessor2.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changesets/feat_geal_supergraph_coprocessor2.md diff --git a/.changesets/feat_geal_supergraph_coprocessor2.md b/.changesets/feat_geal_supergraph_coprocessor2.md new file mode 100644 index 0000000000..721398616d --- /dev/null +++ b/.changesets/feat_geal_supergraph_coprocessor2.md @@ -0,0 +1,5 @@ +### Supergraph coprocessor implementation ([PR #3647](https://github.com/apollographql/router/pull/3647)) + +This adds support for coprocessors at the supergraph service level. Supergraph plugins work on the request side with a parsed GraphQL request object, so the query and operation name, variables and extensions are directly accessible. On the response side, they handle GraphQL response objects, with label, data, path, errors, extensions. The supergraph response contains a stream of GraphQL responses, which can contain multiple elements if the query uses `@defer` or subscriptions. When configured to observe the responses, the coprocessor will be called for each of the deferred responses. + +By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/3647 \ No newline at end of file From d356a9ef522f51ff39d923ad494d31b08a0b2912 Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Wed, 23 Aug 2023 10:03:06 +0200 Subject: [PATCH 05/11] snapshot --- ...lo_router__configuration__tests__schema_generation.snap | 7 ------- 1 file changed, 7 deletions(-) diff --git a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap index 31536fb7cb..8ccb745614 100644 --- a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap +++ b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap @@ -778,7 +778,6 @@ expression: "&schema" "context": false, "body": false, "sdl": false, - "path": false, "method": false }, "response": { @@ -798,7 +797,6 @@ expression: "&schema" "context": false, "body": false, "sdl": false, - "path": false, "method": false }, "type": "object", @@ -823,11 +821,6 @@ expression: "&schema" "default": false, "type": "boolean" }, - "path": { - "description": "Send the path", - "default": false, - "type": "boolean" - }, "sdl": { "description": "Send the SDL", "default": false, From 6680e6255be7b2d9839ce9423ca379106e0153e7 Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Wed, 23 Aug 2023 10:59:47 +0200 Subject: [PATCH 06/11] Apply suggestions from code review --- .../feat_geal_supergraph_coprocessor2.md | 31 ++++++++++++++++++- CHANGELOG.md | 21 ------------- 2 files changed, 30 insertions(+), 22 deletions(-) diff --git a/.changesets/feat_geal_supergraph_coprocessor2.md b/.changesets/feat_geal_supergraph_coprocessor2.md index 721398616d..5ddc057f73 100644 --- a/.changesets/feat_geal_supergraph_coprocessor2.md +++ b/.changesets/feat_geal_supergraph_coprocessor2.md @@ -1,5 +1,34 @@ ### Supergraph coprocessor implementation ([PR #3647](https://github.com/apollographql/router/pull/3647)) -This adds support for coprocessors at the supergraph service level. Supergraph plugins work on the request side with a parsed GraphQL request object, so the query and operation name, variables and extensions are directly accessible. On the response side, they handle GraphQL response objects, with label, data, path, errors, extensions. The supergraph response contains a stream of GraphQL responses, which can contain multiple elements if the query uses `@defer` or subscriptions. When configured to observe the responses, the coprocessor will be called for each of the deferred responses. + +Coprocessors now support supergraph service interception. + +On the request side, the coprocessor payload can contain: +- method +- headers +- body +- context +- sdl + +On the response side, the payload can contain: +- status_code +- headers +- body +- context +- sdl + +The supergraph request body contains: +* query +* operation name +* variables +* extensions + +The supergraph response body contains: +* label +* data +* errors +* extensions + +When using `@defer` or subscriptions a supergraph response may contain multiple GraphQL responses, and the coprocessor will be called for each. By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/3647 \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 152eadfd61..4232088a54 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,27 +28,6 @@ Note that the name of this metric may change in the future. By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/3513 -### Supergraph coprocessor implementation ([PR #3408](https://github.com/apollographql/router/pull/3408)) - -Coprocessors now support supergraph service interception. - -Supergraph request contains: -* query -* operation name -* variables -* extensions - -Supergraph response contains: -* label -* data -* path -* errors -* extensions - -When using `@defer` or subscriptions a supergraph response may contain multiple GraphQL responses, and the coprocessor will be called for each. - -By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/3408 - ### Configure AWS SigV4 authentication for subgraph requests ([PR #3365](https://github.com/apollographql/router/pull/3365)) Secure your router to subgraph communication on AWS using [Signature Version 4](https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-authenticating-requests.html) (Sigv4)! From cb4d33b5e209835ad1a49b9eaa1a6d6bb0149504 Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Tue, 5 Sep 2023 12:05:11 +0200 Subject: [PATCH 07/11] add the apollo.router.operations.coprocessor metric --- .../src/plugins/coprocessor/supergraph.rs | 26 ++++++++++++++++--- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/apollo-router/src/plugins/coprocessor/supergraph.rs b/apollo-router/src/plugins/coprocessor/supergraph.rs index 71e02b79e5..e4cf5a05f0 100644 --- a/apollo-router/src/plugins/coprocessor/supergraph.rs +++ b/apollo-router/src/plugins/coprocessor/supergraph.rs @@ -89,7 +89,8 @@ impl SupergraphStage { let sdl = sdl.clone(); async move { - process_supergraph_request_stage( + let mut succeeded = true; + let result = process_supergraph_request_stage( http_client, coprocessor_url, sdl, @@ -98,11 +99,19 @@ impl SupergraphStage { ) .await .map_err(|error| { + succeeded = false; tracing::error!( "external extensibility: supergraph request stage error: {error}" ); error - }) + }); + tracing::info!( + monotonic_counter.apollo.router.operations.coprocessor = 1u64, + coprocessor.stage = %PipelineStep::SupergraphRequest, + coprocessor.succeeded = succeeded, + "Total operations with co-processors enabled" + ); + result } }) }); @@ -119,7 +128,8 @@ impl SupergraphStage { async move { let response: supergraph::Response = fut.await?; - process_supergraph_response_stage( + let mut succeeded = true; + let result = process_supergraph_response_stage( http_client, coprocessor_url, sdl, @@ -128,11 +138,19 @@ impl SupergraphStage { ) .await .map_err(|error| { + succeeded = false; tracing::error!( "external extensibility: router response stage error: {error}" ); error - }) + }); + tracing::info!( + monotonic_counter.apollo.router.operations.coprocessor = 1u64, + coprocessor.stage = %PipelineStep::SupergraphResponse, + coprocessor.succeeded = succeeded, + "Total operations with co-processors enabled" + ); + result } }) }); From 773bd026b7c9e87817ba38799214103c84020ba0 Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Tue, 5 Sep 2023 12:09:29 +0200 Subject: [PATCH 08/11] configuration tests --- ...rics__test__metrics@coprocessor.router.yaml.snap | 4 ++-- .../testdata/metrics/coprocessor.router.yaml | 13 +++++++++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/apollo-router/src/configuration/snapshots/apollo_router__configuration__metrics__test__metrics@coprocessor.router.yaml.snap b/apollo-router/src/configuration/snapshots/apollo_router__configuration__metrics__test__metrics@coprocessor.router.yaml.snap index b5eb1df764..eb5762419a 100644 --- a/apollo-router/src/configuration/snapshots/apollo_router__configuration__metrics__test__metrics@coprocessor.router.yaml.snap +++ b/apollo-router/src/configuration/snapshots/apollo_router__configuration__metrics__test__metrics@coprocessor.router.yaml.snap @@ -8,6 +8,6 @@ value.apollo.router.config.coprocessor: opt__router__response__: "true" opt__subgraph__request__: "true" opt__subgraph__response__: "true" - opt__supergraph__request__: "false" - opt__supergraph__response__: "false" + opt__supergraph__request__: "true" + opt__supergraph__response__: "true" diff --git a/apollo-router/src/configuration/testdata/metrics/coprocessor.router.yaml b/apollo-router/src/configuration/testdata/metrics/coprocessor.router.yaml index a1a7714784..12ad67acf7 100644 --- a/apollo-router/src/configuration/testdata/metrics/coprocessor.router.yaml +++ b/apollo-router/src/configuration/testdata/metrics/coprocessor.router.yaml @@ -15,6 +15,19 @@ coprocessor: body: true headers: true status_code: true + supergraph: + request: + headers: true + body: true + context: true + method: true + sdl: true + response: + sdl: true + context: true + body: true + headers: true + status_code: true subgraph: all: request: From 6525831c4d771562d52fdd49740463d7e9263ab0 Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Wed, 13 Sep 2023 17:17:18 +0200 Subject: [PATCH 09/11] lint --- apollo-router/src/plugins/coprocessor/supergraph.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apollo-router/src/plugins/coprocessor/supergraph.rs b/apollo-router/src/plugins/coprocessor/supergraph.rs index e4cf5a05f0..a76de0763e 100644 --- a/apollo-router/src/plugins/coprocessor/supergraph.rs +++ b/apollo-router/src/plugins/coprocessor/supergraph.rs @@ -618,7 +618,7 @@ mod tests { Box::pin(async { Ok(hyper::Response::builder() .body(Body::from( - r##"{ + r#"{ "version": 1, "stage": "SupergraphRequest", "control": "continue", @@ -662,7 +662,7 @@ mod tests { }, "serviceName": "service name shouldn't change", "uri": "http://thisurihaschanged" - }"##, + }"#, )) .unwrap()) }) @@ -713,7 +713,7 @@ mod tests { Box::pin(async { Ok(hyper::Response::builder() .body(Body::from( - r##"{ + r#"{ "version": 1, "stage": "SupergraphRequest", "control": { @@ -730,7 +730,7 @@ mod tests { "headers": { "aheader": ["a value"] } - }"##, + }"#, )) .unwrap()) }) From 22116f55dce1c6db654b52225f03d3fd8746586e Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Wed, 13 Sep 2023 17:17:50 +0200 Subject: [PATCH 10/11] unused line --- .changesets/feat_geal_supergraph_coprocessor2.md | 1 - 1 file changed, 1 deletion(-) diff --git a/.changesets/feat_geal_supergraph_coprocessor2.md b/.changesets/feat_geal_supergraph_coprocessor2.md index 5ddc057f73..9dd3030a34 100644 --- a/.changesets/feat_geal_supergraph_coprocessor2.md +++ b/.changesets/feat_geal_supergraph_coprocessor2.md @@ -1,6 +1,5 @@ ### Supergraph coprocessor implementation ([PR #3647](https://github.com/apollographql/router/pull/3647)) - Coprocessors now support supergraph service interception. On the request side, the coprocessor payload can contain: From 522f3d1f82c05321cbd620b9515a7966b493d0e8 Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Wed, 13 Sep 2023 17:24:55 +0200 Subject: [PATCH 11/11] snapshot --- ...trics__test__metrics@coprocessor.router.yaml.snap | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/apollo-router/src/configuration/snapshots/apollo_router__configuration__metrics__test__metrics@coprocessor.router.yaml.snap b/apollo-router/src/configuration/snapshots/apollo_router__configuration__metrics__test__metrics@coprocessor.router.yaml.snap index eb5762419a..4c2078200a 100644 --- a/apollo-router/src/configuration/snapshots/apollo_router__configuration__metrics__test__metrics@coprocessor.router.yaml.snap +++ b/apollo-router/src/configuration/snapshots/apollo_router__configuration__metrics__test__metrics@coprocessor.router.yaml.snap @@ -4,10 +4,10 @@ expression: "&metrics.metrics" --- value.apollo.router.config.coprocessor: - 1 - - opt__router__request__: "true" - opt__router__response__: "true" - opt__subgraph__request__: "true" - opt__subgraph__response__: "true" - opt__supergraph__request__: "true" - opt__supergraph__response__: "true" + - opt__router__request__: true + opt__router__response__: true + opt__subgraph__request__: true + opt__subgraph__response__: true + opt__supergraph__request__: true + opt__supergraph__response__: true