diff --git a/.changesets/feat_geal_supergraph_coprocessor2.md b/.changesets/feat_geal_supergraph_coprocessor2.md new file mode 100644 index 0000000000..9dd3030a34 --- /dev/null +++ b/.changesets/feat_geal_supergraph_coprocessor2.md @@ -0,0 +1,33 @@ +### Supergraph coprocessor implementation ([PR #3647](https://github.com/apollographql/router/pull/3647)) + +Coprocessors now support supergraph service interception. + +On the request side, the coprocessor payload can contain: +- method +- headers +- body +- context +- sdl + +On the response side, the payload can contain: +- status_code +- headers +- body +- context +- sdl + +The supergraph request body contains: +* query +* operation name +* variables +* extensions + +The supergraph response body contains: +* label +* data +* errors +* extensions + +When using `@defer` or subscriptions a supergraph response may contain multiple GraphQL responses, and the coprocessor will be called for each. + +By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/3647 \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index e3050a84b1..082c5f4c08 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -364,7 +364,7 @@ By [@garypen](https://github.com/garypen) in https://github.com/apollographql/ro ### Spelling of `content_negociation` corrected to `content_negotiation` ([Issue #3204](https://github.com/apollographql/router/issues/3204)) -We had a bit of a French twist on one of our internal module names. We won't promise it won't happen again, but `content_negociation` is spelled as `content_negotiation` now. 😄 +We had a bit of a French twist on one of our internal module names. We won't promise it won't happen again, but `content_negociation` is spelled as `content_negotiation` now. 😄 Thank you for this contribution! @@ -1720,7 +1720,7 @@ By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/p ### Small gzip'd responses no longer cause a panic -A regression introduced in v1.17.0 — again related to compression — has been resolved. This occurred when small responses used invalid buffer management, causing a panic. +A regression introduced in v1.17.0 — again related to compression — has been resolved. This occurred when small responses used invalid buffer management, causing a panic. By [@dbanty](https://github.com/dbanty) in https://github.com/apollographql/router/pull/3047 @@ -2254,7 +2254,7 @@ By [@BrynCooke](https://github.com/BrynCooke) in https://github.com/apollographq ### Distributed caching: Don't send Redis' `CLIENT SETNAME` ([PR #2825](https://github.com/apollographql/router/pull/2825)) -We won't send [the `CLIENT SETNAME` command](https://redis.io/commands/client-setname/) to connected Redis servers. This resolves an incompatibility with some Redis-compatible servers since not all "Redis-compatible" offerings (like Google Memorystore) actually support _every_ Redis command. We weren't actually necessitating this feature, it was just a feature that could be enabled optionally on our Redis client. No Router functionality is impacted. +We won't send [the `CLIENT SETNAME` command](https://redis.io/commands/client-setname/) to connected Redis servers. This resolves an incompatibility with some Redis-compatible servers since not all "Redis-compatible" offerings (like Google Memorystore) actually support _every_ Redis command. We weren't actually necessitating this feature, it was just a feature that could be enabled optionally on our Redis client. No Router functionality is impacted. By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/2825 @@ -3424,7 +3424,7 @@ By [@Geal](https://github.com/geal) in https://github.com/apollographql/router/p ### Optimize header propagation plugin's regular expression matching ([PR #2392](https://github.com/apollographql/router/pull/2392)) -We've changed the header propagation plugins' behavior to reduce the chance of memory allocations occurring when applying regex-based header propagation rules. +We've changed the header propagation plugins' behavior to reduce the chance of memory allocations occurring when applying regex-based header propagation rules. By [@o0Ignition0o](https://github.com/o0Ignition0o) in https://github.com/apollographql/router/pull/2392 @@ -3474,7 +3474,7 @@ By [@Geal](https://github.com/geal) in https://github.com/apollographql/router/p Configuration changes will be [automatically migrated on load](https://www.apollographql.com/docs/router/configuration/overview#upgrading-your-router-configuration). However, you should update your source configuration files as these will become breaking changes in a future major release. -### Defer support graduates from preview ([Issue #2368](https://github.com/apollographql/router/issues/2368)) +### Defer support graduates from preview ([Issue #2368](https://github.com/apollographql/router/issues/2368)) We're pleased to announce that [`@defer` support](https://www.apollographql.com/docs/router/executing-operations/defer-support/) has been promoted to general availability in accordance with our [product launch stages](https://www.apollographql.com/docs/resources/product-launch-stages/). diff --git a/apollo-router/src/configuration/snapshots/apollo_router__configuration__metrics__test__metrics@coprocessor.router.yaml.snap b/apollo-router/src/configuration/snapshots/apollo_router__configuration__metrics__test__metrics@coprocessor.router.yaml.snap index bdc1a7899b..4c2078200a 100644 --- a/apollo-router/src/configuration/snapshots/apollo_router__configuration__metrics__test__metrics@coprocessor.router.yaml.snap +++ b/apollo-router/src/configuration/snapshots/apollo_router__configuration__metrics__test__metrics@coprocessor.router.yaml.snap @@ -8,6 +8,6 @@ value.apollo.router.config.coprocessor: opt__router__response__: true opt__subgraph__request__: true opt__subgraph__response__: true - opt__supergraph__request__: false - opt__supergraph__response__: false + opt__supergraph__request__: true + opt__supergraph__response__: true diff --git a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap index f666bd1baf..a175614292 100644 --- a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap +++ b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap @@ -779,6 +779,106 @@ expression: "&schema" }, "additionalProperties": false }, + "supergraph": { + "description": "The supergraph stage request/response configuration", + "default": { + "request": { + "headers": false, + "context": false, + "body": false, + "sdl": false, + "method": false + }, + "response": { + "headers": false, + "context": false, + "body": false, + "sdl": false, + "status_code": false + } + }, + "type": "object", + "properties": { + "request": { + "description": "The request configuration", + "default": { + "headers": false, + "context": false, + "body": false, + "sdl": false, + "method": false + }, + "type": "object", + "properties": { + "body": { + "description": "Send the body", + "default": false, + "type": "boolean" + }, + "context": { + "description": "Send the context", + "default": false, + "type": "boolean" + }, + "headers": { + "description": "Send the headers", + "default": false, + "type": "boolean" + }, + "method": { + "description": "Send the method", + "default": false, + "type": "boolean" + }, + "sdl": { + "description": "Send the SDL", + "default": false, + "type": "boolean" + } + }, + "additionalProperties": false + }, + "response": { + "description": "What information is passed to a router request/response stage", + "default": { + "headers": false, + "context": false, + "body": false, + "sdl": false, + "status_code": false + }, + "type": "object", + "properties": { + "body": { + "description": "Send the body", + "default": false, + "type": "boolean" + }, + "context": { + "description": "Send the context", + "default": false, + "type": "boolean" + }, + "headers": { + "description": "Send the headers", + "default": false, + "type": "boolean" + }, + "sdl": { + "description": "Send the SDL", + "default": false, + "type": "boolean" + }, + "status_code": { + "description": "Send the HTTP status", + "default": false, + "type": "boolean" + } + }, + "additionalProperties": false + } + } + }, "timeout": { "description": "The timeout for external requests", "default": { diff --git a/apollo-router/src/configuration/testdata/metrics/coprocessor.router.yaml b/apollo-router/src/configuration/testdata/metrics/coprocessor.router.yaml index a1a7714784..12ad67acf7 100644 --- a/apollo-router/src/configuration/testdata/metrics/coprocessor.router.yaml +++ b/apollo-router/src/configuration/testdata/metrics/coprocessor.router.yaml @@ -15,6 +15,19 @@ coprocessor: body: true headers: true status_code: true + supergraph: + request: + headers: true + body: true + context: true + method: true + sdl: true + response: + sdl: true + context: true + body: true + headers: true + status_code: true subgraph: all: request: diff --git a/apollo-router/src/plugins/coprocessor.rs b/apollo-router/src/plugins/coprocessor/mod.rs similarity index 97% rename from apollo-router/src/plugins/coprocessor.rs rename to apollo-router/src/plugins/coprocessor/mod.rs index 151d9fa2f5..5580c3d2da 100644 --- a/apollo-router/src/plugins/coprocessor.rs +++ b/apollo-router/src/plugins/coprocessor/mod.rs @@ -36,6 +36,7 @@ use crate::layers::ServiceBuilderExt; use crate::plugin::Plugin; use crate::plugin::PluginInit; use crate::register_plugin; +use crate::services; use crate::services::external::Control; use crate::services::external::Externalizable; use crate::services::external::PipelineStep; @@ -45,6 +46,11 @@ use crate::services::router; use crate::services::subgraph; use crate::tracer::TraceId; +#[cfg(test)] +mod test; + +mod supergraph; + pub(crate) const EXTERNAL_SPAN_NAME: &str = "external_plugin"; const POOL_IDLE_TIMEOUT_DURATION: Option = Some(Duration::from_secs(5)); @@ -87,6 +93,13 @@ impl Plugin for CoprocessorPlugin { self.router_service(service) } + fn supergraph_service( + &self, + service: services::supergraph::BoxService, + ) -> services::supergraph::BoxService { + self.supergraph_service(service) + } + fn subgraph_service(&self, name: &str, service: subgraph::BoxService) -> subgraph::BoxService { self.subgraph_service(name, service) } @@ -150,6 +163,18 @@ where ) } + fn supergraph_service( + &self, + service: services::supergraph::BoxService, + ) -> services::supergraph::BoxService { + self.configuration.supergraph.as_service( + self.http_client.clone(), + service, + self.configuration.url.clone(), + self.sdl.clone(), + ) + } + fn subgraph_service(&self, name: &str, service: subgraph::BoxService) -> subgraph::BoxService { self.configuration.subgraph.all.as_service( self.http_client.clone(), @@ -240,6 +265,9 @@ struct Conf { /// The router stage request/response configuration #[serde(default)] router: RouterStage, + /// The supergraph stage request/response configuration + #[serde(default)] + supergraph: supergraph::SupergraphStage, /// The subgraph stage request/response configuration #[serde(default)] subgraph: SubgraphStages, @@ -677,7 +705,7 @@ where // If first is None, or contains an error we return an error let opt_first: Option = first.and_then(|f| f.ok()); let bytes = match opt_first { - Some(b) => b.to_vec(), + Some(b) => b, None => { tracing::error!( "Coprocessor cannot convert body into future due to problem with first part" @@ -696,7 +724,7 @@ where .transpose()?; let body_to_send = response_config .body - .then(|| String::from_utf8(bytes.clone())) + .then(|| std::str::from_utf8(&bytes).map(|s| s.to_string())) .transpose()?; let status_to_send = response_config.status_code.then(|| parts.status.as_u16()); let context_to_send = response_config.context.then(|| response.context.clone()); @@ -858,7 +886,6 @@ where // First, extract the data we need from our request and prepare our // external call. Use our configuration to figure out which data to send. let (parts, body) = request.subgraph_request.into_parts(); - let bytes = Bytes::from(serde_json::to_vec(&body)?); let headers_to_send = request_config .headers @@ -867,7 +894,7 @@ where let body_to_send = request_config .body - .then(|| serde_json::from_slice::(&bytes)) + .then(|| serde_json::to_value(&body)) .transpose()?; let context_to_send = request_config.context.then(|| request.context.clone()); let uri = request_config.uri.then(|| parts.uri.to_string()); @@ -998,7 +1025,6 @@ where // external call. Use our configuration to figure out which data to send. let (parts, body) = response.response.into_parts(); - let bytes = Bytes::from(serde_json::to_vec(&body)?); let headers_to_send = response_config .headers @@ -1009,7 +1035,7 @@ where let body_to_send = response_config .body - .then(|| serde_json::from_slice::(&bytes)) + .then(|| serde_json::to_value(&body)) .transpose()?; let context_to_send = response_config.context.then(|| response.context.clone()); let service_name = response_config.service_name.then_some(service_name); @@ -1099,7 +1125,7 @@ fn validate_coprocessor_output( } /// Convert a HeaderMap into a HashMap -pub(super) fn externalize_header_map( +pub(crate) fn externalize_header_map( input: &HeaderMap, ) -> Result>, BoxError> { let mut output = HashMap::new(); diff --git a/apollo-router/src/plugins/coprocessor/supergraph.rs b/apollo-router/src/plugins/coprocessor/supergraph.rs new file mode 100644 index 0000000000..a76de0763e --- /dev/null +++ b/apollo-router/src/plugins/coprocessor/supergraph.rs @@ -0,0 +1,897 @@ +use std::ops::ControlFlow; +use std::sync::Arc; + +use futures::future; +use futures::stream; +use schemars::JsonSchema; +use serde::Deserialize; +use serde::Serialize; +use tower::BoxError; +use tower::ServiceBuilder; +use tower_service::Service; + +use super::externalize_header_map; +use super::*; +use crate::graphql; +use crate::layers::async_checkpoint::AsyncCheckpointLayer; +use crate::layers::ServiceBuilderExt; +use crate::plugins::coprocessor::EXTERNAL_SPAN_NAME; +use crate::response; +use crate::services::supergraph; + +/// What information is passed to a router request/response stage +#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize, JsonSchema)] +#[serde(default, deny_unknown_fields)] +pub(super) struct SupergraphRequestConf { + /// Send the headers + pub(super) headers: bool, + /// Send the context + pub(super) context: bool, + /// Send the body + pub(super) body: bool, + /// Send the SDL + pub(super) sdl: bool, + /// Send the method + pub(super) method: bool, +} + +/// What information is passed to a router request/response stage +#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize, JsonSchema)] +#[serde(default, deny_unknown_fields)] +pub(super) struct SupergraphResponseConf { + /// Send the headers + pub(super) headers: bool, + /// Send the context + pub(super) context: bool, + /// Send the body + pub(super) body: bool, + /// Send the SDL + pub(super) sdl: bool, + /// Send the HTTP status + pub(super) status_code: bool, +} + +#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize, JsonSchema)] +#[serde(default)] +pub(super) struct SupergraphStage { + /// The request configuration + pub(super) request: SupergraphRequestConf, + // /// The response configuration + pub(super) response: SupergraphResponseConf, +} + +impl SupergraphStage { + pub(crate) fn as_service( + &self, + http_client: C, + service: supergraph::BoxService, + coprocessor_url: String, + sdl: Arc, + ) -> supergraph::BoxService + where + C: Service, Response = hyper::Response, Error = BoxError> + + Clone + + Send + + Sync + + 'static, + >>::Future: Send + 'static, + { + let request_layer = (self.request != Default::default()).then_some({ + let request_config = self.request.clone(); + let coprocessor_url = coprocessor_url.clone(); + let http_client = http_client.clone(); + let sdl = sdl.clone(); + + AsyncCheckpointLayer::new(move |request: supergraph::Request| { + let request_config = request_config.clone(); + let coprocessor_url = coprocessor_url.clone(); + let http_client = http_client.clone(); + let sdl = sdl.clone(); + + async move { + let mut succeeded = true; + let result = process_supergraph_request_stage( + http_client, + coprocessor_url, + sdl, + request, + request_config, + ) + .await + .map_err(|error| { + succeeded = false; + tracing::error!( + "external extensibility: supergraph request stage error: {error}" + ); + error + }); + tracing::info!( + monotonic_counter.apollo.router.operations.coprocessor = 1u64, + coprocessor.stage = %PipelineStep::SupergraphRequest, + coprocessor.succeeded = succeeded, + "Total operations with co-processors enabled" + ); + result + } + }) + }); + + let response_layer = (self.response != Default::default()).then_some({ + let response_config = self.response.clone(); + + MapFutureLayer::new(move |fut| { + let coprocessor_url = coprocessor_url.clone(); + let sdl: Arc = sdl.clone(); + let http_client = http_client.clone(); + let response_config = response_config.clone(); + + async move { + let response: supergraph::Response = fut.await?; + + let mut succeeded = true; + let result = process_supergraph_response_stage( + http_client, + coprocessor_url, + sdl, + response, + response_config, + ) + .await + .map_err(|error| { + succeeded = false; + tracing::error!( + "external extensibility: router response stage error: {error}" + ); + error + }); + tracing::info!( + monotonic_counter.apollo.router.operations.coprocessor = 1u64, + coprocessor.stage = %PipelineStep::SupergraphResponse, + coprocessor.succeeded = succeeded, + "Total operations with co-processors enabled" + ); + result + } + }) + }); + + fn external_service_span() -> impl Fn(&supergraph::Request) -> tracing::Span + Clone { + move |_request: &supergraph::Request| { + tracing::info_span!( + EXTERNAL_SPAN_NAME, + "external service" = stringify!(supergraph::Request), + "otel.kind" = "INTERNAL" + ) + } + } + + ServiceBuilder::new() + .instrument(external_service_span()) + .option_layer(request_layer) + .option_layer(response_layer) + .buffered() + .service(service) + .boxed() + } +} + +async fn process_supergraph_request_stage( + http_client: C, + coprocessor_url: String, + sdl: Arc, + mut request: supergraph::Request, + request_config: SupergraphRequestConf, +) -> Result, BoxError> +where + C: Service, Response = hyper::Response, Error = BoxError> + + Clone + + Send + + Sync + + 'static, + >>::Future: Send + 'static, +{ + // Call into our out of process processor with a body of our body + // First, extract the data we need from our request and prepare our + // external call. Use our configuration to figure out which data to send. + let (parts, body) = request.supergraph_request.into_parts(); + let bytes = Bytes::from(serde_json::to_vec(&body)?); + + let headers_to_send = request_config + .headers + .then(|| externalize_header_map(&parts.headers)) + .transpose()?; + + let body_to_send = request_config + .body + .then(|| serde_json::from_slice::(&bytes)) + .transpose()?; + let context_to_send = request_config.context.then(|| request.context.clone()); + let sdl_to_send = request_config.sdl.then(|| sdl.clone().to_string()); + let method = request_config.method.then(|| parts.method.to_string()); + + let payload = Externalizable::supergraph_builder() + .stage(PipelineStep::SupergraphRequest) + .control(Control::default()) + .and_id(TraceId::maybe_new().map(|id| id.to_string())) + .and_headers(headers_to_send) + .and_body(body_to_send) + .and_context(context_to_send) + .and_method(method) + .and_sdl(sdl_to_send) + .build(); + + tracing::debug!(?payload, "externalized output"); + let guard = request.context.enter_active_request(); + let start = Instant::now(); + let co_processor_result = payload.call(http_client, &coprocessor_url).await; + let duration = start.elapsed().as_secs_f64(); + drop(guard); + tracing::info!( + histogram.apollo.router.operations.coprocessor.duration = duration, + coprocessor.stage = %PipelineStep::SupergraphRequest, + ); + + tracing::debug!(?co_processor_result, "co-processor returned"); + let co_processor_output = co_processor_result?; + validate_coprocessor_output(&co_processor_output, PipelineStep::SupergraphRequest)?; + // unwrap is safe here because validate_coprocessor_output made sure control is available + let control = co_processor_output.control.expect("validated above; qed"); + + // Thirdly, we need to interpret the control flow which may have been + // updated by our co-processor and decide if we should proceed or stop. + + if matches!(control, Control::Break(_)) { + // Ensure the code is a valid http status code + let code = control.get_http_status()?; + + let res = { + let graphql_response: crate::graphql::Response = + serde_json::from_value(co_processor_output.body.unwrap_or(serde_json::Value::Null)) + .unwrap_or_else(|error| { + crate::graphql::Response::builder() + .errors(vec![Error::builder() + .message(format!( + "couldn't deserialize coprocessor output body: {error}" + )) + .extension_code("EXTERNAL_DESERIALIZATION_ERROR") + .build()]) + .build() + }); + + let mut http_response = http::Response::builder() + .status(code) + .body(stream::once(future::ready(graphql_response)).boxed())?; + if let Some(headers) = co_processor_output.headers { + *http_response.headers_mut() = internalize_header_map(headers)?; + } + + let supergraph_response = supergraph::Response { + response: http_response, + context: request.context, + }; + + if let Some(context) = co_processor_output.context { + for (key, value) in context.try_into_iter()? { + supergraph_response + .context + .upsert_json_value(key, move |_current| value); + } + } + + supergraph_response + }; + return Ok(ControlFlow::Break(res)); + } + + // Finally, process our reply and act on the contents. Our processing logic is + // that we replace "bits" of our incoming request with the updated bits if they + // are present in our co_processor_output. + + let new_body: crate::graphql::Request = match co_processor_output.body { + Some(value) => serde_json::from_value(value)?, + None => body, + }; + + request.supergraph_request = http::Request::from_parts(parts, new_body); + + if let Some(context) = co_processor_output.context { + for (key, value) in context.try_into_iter()? { + request + .context + .upsert_json_value(key, move |_current| value); + } + } + + if let Some(headers) = co_processor_output.headers { + *request.supergraph_request.headers_mut() = internalize_header_map(headers)?; + } + + if let Some(uri) = co_processor_output.uri { + *request.supergraph_request.uri_mut() = uri.parse()?; + } + + Ok(ControlFlow::Continue(request)) +} + +async fn process_supergraph_response_stage( + http_client: C, + coprocessor_url: String, + sdl: Arc, + response: supergraph::Response, + response_config: SupergraphResponseConf, +) -> Result +where + C: Service, Response = hyper::Response, Error = BoxError> + + Clone + + Send + + Sync + + 'static, + >>::Future: Send + 'static, +{ + // split the response into parts + body + let (mut parts, body) = response.response.into_parts(); + + // we split the body (which is a stream) into first response + rest of responses, + // for which we will implement mapping later + let (first, rest): (Option, graphql::ResponseStream) = + body.into_future().await; + + // If first is None, we return an error + let first = first.ok_or_else(|| { + BoxError::from("Coprocessor cannot convert body into future due to problem with first part") + })?; + + // Now we process our first chunk of response + // Encode headers, body, status, context, sdl to create a payload + let headers_to_send = response_config + .headers + .then(|| externalize_header_map(&parts.headers)) + .transpose()?; + let body_to_send = response_config + .body + .then(|| serde_json::to_value(&first).expect("serialization will not fail")); + let status_to_send = response_config.status_code.then(|| parts.status.as_u16()); + let context_to_send = response_config.context.then(|| response.context.clone()); + let sdl_to_send = response_config.sdl.then(|| sdl.clone().to_string()); + + let payload = Externalizable::supergraph_builder() + .stage(PipelineStep::SupergraphResponse) + .and_id(TraceId::maybe_new().map(|id| id.to_string())) + .and_headers(headers_to_send) + .and_body(body_to_send) + .and_context(context_to_send) + .and_status_code(status_to_send) + .and_sdl(sdl_to_send.clone()) + .build(); + + // Second, call our co-processor and get a reply. + tracing::debug!(?payload, "externalized output"); + let guard = response.context.enter_active_request(); + let start = Instant::now(); + let co_processor_result = payload.call(http_client.clone(), &coprocessor_url).await; + let duration = start.elapsed().as_secs_f64(); + drop(guard); + tracing::info!( + histogram.apollo.router.operations.coprocessor.duration = duration, + coprocessor.stage = %PipelineStep::SupergraphResponse, + ); + + tracing::debug!(?co_processor_result, "co-processor returned"); + let co_processor_output = co_processor_result?; + + validate_coprocessor_output(&co_processor_output, PipelineStep::SupergraphResponse)?; + + // Third, process our reply and act on the contents. Our processing logic is + // that we replace "bits" of our incoming response with the updated bits if they + // are present in our co_processor_output. If they aren't present, just use the + // bits that we sent to the co_processor. + let new_body: crate::response::Response = match co_processor_output.body { + Some(value) => serde_json::from_value(value)?, + None => first, + }; + + if let Some(control) = co_processor_output.control { + parts.status = control.get_http_status()? + } + + if let Some(context) = co_processor_output.context { + for (key, value) in context.try_into_iter()? { + response + .context + .upsert_json_value(key, move |_current| value); + } + } + + if let Some(headers) = co_processor_output.headers { + parts.headers = internalize_header_map(headers)?; + } + + // Clone all the bits we need + let context = response.context.clone(); + let map_context = response.context.clone(); + + // Map the rest of our body to process subsequent chunks of response + let mapped_stream = rest + .then(move |deferred_response| { + let generator_client = http_client.clone(); + let generator_coprocessor_url = coprocessor_url.clone(); + let generator_map_context = map_context.clone(); + let generator_sdl_to_send = sdl_to_send.clone(); + + async move { + let body_to_send = response_config.body.then(|| { + serde_json::to_value(&deferred_response).expect("serialization will not fail") + }); + let context_to_send = response_config + .context + .then(|| generator_map_context.clone()); + + // Note: We deliberately DO NOT send headers or status_code even if the user has + // requested them. That's because they are meaningless on a deferred response and + // providing them will be a source of confusion. + let payload = Externalizable::router_builder() + .stage(PipelineStep::SupergraphResponse) + .and_id(TraceId::maybe_new().map(|id| id.to_string())) + .and_body(body_to_send) + .and_context(context_to_send) + .and_sdl(generator_sdl_to_send) + .build(); + + // Second, call our co-processor and get a reply. + tracing::debug!(?payload, "externalized output"); + let guard = generator_map_context.enter_active_request(); + let co_processor_result = payload + .call(generator_client, &generator_coprocessor_url) + .await; + drop(guard); + tracing::debug!(?co_processor_result, "co-processor returned"); + let co_processor_output = co_processor_result?; + + validate_coprocessor_output( + &co_processor_output, + PipelineStep::SupergraphResponse, + )?; + + // Third, process our reply and act on the contents. Our processing logic is + // that we replace "bits" of our incoming response with the updated bits if they + // are present in our co_processor_output. If they aren't present, just use the + // bits that we sent to the co_processor. + let new_deferred_response: crate::response::Response = + match co_processor_output.body { + Some(value) => serde_json::from_value(value)?, + None => deferred_response, + }; + + if let Some(context) = co_processor_output.context { + for (key, value) in context.try_into_iter()? { + generator_map_context.upsert_json_value(key, move |_current| value); + } + } + + // We return the deferred_response into our stream of response chunks + Ok(new_deferred_response) + } + }) + .map(|res: Result| match res { + Ok(response) => response, + Err(e) => { + tracing::error!("coprocessor error handling deferred supergraph response: {e}"); + response::Response::builder() + .error( + Error::builder() + .message("Internal error handling deferred response") + .extension_code("INTERNAL_ERROR") + .build(), + ) + .build() + } + }); + + // Create our response stream which consists of our first body chained with the + // rest of the responses in our mapped stream. + let stream = once(ready(new_body)).chain(mapped_stream).boxed(); + + // Finally, return a response which has a Body that wraps our stream of response chunks. + Ok(supergraph::Response { + context, + response: http::Response::from_parts(parts, stream), + }) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use futures::future::BoxFuture; + use http::StatusCode; + use hyper::Body; + use serde_json::json; + use tower::BoxError; + use tower::ServiceExt; + + use super::super::*; + use super::*; + use crate::plugin::test::MockHttpClientService; + use crate::plugin::test::MockSupergraphService; + use crate::services::supergraph; + + #[allow(clippy::type_complexity)] + pub(crate) fn mock_with_callback( + callback: fn( + hyper::Request, + ) -> BoxFuture<'static, Result, BoxError>>, + ) -> MockHttpClientService { + let mut mock_http_client = MockHttpClientService::new(); + mock_http_client.expect_clone().returning(move || { + let mut mock_http_client = MockHttpClientService::new(); + + mock_http_client.expect_clone().returning(move || { + let mut mock_http_client = MockHttpClientService::new(); + mock_http_client.expect_call().returning(callback); + mock_http_client + }); + mock_http_client + }); + + mock_http_client + } + + #[allow(clippy::type_complexity)] + fn mock_with_deferred_callback( + callback: fn( + hyper::Request, + ) -> BoxFuture<'static, Result, BoxError>>, + ) -> MockHttpClientService { + let mut mock_http_client = MockHttpClientService::new(); + mock_http_client.expect_clone().returning(move || { + let mut mock_http_client = MockHttpClientService::new(); + mock_http_client.expect_clone().returning(move || { + let mut mock_http_client = MockHttpClientService::new(); + mock_http_client.expect_clone().returning(move || { + let mut mock_http_client = MockHttpClientService::new(); + mock_http_client.expect_call().returning(callback); + mock_http_client + }); + mock_http_client + }); + mock_http_client + }); + + mock_http_client + } + + #[tokio::test] + async fn external_plugin_supergraph_request() { + let supergraph_stage = SupergraphStage { + request: SupergraphRequestConf { + headers: false, + context: false, + body: true, + sdl: false, + method: false, + }, + response: Default::default(), + }; + + // This will never be called because we will fail at the coprocessor. + let mut mock_supergraph_service = MockSupergraphService::new(); + + mock_supergraph_service + .expect_call() + .returning(|req: supergraph::Request| { + // Let's assert that the subgraph request has been transformed as it should have. + assert_eq!( + req.supergraph_request.headers().get("cookie").unwrap(), + "tasty_cookie=strawberry" + ); + + assert_eq!( + req.context + .get::<&str, u8>("this-is-a-test-context") + .unwrap() + .unwrap(), + 42 + ); + + // The subgraph uri should have changed + assert_eq!( + Some("MyQuery"), + req.supergraph_request.body().operation_name.as_deref() + ); + + // The query should have changed + assert_eq!( + "query Long {\n me {\n name\n}\n}", + req.supergraph_request.body().query.as_ref().unwrap() + ); + + Ok(supergraph::Response::builder() + .data(json!({ "test": 1234_u32 })) + .errors(Vec::new()) + .extensions(crate::json_ext::Object::new()) + .context(req.context) + .build() + .unwrap()) + }); + + let mock_http_client = mock_with_callback(move |_: hyper::Request| { + Box::pin(async { + Ok(hyper::Response::builder() + .body(Body::from( + r#"{ + "version": 1, + "stage": "SupergraphRequest", + "control": "continue", + "headers": { + "cookie": [ + "tasty_cookie=strawberry" + ], + "content-type": [ + "application/json" + ], + "host": [ + "127.0.0.1:4000" + ], + "apollo-federation-include-trace": [ + "ftv1" + ], + "apollographql-client-name": [ + "manual" + ], + "accept": [ + "*/*" + ], + "user-agent": [ + "curl/7.79.1" + ], + "content-length": [ + "46" + ] + }, + "body": { + "query": "query Long {\n me {\n name\n}\n}", + "operationName": "MyQuery" + }, + "context": { + "entries": { + "accepts-json": false, + "accepts-wildcard": true, + "accepts-multipart": false, + "this-is-a-test-context": 42 + } + }, + "serviceName": "service name shouldn't change", + "uri": "http://thisurihaschanged" + }"#, + )) + .unwrap()) + }) + }); + + let service = supergraph_stage.as_service( + mock_http_client, + mock_supergraph_service.boxed(), + "http://test".to_string(), + Arc::new("".to_string()), + ); + + let request = supergraph::Request::fake_builder().build().unwrap(); + + assert_eq!( + serde_json_bytes::json!({ "test": 1234_u32 }), + service + .oneshot(request) + .await + .unwrap() + .response + .into_body() + .next() + .await + .unwrap() + .data + .unwrap() + ); + } + + #[tokio::test] + async fn external_plugin_supergraph_request_controlflow_break() { + let supergraph_stage = SupergraphStage { + request: SupergraphRequestConf { + headers: false, + context: false, + body: true, + sdl: false, + method: false, + }, + response: Default::default(), + }; + + // This will never be called because we will fail at the coprocessor. + let mock_supergraph_service = MockSupergraphService::new(); + + let mock_http_client = mock_with_callback(move |_: hyper::Request| { + Box::pin(async { + Ok(hyper::Response::builder() + .body(Body::from( + r#"{ + "version": 1, + "stage": "SupergraphRequest", + "control": { + "break": 200 + }, + "body": { + "errors": [{ "message": "my error message" }] + }, + "context": { + "entries": { + "testKey": true + } + }, + "headers": { + "aheader": ["a value"] + } + }"#, + )) + .unwrap()) + }) + }); + + let service = supergraph_stage.as_service( + mock_http_client, + mock_supergraph_service.boxed(), + "http://test".to_string(), + Arc::new("".to_string()), + ); + + let request = supergraph::Request::fake_builder().build().unwrap(); + + let crate::services::supergraph::Response { + mut response, + context, + } = service.oneshot(request).await.unwrap(); + + assert!(context.get::<_, bool>("testKey").unwrap().unwrap()); + + let value = response.headers().get("aheader").unwrap(); + + assert_eq!("a value", value); + + assert_eq!( + "my error message", + response.body_mut().next().await.unwrap().errors[0] + .message + .as_str() + ); + } + + #[tokio::test] + async fn external_plugin_supergraph_response() { + let supergraph_stage = SupergraphStage { + response: SupergraphResponseConf { + headers: true, + context: true, + body: true, + sdl: true, + status_code: false, + }, + request: Default::default(), + }; + + let mut mock_supergraph_service = MockSupergraphService::new(); + + mock_supergraph_service + .expect_call() + .returning(|req: supergraph::Request| { + Ok(supergraph::Response::builder() + .data(json!({ "test": 1234_u32 })) + .errors(Vec::new()) + .extensions(crate::json_ext::Object::new()) + .context(req.context) + .build() + .unwrap()) + }); + + let mock_http_client = mock_with_deferred_callback(move |res: hyper::Request| { + Box::pin(async { + let deserialized_response: Externalizable = + serde_json::from_slice(&hyper::body::to_bytes(res.into_body()).await.unwrap()) + .unwrap(); + + assert_eq!(EXTERNALIZABLE_VERSION, deserialized_response.version); + assert_eq!( + PipelineStep::SupergraphResponse.to_string(), + deserialized_response.stage + ); + + assert_eq!( + json! {{"data":{ "test": 1234_u32 }}}, + deserialized_response.body.unwrap() + ); + + let input = json!( + { + "version": 1, + "stage": "SupergraphResponse", + "control": { + "break": 400 + }, + "id": "1b19c05fdafc521016df33148ad63c1b", + "headers": { + "cookie": [ + "tasty_cookie=strawberry" + ], + "content-type": [ + "application/json" + ], + "host": [ + "127.0.0.1:4000" + ], + "apollo-federation-include-trace": [ + "ftv1" + ], + "apollographql-client-name": [ + "manual" + ], + "accept": [ + "*/*" + ], + "user-agent": [ + "curl/7.79.1" + ], + "content-length": [ + "46" + ] + }, + "body": { + "data": { "test": 42 } + }, + "context": { + "entries": { + "accepts-json": false, + "accepts-wildcard": true, + "accepts-multipart": false, + "this-is-a-test-context": 42 + } + }, + "sdl": "the sdl shouldnt change" + }); + Ok(hyper::Response::builder() + .body(Body::from(serde_json::to_string(&input).unwrap())) + .unwrap()) + }) + }); + + let service = supergraph_stage.as_service( + mock_http_client, + mock_supergraph_service.boxed(), + "http://test".to_string(), + Arc::new("".to_string()), + ); + + let request = supergraph::Request::canned_builder().build().unwrap(); + + let mut res = service.oneshot(request).await.unwrap(); + + // Let's assert that the router request has been transformed as it should have. + assert_eq!(res.response.status(), StatusCode::BAD_REQUEST); + assert_eq!( + res.response.headers().get("cookie").unwrap(), + "tasty_cookie=strawberry" + ); + + assert_eq!( + res.context + .get::<&str, u8>("this-is-a-test-context") + .unwrap() + .unwrap(), + 42 + ); + + let body = res.response.body_mut().next().await.unwrap(); + // the body should have changed: + assert_eq!( + json!({ "data": { "test": 42_u32 } }), + serde_json::to_value(&body).unwrap(), + ); + } +} diff --git a/apollo-router/src/plugins/coprocessor_test.rs b/apollo-router/src/plugins/coprocessor/test.rs similarity index 99% rename from apollo-router/src/plugins/coprocessor_test.rs rename to apollo-router/src/plugins/coprocessor/test.rs index 76d4177768..3f4b50876e 100644 --- a/apollo-router/src/plugins/coprocessor_test.rs +++ b/apollo-router/src/plugins/coprocessor/test.rs @@ -17,7 +17,7 @@ mod tests { use tower::BoxError; use tower::ServiceExt; - use super::super::coprocessor::*; + use super::super::*; use crate::plugin::test::MockHttpClientService; use crate::plugin::test::MockRouterService; use crate::plugin::test::MockSubgraphService; diff --git a/apollo-router/src/plugins/mod.rs b/apollo-router/src/plugins/mod.rs index df3252d3df..14526d158f 100644 --- a/apollo-router/src/plugins/mod.rs +++ b/apollo-router/src/plugins/mod.rs @@ -23,8 +23,6 @@ macro_rules! schemar_fn { pub(crate) mod authentication; pub(crate) mod authorization; mod coprocessor; -#[cfg(test)] -mod coprocessor_test; pub(crate) mod csrf; mod expose_query_plan; mod forbid_mutations; diff --git a/apollo-router/src/services/external.rs b/apollo-router/src/services/external.rs index 440f399d1d..2161ac053d 100644 --- a/apollo-router/src/services/external.rs +++ b/apollo-router/src/services/external.rs @@ -131,6 +131,42 @@ where } } + #[builder(visibility = "pub(crate)")] + /// This is the constructor (or builder) to use when constructing a Supergraph + /// `Externalizable`. + /// + fn supergraph_new( + stage: PipelineStep, + control: Option, + id: Option, + headers: Option>>, + body: Option, + context: Option, + status_code: Option, + method: Option, + sdl: Option, + ) -> Self { + assert!(matches!( + stage, + PipelineStep::SupergraphRequest | PipelineStep::SupergraphResponse + )); + Externalizable { + version: EXTERNALIZABLE_VERSION, + stage: stage.to_string(), + control, + id, + headers, + body, + context, + status_code, + sdl, + uri: None, + path: None, + method, + service_name: None, + } + } + #[builder(visibility = "pub(crate)")] /// This is the constructor (or builder) to use when constructing a Subgraph /// `Externalizable`. diff --git a/docs/source/customizations/coprocessor.mdx b/docs/source/customizations/coprocessor.mdx index b01cf4e301..8fd14fe81f 100644 --- a/docs/source/customizations/coprocessor.mdx +++ b/docs/source/customizations/coprocessor.mdx @@ -48,7 +48,7 @@ flowchart TB; > This diagram shows request execution proceeding "down" from a client, through the router, to individual subgraphs. Execution then proceeds back "up" to the client in the reverse order. -As shown in the diagram above, only the `RouterService` and `SubgraphService` of the [request-handling lifecycle](./rhai/#router-request-lifecycle) can send these POST requests (also called **coprocessor requests**). +As shown in the diagram above, the `RouterService`, `SupergraphService` and `SubgraphService` steps of the [request-handling lifecycle](./rhai/#router-request-lifecycle) can send these POST requests (also called **coprocessor requests**). Each supported service can send its coprocessor requests at two different **stages**: @@ -92,6 +92,19 @@ coprocessor: context: false sdl: false status_code: false + supergraph: # This coprocessor hooks into the `SupergraphService` + request: # By including this key, the `SupergraphService` sends a coprocessor request whenever it first receives a client request. + headers: true # These boolean properties indicate which request data to include in the coprocessor request. All are optional and false by default. + body: false + context: false + sdl: false + method: false + response: # By including this key, the `SupergraphService` sends a coprocessor request whenever it's about to send response data to a client (including incremental data via @defer). + headers: true + body: false + context: false + sdl: false + status_code: false subgraph: all: request: # By including this key, the `SubgraphService` sends a coprocessor request whenever it is about to make a request to a subgraph.