diff --git a/CHANGELOG.md b/CHANGELOG.md index 7751be5d97..24b5b86bb8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,27 +28,6 @@ Note that the name of this metric may change in the future. By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/3513 -### Supergraph coprocessor implementation ([PR #3408](https://github.com/apollographql/router/pull/3408)) - -Coprocessors now support supergraph service interception. - -Supergraph request contains: -* query -* operation name -* variables -* extensions - -Supergraph reqponse contains: -* label -* data -* path -* errors -* extensions - -When using `@defer` or subscriptions a supergraph response may contain multiple GraphQL responses, and the coprocessor will be called for each. - -By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/3408 - ### Configure AWS SigV4 authentication for subgraph requests ([PR #3365](https://github.com/apollographql/router/pull/3365)) Secure your router to subgraph communication on AWS using [Signature Version 4](https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-authenticating-requests.html) (Sigv4)! @@ -108,7 +87,7 @@ By [@garypen](https://github.com/garypen) in https://github.com/apollographql/ro ### Spelling of `content_negociation` corrected to `content_negotiation` ([Issue #3204](https://github.com/apollographql/router/issues/3204)) -We had a bit of a French twist on one of our internal module names. We won't promise it won't happen again, but `content_negociation` is spelled as `content_negotiation` now. 😄 +We had a bit of a French twist on one of our internal module names. We won't promise it won't happen again, but `content_negociation` is spelled as `content_negotiation` now. 😄 Thank you for this contribution! @@ -1464,7 +1443,7 @@ By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/p ### Small gzip'd responses no longer cause a panic -A regression introduced in v1.17.0 — again related to compression — has been resolved. This occurred when small responses used invalid buffer management, causing a panic. +A regression introduced in v1.17.0 — again related to compression — has been resolved. This occurred when small responses used invalid buffer management, causing a panic. By [@dbanty](https://github.com/dbanty) in https://github.com/apollographql/router/pull/3047 @@ -1998,7 +1977,7 @@ By [@BrynCooke](https://github.com/BrynCooke) in https://github.com/apollographq ### Distributed caching: Don't send Redis' `CLIENT SETNAME` ([PR #2825](https://github.com/apollographql/router/pull/2825)) -We won't send [the `CLIENT SETNAME` command](https://redis.io/commands/client-setname/) to connected Redis servers. This resolves an incompatibility with some Redis-compatible servers since not all "Redis-compatible" offerings (like Google Memorystore) actually support _every_ Redis command. We weren't actually necessitating this feature, it was just a feature that could be enabled optionally on our Redis client. No Router functionality is impacted. +We won't send [the `CLIENT SETNAME` command](https://redis.io/commands/client-setname/) to connected Redis servers. This resolves an incompatibility with some Redis-compatible servers since not all "Redis-compatible" offerings (like Google Memorystore) actually support _every_ Redis command. We weren't actually necessitating this feature, it was just a feature that could be enabled optionally on our Redis client. No Router functionality is impacted. By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/2825 @@ -3168,7 +3147,7 @@ By [@Geal](https://github.com/geal) in https://github.com/apollographql/router/p ### Optimize header propagation plugin's regular expression matching ([PR #2392](https://github.com/apollographql/router/pull/2392)) -We've changed the header propagation plugins' behavior to reduce the chance of memory allocations occurring when applying regex-based header propagation rules. +We've changed the header propagation plugins' behavior to reduce the chance of memory allocations occurring when applying regex-based header propagation rules. By [@o0Ignition0o](https://github.com/o0Ignition0o) in https://github.com/apollographql/router/pull/2392 @@ -3218,7 +3197,7 @@ By [@Geal](https://github.com/geal) in https://github.com/apollographql/router/p Configuration changes will be [automatically migrated on load](https://www.apollographql.com/docs/router/configuration/overview#upgrading-your-router-configuration). However, you should update your source configuration files as these will become breaking changes in a future major release. -### Defer support graduates from preview ([Issue #2368](https://github.com/apollographql/router/issues/2368)) +### Defer support graduates from preview ([Issue #2368](https://github.com/apollographql/router/issues/2368)) We're pleased to announce that [`@defer` support](https://www.apollographql.com/docs/router/executing-operations/defer-support/) has been promoted to general availability in accordance with our [product launch stages](https://www.apollographql.com/docs/resources/product-launch-stages/). @@ -9157,4 +9136,4 @@ See our [release stages] for more information. But the lack of clarity goes back to not having kept track of everything thus far! We can _fix_ our processes to keep track of these things! :smile_cat: -# [0.1.0] - TBA +# [0.1.0] - TBA \ No newline at end of file diff --git a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap index 982588f27b..bf7d0901f1 100644 --- a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap +++ b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap @@ -770,113 +770,6 @@ expression: "&schema" }, "additionalProperties": false }, - "supergraph": { - "description": "The supergraph stage request/response configuration", - "default": { - "request": { - "headers": false, - "context": false, - "body": false, - "sdl": false, - "path": false, - "method": false - }, - "response": { - "headers": false, - "context": false, - "body": false, - "sdl": false, - "status_code": false - } - }, - "type": "object", - "properties": { - "request": { - "description": "The request configuration", - "default": { - "headers": false, - "context": false, - "body": false, - "sdl": false, - "path": false, - "method": false - }, - "type": "object", - "properties": { - "body": { - "description": "Send the body", - "default": false, - "type": "boolean" - }, - "context": { - "description": "Send the context", - "default": false, - "type": "boolean" - }, - "headers": { - "description": "Send the headers", - "default": false, - "type": "boolean" - }, - "method": { - "description": "Send the method", - "default": false, - "type": "boolean" - }, - "path": { - "description": "Send the path", - "default": false, - "type": "boolean" - }, - "sdl": { - "description": "Send the SDL", - "default": false, - "type": "boolean" - } - }, - "additionalProperties": false - }, - "response": { - "description": "What information is passed to a router request/response stage", - "default": { - "headers": false, - "context": false, - "body": false, - "sdl": false, - "status_code": false - }, - "type": "object", - "properties": { - "body": { - "description": "Send the body", - "default": false, - "type": "boolean" - }, - "context": { - "description": "Send the context", - "default": false, - "type": "boolean" - }, - "headers": { - "description": "Send the headers", - "default": false, - "type": "boolean" - }, - "sdl": { - "description": "Send the SDL", - "default": false, - "type": "boolean" - }, - "status_code": { - "description": "Send the HTTP status", - "default": false, - "type": "boolean" - } - }, - "additionalProperties": false - } - } - }, "timeout": { "description": "The timeout for external requests", "default": { diff --git a/apollo-router/src/plugins/coprocessor/mod.rs b/apollo-router/src/plugins/coprocessor.rs similarity index 97% rename from apollo-router/src/plugins/coprocessor/mod.rs rename to apollo-router/src/plugins/coprocessor.rs index 356f42b0e2..fdad623cf8 100644 --- a/apollo-router/src/plugins/coprocessor/mod.rs +++ b/apollo-router/src/plugins/coprocessor.rs @@ -35,7 +35,6 @@ use crate::layers::ServiceBuilderExt; use crate::plugin::Plugin; use crate::plugin::PluginInit; use crate::register_plugin; -use crate::services; use crate::services::external::Control; use crate::services::external::Externalizable; use crate::services::external::PipelineStep; @@ -45,11 +44,6 @@ use crate::services::router; use crate::services::subgraph; use crate::tracer::TraceId; -#[cfg(test)] -mod test; - -mod supergraph; - pub(crate) const EXTERNAL_SPAN_NAME: &str = "external_plugin"; const POOL_IDLE_TIMEOUT_DURATION: Option = Some(Duration::from_secs(5)); @@ -92,13 +86,6 @@ impl Plugin for CoprocessorPlugin { self.router_service(service) } - fn supergraph_service( - &self, - service: services::supergraph::BoxService, - ) -> services::supergraph::BoxService { - self.supergraph_service(service) - } - fn subgraph_service(&self, name: &str, service: subgraph::BoxService) -> subgraph::BoxService { self.subgraph_service(name, service) } @@ -162,18 +149,6 @@ where ) } - fn supergraph_service( - &self, - service: services::supergraph::BoxService, - ) -> services::supergraph::BoxService { - self.configuration.supergraph.as_service( - self.http_client.clone(), - service, - self.configuration.url.clone(), - self.sdl.clone(), - ) - } - fn subgraph_service(&self, name: &str, service: subgraph::BoxService) -> subgraph::BoxService { self.configuration.subgraph.all.as_service( self.http_client.clone(), @@ -264,9 +239,6 @@ struct Conf { /// The router stage request/response configuration #[serde(default)] router: RouterStage, - /// The supergraph stage request/response configuration - #[serde(default)] - supergraph: supergraph::SupergraphStage, /// The subgraph stage request/response configuration #[serde(default)] subgraph: SubgraphStages, @@ -704,7 +676,7 @@ where // If first is None, or contains an error we return an error let opt_first: Option = first.and_then(|f| f.ok()); let bytes = match opt_first { - Some(b) => b, + Some(b) => b.to_vec(), None => { tracing::error!( "Coprocessor cannot convert body into future due to problem with first part" @@ -723,7 +695,7 @@ where .transpose()?; let body_to_send = response_config .body - .then(|| std::str::from_utf8(&bytes).map(|s| s.to_string())) + .then(|| String::from_utf8(bytes.clone())) .transpose()?; let status_to_send = response_config.status_code.then(|| parts.status.as_u16()); let context_to_send = response_config.context.then(|| response.context.clone()); @@ -885,6 +857,7 @@ where // First, extract the data we need from our request and prepare our // external call. Use our configuration to figure out which data to send. let (parts, body) = request.subgraph_request.into_parts(); + let bytes = Bytes::from(serde_json::to_vec(&body)?); let headers_to_send = request_config .headers @@ -893,7 +866,7 @@ where let body_to_send = request_config .body - .then(|| serde_json::to_value(&body)) + .then(|| serde_json::from_slice::(&bytes)) .transpose()?; let context_to_send = request_config.context.then(|| request.context.clone()); let uri = request_config.uri.then(|| parts.uri.to_string()); @@ -1024,6 +997,7 @@ where // external call. Use our configuration to figure out which data to send. let (parts, body) = response.response.into_parts(); + let bytes = Bytes::from(serde_json::to_vec(&body)?); let headers_to_send = response_config .headers @@ -1034,7 +1008,7 @@ where let body_to_send = response_config .body - .then(|| serde_json::to_value(&body)) + .then(|| serde_json::from_slice::(&bytes)) .transpose()?; let context_to_send = response_config.context.then(|| response.context.clone()); let service_name = response_config.service_name.then_some(service_name); @@ -1124,7 +1098,7 @@ fn validate_coprocessor_output( } /// Convert a HeaderMap into a HashMap -pub(crate) fn externalize_header_map( +pub(super) fn externalize_header_map( input: &HeaderMap, ) -> Result>, BoxError> { let mut output = HashMap::new(); diff --git a/apollo-router/src/plugins/coprocessor/supergraph.rs b/apollo-router/src/plugins/coprocessor/supergraph.rs deleted file mode 100644 index f6d0a06e18..0000000000 --- a/apollo-router/src/plugins/coprocessor/supergraph.rs +++ /dev/null @@ -1,881 +0,0 @@ -use std::ops::ControlFlow; -use std::sync::Arc; - -use futures::future; -use futures::stream; -use schemars::JsonSchema; -use serde::Deserialize; -use serde::Serialize; -use tower::BoxError; -use tower::ServiceBuilder; -use tower_service::Service; - -use super::externalize_header_map; -use super::*; -use crate::graphql; -use crate::layers::async_checkpoint::AsyncCheckpointLayer; -use crate::layers::ServiceBuilderExt; -use crate::plugins::coprocessor::EXTERNAL_SPAN_NAME; -use crate::response; -use crate::services::supergraph; - -/// What information is passed to a router request/response stage -#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize, JsonSchema)] -#[serde(default, deny_unknown_fields)] -pub(super) struct SupergraphRequestConf { - /// Send the headers - pub(super) headers: bool, - /// Send the context - pub(super) context: bool, - /// Send the body - pub(super) body: bool, - /// Send the SDL - pub(super) sdl: bool, - /// Send the path - pub(super) path: bool, - /// Send the method - pub(super) method: bool, -} - -/// What information is passed to a router request/response stage -#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize, JsonSchema)] -#[serde(default, deny_unknown_fields)] -pub(super) struct SupergraphResponseConf { - /// Send the headers - pub(super) headers: bool, - /// Send the context - pub(super) context: bool, - /// Send the body - pub(super) body: bool, - /// Send the SDL - pub(super) sdl: bool, - /// Send the HTTP status - pub(super) status_code: bool, -} - -#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize, JsonSchema)] -#[serde(default)] -pub(super) struct SupergraphStage { - /// The request configuration - pub(super) request: SupergraphRequestConf, - // /// The response configuration - pub(super) response: SupergraphResponseConf, -} - -impl SupergraphStage { - pub(crate) fn as_service( - &self, - http_client: C, - service: supergraph::BoxService, - coprocessor_url: String, - sdl: Arc, - ) -> supergraph::BoxService - where - C: Service, Response = hyper::Response, Error = BoxError> - + Clone - + Send - + Sync - + 'static, - >>::Future: Send + 'static, - { - let request_layer = (self.request != Default::default()).then_some({ - let request_config = self.request.clone(); - let coprocessor_url = coprocessor_url.clone(); - let http_client = http_client.clone(); - let sdl = sdl.clone(); - - AsyncCheckpointLayer::new(move |request: supergraph::Request| { - let request_config = request_config.clone(); - let coprocessor_url = coprocessor_url.clone(); - let http_client = http_client.clone(); - let sdl = sdl.clone(); - - async move { - process_supergraph_request_stage( - http_client, - coprocessor_url, - sdl, - request, - request_config, - ) - .await - .map_err(|error| { - tracing::error!( - "external extensibility: supergraph request stage error: {error}" - ); - error - }) - } - }) - }); - - let response_layer = (self.response != Default::default()).then_some({ - let response_config = self.response.clone(); - - MapFutureLayer::new(move |fut| { - let coprocessor_url = coprocessor_url.clone(); - let sdl: Arc = sdl.clone(); - let http_client = http_client.clone(); - let response_config = response_config.clone(); - - async move { - let response: supergraph::Response = fut.await?; - - process_supergraph_response_stage( - http_client, - coprocessor_url, - sdl, - response, - response_config, - ) - .await - .map_err(|error| { - tracing::error!( - "external extensibility: router response stage error: {error}" - ); - error - }) - } - }) - }); - - fn external_service_span() -> impl Fn(&supergraph::Request) -> tracing::Span + Clone { - move |_request: &supergraph::Request| { - tracing::info_span!( - EXTERNAL_SPAN_NAME, - "external service" = stringify!(supergraph::Request), - "otel.kind" = "INTERNAL" - ) - } - } - - ServiceBuilder::new() - .instrument(external_service_span()) - .option_layer(request_layer) - .option_layer(response_layer) - .buffered() - .service(service) - .boxed() - } -} - -async fn process_supergraph_request_stage( - http_client: C, - coprocessor_url: String, - sdl: Arc, - mut request: supergraph::Request, - request_config: SupergraphRequestConf, -) -> Result, BoxError> -where - C: Service, Response = hyper::Response, Error = BoxError> - + Clone - + Send - + Sync - + 'static, - >>::Future: Send + 'static, -{ - // Call into our out of process processor with a body of our body - // First, extract the data we need from our request and prepare our - // external call. Use our configuration to figure out which data to send. - let (parts, body) = request.supergraph_request.into_parts(); - let bytes = Bytes::from(serde_json::to_vec(&body)?); - - let headers_to_send = request_config - .headers - .then(|| externalize_header_map(&parts.headers)) - .transpose()?; - - let body_to_send = request_config - .body - .then(|| serde_json::from_slice::(&bytes)) - .transpose()?; - let context_to_send = request_config.context.then(|| request.context.clone()); - - let payload = Externalizable::supergraph_builder() - .stage(PipelineStep::SupergraphRequest) - .control(Control::default()) - .and_id(TraceId::maybe_new().map(|id| id.to_string())) - .and_headers(headers_to_send) - .and_body(body_to_send) - .and_context(context_to_send) - .method(parts.method.to_string()) - .sdl(sdl.to_string()) - .build(); - - tracing::debug!(?payload, "externalized output"); - let guard = request.context.enter_active_request(); - let start = Instant::now(); - let co_processor_result = payload.call(http_client, &coprocessor_url).await; - let duration = start.elapsed().as_secs_f64(); - drop(guard); - tracing::info!( - histogram.apollo.router.operations.coprocessor.duration = duration, - coprocessor.stage = %PipelineStep::SupergraphRequest, - ); - - tracing::debug!(?co_processor_result, "co-processor returned"); - let co_processor_output = co_processor_result?; - validate_coprocessor_output(&co_processor_output, PipelineStep::SupergraphRequest)?; - // unwrap is safe here because validate_coprocessor_output made sure control is available - let control = co_processor_output.control.expect("validated above; qed"); - - // Thirdly, we need to interpret the control flow which may have been - // updated by our co-processor and decide if we should proceed or stop. - - if matches!(control, Control::Break(_)) { - // Ensure the code is a valid http status code - let code = control.get_http_status()?; - - let res = { - let graphql_response: crate::graphql::Response = - serde_json::from_value(co_processor_output.body.unwrap_or(serde_json::Value::Null)) - .unwrap_or_else(|error| { - crate::graphql::Response::builder() - .errors(vec![Error::builder() - .message(format!( - "couldn't deserialize coprocessor output body: {error}" - )) - .extension_code("EXTERNAL_DESERIALIZATION_ERROR") - .build()]) - .build() - }); - - let mut http_response = http::Response::builder() - .status(code) - .body(stream::once(future::ready(graphql_response)).boxed())?; - if let Some(headers) = co_processor_output.headers { - *http_response.headers_mut() = internalize_header_map(headers)?; - } - - let supergraph_response = supergraph::Response { - response: http_response, - context: request.context, - }; - - if let Some(context) = co_processor_output.context { - for (key, value) in context.try_into_iter()? { - supergraph_response - .context - .upsert_json_value(key, move |_current| value); - } - } - - supergraph_response - }; - return Ok(ControlFlow::Break(res)); - } - - // Finally, process our reply and act on the contents. Our processing logic is - // that we replace "bits" of our incoming request with the updated bits if they - // are present in our co_processor_output. - - let new_body: crate::graphql::Request = match co_processor_output.body { - Some(value) => serde_json::from_value(value)?, - None => body, - }; - - request.supergraph_request = http::Request::from_parts(parts, new_body); - - if let Some(context) = co_processor_output.context { - for (key, value) in context.try_into_iter()? { - request - .context - .upsert_json_value(key, move |_current| value); - } - } - - if let Some(headers) = co_processor_output.headers { - *request.supergraph_request.headers_mut() = internalize_header_map(headers)?; - } - - if let Some(uri) = co_processor_output.uri { - *request.supergraph_request.uri_mut() = uri.parse()?; - } - - Ok(ControlFlow::Continue(request)) -} - -async fn process_supergraph_response_stage( - http_client: C, - coprocessor_url: String, - sdl: Arc, - response: supergraph::Response, - response_config: SupergraphResponseConf, -) -> Result -where - C: Service, Response = hyper::Response, Error = BoxError> - + Clone - + Send - + Sync - + 'static, - >>::Future: Send + 'static, -{ - // split the response into parts + body - let (mut parts, body) = response.response.into_parts(); - - // we split the body (which is a stream) into first response + rest of responses, - // for which we will implement mapping later - let (first, rest): (Option, graphql::ResponseStream) = - body.into_future().await; - - // If first is None, we return an error - let first = first.ok_or_else(|| { - BoxError::from("Coprocessor cannot convert body into future due to problem with first part") - })?; - - // Now we process our first chunk of response - // Encode headers, body, status, context, sdl to create a payload - let headers_to_send = response_config - .headers - .then(|| externalize_header_map(&parts.headers)) - .transpose()?; - let body_to_send = response_config - .body - .then(|| serde_json::to_value(&first).expect("serialization will not fail")); - let status_to_send = response_config.status_code.then(|| parts.status.as_u16()); - let context_to_send = response_config.context.then(|| response.context.clone()); - let sdl_to_send = response_config.sdl.then(|| sdl.clone().to_string()); - - let payload = Externalizable::supergraph_builder() - .stage(PipelineStep::SupergraphResponse) - .and_id(TraceId::maybe_new().map(|id| id.to_string())) - .and_headers(headers_to_send) - .and_body(body_to_send) - .and_context(context_to_send) - .and_status_code(status_to_send) - .and_sdl(sdl_to_send.clone()) - .build(); - - // Second, call our co-processor and get a reply. - tracing::debug!(?payload, "externalized output"); - let guard = response.context.enter_active_request(); - let start = Instant::now(); - let co_processor_result = payload.call(http_client.clone(), &coprocessor_url).await; - let duration = start.elapsed().as_secs_f64(); - drop(guard); - tracing::info!( - histogram.apollo.router.operations.coprocessor.duration = duration, - coprocessor.stage = %PipelineStep::SupergraphResponse, - ); - - tracing::debug!(?co_processor_result, "co-processor returned"); - let co_processor_output = co_processor_result?; - - validate_coprocessor_output(&co_processor_output, PipelineStep::SupergraphResponse)?; - - // Third, process our reply and act on the contents. Our processing logic is - // that we replace "bits" of our incoming response with the updated bits if they - // are present in our co_processor_output. If they aren't present, just use the - // bits that we sent to the co_processor. - let new_body: crate::response::Response = match co_processor_output.body { - Some(value) => serde_json::from_value(value)?, - None => first, - }; - - if let Some(control) = co_processor_output.control { - parts.status = control.get_http_status()? - } - - if let Some(context) = co_processor_output.context { - for (key, value) in context.try_into_iter()? { - response - .context - .upsert_json_value(key, move |_current| value); - } - } - - if let Some(headers) = co_processor_output.headers { - parts.headers = internalize_header_map(headers)?; - } - - // Clone all the bits we need - let context = response.context.clone(); - let map_context = response.context.clone(); - - // Map the rest of our body to process subsequent chunks of response - let mapped_stream = rest - .then(move |deferred_response| { - let generator_client = http_client.clone(); - let generator_coprocessor_url = coprocessor_url.clone(); - let generator_map_context = map_context.clone(); - let generator_sdl_to_send = sdl_to_send.clone(); - - async move { - let body_to_send = response_config.body.then(|| { - serde_json::to_value(&deferred_response).expect("serialization will not fail") - }); - let context_to_send = response_config - .context - .then(|| generator_map_context.clone()); - - // Note: We deliberately DO NOT send headers or status_code even if the user has - // requested them. That's because they are meaningless on a deferred response and - // providing them will be a source of confusion. - let payload = Externalizable::router_builder() - .stage(PipelineStep::SupergraphResponse) - .and_id(TraceId::maybe_new().map(|id| id.to_string())) - .and_body(body_to_send) - .and_context(context_to_send) - .and_sdl(generator_sdl_to_send) - .build(); - - // Second, call our co-processor and get a reply. - tracing::debug!(?payload, "externalized output"); - let guard = generator_map_context.enter_active_request(); - let co_processor_result = payload - .call(generator_client, &generator_coprocessor_url) - .await; - drop(guard); - tracing::debug!(?co_processor_result, "co-processor returned"); - let co_processor_output = co_processor_result?; - - validate_coprocessor_output( - &co_processor_output, - PipelineStep::SupergraphResponse, - )?; - - // Third, process our reply and act on the contents. Our processing logic is - // that we replace "bits" of our incoming response with the updated bits if they - // are present in our co_processor_output. If they aren't present, just use the - // bits that we sent to the co_processor. - let new_deferred_response: crate::response::Response = - match co_processor_output.body { - Some(value) => serde_json::from_value(value)?, - None => deferred_response, - }; - - if let Some(context) = co_processor_output.context { - for (key, value) in context.try_into_iter()? { - generator_map_context.upsert_json_value(key, move |_current| value); - } - } - - // We return the deferred_response into our stream of response chunks - Ok(new_deferred_response) - } - }) - .map(|res: Result| match res { - Ok(response) => response, - Err(e) => { - tracing::error!("coprocessor error handling deferred supergraph response: {e}"); - response::Response::builder() - .error( - Error::builder() - .message("Internal error handling deferred response") - .extension_code("INTERNAL_ERROR") - .build(), - ) - .build() - } - }); - - // Create our response stream which consists of our first body chained with the - // rest of the responses in our mapped stream. - let stream = once(ready(new_body)).chain(mapped_stream).boxed(); - - // Finally, return a response which has a Body that wraps our stream of response chunks. - Ok(supergraph::Response { - context, - response: http::Response::from_parts(parts, stream), - }) -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use futures::future::BoxFuture; - use http::StatusCode; - use hyper::Body; - use serde_json::json; - use tower::BoxError; - use tower::ServiceExt; - - use super::super::*; - use super::*; - use crate::plugin::test::MockHttpClientService; - use crate::plugin::test::MockSupergraphService; - use crate::services::supergraph; - - #[allow(clippy::type_complexity)] - pub(crate) fn mock_with_callback( - callback: fn( - hyper::Request, - ) -> BoxFuture<'static, Result, BoxError>>, - ) -> MockHttpClientService { - let mut mock_http_client = MockHttpClientService::new(); - mock_http_client.expect_clone().returning(move || { - let mut mock_http_client = MockHttpClientService::new(); - - mock_http_client.expect_clone().returning(move || { - let mut mock_http_client = MockHttpClientService::new(); - mock_http_client.expect_call().returning(callback); - mock_http_client - }); - mock_http_client - }); - - mock_http_client - } - - #[allow(clippy::type_complexity)] - fn mock_with_deferred_callback( - callback: fn( - hyper::Request, - ) -> BoxFuture<'static, Result, BoxError>>, - ) -> MockHttpClientService { - let mut mock_http_client = MockHttpClientService::new(); - mock_http_client.expect_clone().returning(move || { - let mut mock_http_client = MockHttpClientService::new(); - mock_http_client.expect_clone().returning(move || { - let mut mock_http_client = MockHttpClientService::new(); - mock_http_client.expect_clone().returning(move || { - let mut mock_http_client = MockHttpClientService::new(); - mock_http_client.expect_call().returning(callback); - mock_http_client - }); - mock_http_client - }); - mock_http_client - }); - - mock_http_client - } - - #[tokio::test] - async fn external_plugin_supergraph_request() { - let supergraph_stage = SupergraphStage { - request: SupergraphRequestConf { - headers: false, - context: false, - body: true, - sdl: false, - method: false, - path: false, - }, - response: Default::default(), - }; - - // This will never be called because we will fail at the coprocessor. - let mut mock_supergraph_service = MockSupergraphService::new(); - - mock_supergraph_service - .expect_call() - .returning(|req: supergraph::Request| { - // Let's assert that the subgraph request has been transformed as it should have. - assert_eq!( - req.supergraph_request.headers().get("cookie").unwrap(), - "tasty_cookie=strawberry" - ); - - assert_eq!( - req.context - .get::<&str, u8>("this-is-a-test-context") - .unwrap() - .unwrap(), - 42 - ); - - // The subgraph uri should have changed - assert_eq!( - Some("MyQuery"), - req.supergraph_request.body().operation_name.as_deref() - ); - - // The query should have changed - assert_eq!( - "query Long {\n me {\n name\n}\n}", - req.supergraph_request.body().query.as_ref().unwrap() - ); - - Ok(supergraph::Response::builder() - .data(json!({ "test": 1234_u32 })) - .errors(Vec::new()) - .extensions(crate::json_ext::Object::new()) - .context(req.context) - .build() - .unwrap()) - }); - - let mock_http_client = mock_with_callback(move |_: hyper::Request| { - Box::pin(async { - Ok(hyper::Response::builder() - .body(Body::from( - r##"{ - "version": 1, - "stage": "SupergraphRequest", - "control": "continue", - "headers": { - "cookie": [ - "tasty_cookie=strawberry" - ], - "content-type": [ - "application/json" - ], - "host": [ - "127.0.0.1:4000" - ], - "apollo-federation-include-trace": [ - "ftv1" - ], - "apollographql-client-name": [ - "manual" - ], - "accept": [ - "*/*" - ], - "user-agent": [ - "curl/7.79.1" - ], - "content-length": [ - "46" - ] - }, - "body": { - "query": "query Long {\n me {\n name\n}\n}", - "operationName": "MyQuery" - }, - "context": { - "entries": { - "accepts-json": false, - "accepts-wildcard": true, - "accepts-multipart": false, - "this-is-a-test-context": 42 - } - }, - "serviceName": "service name shouldn't change", - "uri": "http://thisurihaschanged" - }"##, - )) - .unwrap()) - }) - }); - - let service = supergraph_stage.as_service( - mock_http_client, - mock_supergraph_service.boxed(), - "http://test".to_string(), - Arc::new("".to_string()), - ); - - let request = supergraph::Request::fake_builder().build().unwrap(); - - assert_eq!( - serde_json_bytes::json!({ "test": 1234_u32 }), - service - .oneshot(request) - .await - .unwrap() - .response - .into_body() - .next() - .await - .unwrap() - .data - .unwrap() - ); - } - - #[tokio::test] - async fn external_plugin_supergraph_request_controlflow_break() { - let supergraph_stage = SupergraphStage { - request: SupergraphRequestConf { - headers: false, - context: false, - body: true, - sdl: false, - method: false, - path: false, - }, - response: Default::default(), - }; - - // This will never be called because we will fail at the coprocessor. - let mock_supergraph_service = MockSupergraphService::new(); - - let mock_http_client = mock_with_callback(move |_: hyper::Request| { - Box::pin(async { - Ok(hyper::Response::builder() - .body(Body::from( - r##"{ - "version": 1, - "stage": "SupergraphRequest", - "control": { - "break": 200 - }, - "body": { - "errors": [{ "message": "my error message" }] - }, - "context": { - "entries": { - "testKey": true - } - }, - "headers": { - "aheader": ["a value"] - } - }"##, - )) - .unwrap()) - }) - }); - - let service = supergraph_stage.as_service( - mock_http_client, - mock_supergraph_service.boxed(), - "http://test".to_string(), - Arc::new("".to_string()), - ); - - let request = supergraph::Request::fake_builder().build().unwrap(); - - let crate::services::supergraph::Response { - mut response, - context, - } = service.oneshot(request).await.unwrap(); - - assert!(context.get::<_, bool>("testKey").unwrap().unwrap()); - - let value = response.headers().get("aheader").unwrap(); - - assert_eq!("a value", value); - - assert_eq!( - "my error message", - response.body_mut().next().await.unwrap().errors[0] - .message - .as_str() - ); - } - - #[tokio::test] - async fn external_plugin_supergraph_response() { - let supergraph_stage = SupergraphStage { - response: SupergraphResponseConf { - headers: true, - context: true, - body: true, - sdl: true, - status_code: false, - }, - request: Default::default(), - }; - - let mut mock_supergraph_service = MockSupergraphService::new(); - - mock_supergraph_service - .expect_call() - .returning(|req: supergraph::Request| { - Ok(supergraph::Response::builder() - .data(json!({ "test": 1234_u32 })) - .errors(Vec::new()) - .extensions(crate::json_ext::Object::new()) - .context(req.context) - .build() - .unwrap()) - }); - - let mock_http_client = mock_with_deferred_callback(move |res: hyper::Request| { - Box::pin(async { - let deserialized_response: Externalizable = - serde_json::from_slice(&hyper::body::to_bytes(res.into_body()).await.unwrap()) - .unwrap(); - - assert_eq!(EXTERNALIZABLE_VERSION, deserialized_response.version); - assert_eq!( - PipelineStep::SupergraphResponse.to_string(), - deserialized_response.stage - ); - - assert_eq!( - json! {{"data":{ "test": 1234_u32 }}}, - deserialized_response.body.unwrap() - ); - - let input = json!( - { - "version": 1, - "stage": "SupergraphResponse", - "control": { - "break": 400 - }, - "id": "1b19c05fdafc521016df33148ad63c1b", - "headers": { - "cookie": [ - "tasty_cookie=strawberry" - ], - "content-type": [ - "application/json" - ], - "host": [ - "127.0.0.1:4000" - ], - "apollo-federation-include-trace": [ - "ftv1" - ], - "apollographql-client-name": [ - "manual" - ], - "accept": [ - "*/*" - ], - "user-agent": [ - "curl/7.79.1" - ], - "content-length": [ - "46" - ] - }, - "body": { - "data": { "test": 42 } - }, - "context": { - "entries": { - "accepts-json": false, - "accepts-wildcard": true, - "accepts-multipart": false, - "this-is-a-test-context": 42 - } - }, - "sdl": "the sdl shouldnt change" - }); - Ok(hyper::Response::builder() - .body(Body::from(serde_json::to_string(&input).unwrap())) - .unwrap()) - }) - }); - - let service = supergraph_stage.as_service( - mock_http_client, - mock_supergraph_service.boxed(), - "http://test".to_string(), - Arc::new("".to_string()), - ); - - let request = supergraph::Request::canned_builder().build().unwrap(); - - let mut res = service.oneshot(request).await.unwrap(); - - // Let's assert that the router request has been transformed as it should have. - assert_eq!(res.response.status(), StatusCode::BAD_REQUEST); - assert_eq!( - res.response.headers().get("cookie").unwrap(), - "tasty_cookie=strawberry" - ); - - assert_eq!( - res.context - .get::<&str, u8>("this-is-a-test-context") - .unwrap() - .unwrap(), - 42 - ); - - let body = res.response.body_mut().next().await.unwrap(); - // the body should have changed: - assert_eq!( - json!({ "data": { "test": 42_u32 } }), - serde_json::to_value(&body).unwrap(), - ); - } -} diff --git a/apollo-router/src/plugins/coprocessor/test.rs b/apollo-router/src/plugins/coprocessor_test.rs similarity index 99% rename from apollo-router/src/plugins/coprocessor/test.rs rename to apollo-router/src/plugins/coprocessor_test.rs index 05ee1650d7..297391af5e 100644 --- a/apollo-router/src/plugins/coprocessor/test.rs +++ b/apollo-router/src/plugins/coprocessor_test.rs @@ -17,7 +17,7 @@ mod tests { use tower::BoxError; use tower::ServiceExt; - use super::super::*; + use super::super::coprocessor::*; use crate::plugin::test::MockHttpClientService; use crate::plugin::test::MockRouterService; use crate::plugin::test::MockSubgraphService; diff --git a/apollo-router/src/plugins/mod.rs b/apollo-router/src/plugins/mod.rs index c3c4ccc436..2bd2270df1 100644 --- a/apollo-router/src/plugins/mod.rs +++ b/apollo-router/src/plugins/mod.rs @@ -23,6 +23,8 @@ macro_rules! schemar_fn { pub(crate) mod authentication; mod authorization; mod coprocessor; +#[cfg(test)] +mod coprocessor_test; pub(crate) mod csrf; mod expose_query_plan; mod forbid_mutations; diff --git a/apollo-router/src/services/external.rs b/apollo-router/src/services/external.rs index 2161ac053d..440f399d1d 100644 --- a/apollo-router/src/services/external.rs +++ b/apollo-router/src/services/external.rs @@ -131,42 +131,6 @@ where } } - #[builder(visibility = "pub(crate)")] - /// This is the constructor (or builder) to use when constructing a Supergraph - /// `Externalizable`. - /// - fn supergraph_new( - stage: PipelineStep, - control: Option, - id: Option, - headers: Option>>, - body: Option, - context: Option, - status_code: Option, - method: Option, - sdl: Option, - ) -> Self { - assert!(matches!( - stage, - PipelineStep::SupergraphRequest | PipelineStep::SupergraphResponse - )); - Externalizable { - version: EXTERNALIZABLE_VERSION, - stage: stage.to_string(), - control, - id, - headers, - body, - context, - status_code, - sdl, - uri: None, - path: None, - method, - service_name: None, - } - } - #[builder(visibility = "pub(crate)")] /// This is the constructor (or builder) to use when constructing a Subgraph /// `Externalizable`. diff --git a/docs/source/customizations/coprocessor.mdx b/docs/source/customizations/coprocessor.mdx index b09bae0974..5842baf4c5 100644 --- a/docs/source/customizations/coprocessor.mdx +++ b/docs/source/customizations/coprocessor.mdx @@ -37,18 +37,17 @@ flowchart TB; client --"1. Sends request"--> routerService; routerService -."2. Can send request
details to coprocessor
and receive modifications".-> coprocessing; routerService --"3"--> supergraphService; - supergraphService --"4. Can send request
details to coprocessor
and receive modifications".-> coprocessing; - supergraphService --"5"--> executionService; - executionService --"6"--> subgraphService; - subgraphService -."7. Can send request
details to coprocessor
and receive modifications".-> coprocessing; - subgraphService -- "8"--> subgraphs; + supergraphService --"4"--> executionService; + executionService --"5"--> subgraphService; + subgraphService -."6. Can send request
details to coprocessor
and receive modifications".-> coprocessing; + subgraphService -- "7"--> subgraphs; class client,subgraphs,coprocessing secondary; ``` > This diagram shows request execution proceeding "down" from a client, through the router, to individual subgraphs. Execution then proceeds back "up" to the client in the reverse order. -As shown in the diagram above, the `RouterService`, `SupergraphService` and `SubgraphService` steps of the [request-handling lifecycle](./rhai/#router-request-lifecycle) can send these POST requests (also called **coprocessor requests**). +As shown in the diagram above, only the `RouterService` and `SubgraphService` of the [request-handling lifecycle](./rhai/#router-request-lifecycle) can send these POST requests (also called **coprocessor requests**). Each supported service can send its coprocessor requests at two different **stages**: @@ -92,20 +91,6 @@ coprocessor: context: false sdl: false status_code: false - supergraph: # This coprocessor hooks into the `SupergraphService` - request: # By including this key, the `SupergraphService` sends a coprocessor request whenever it first receives a client request. - headers: true # These boolean properties indicate which request data to include in the coprocessor request. All are optional and false by default. - body: false - context: false - sdl: false - path: false - method: false - response: # By including this key, the `SupergraphService` sends a coprocessor request whenever it's about to send response data to a client (including incremental data via @defer). - headers: true - body: false - context: false - sdl: false - status_code: false subgraph: all: request: # By including this key, the `SubgraphService` sends a coprocessor request whenever it is about to make a request to a subgraph.