From aeb5ffe5c9cbd305576c8a7a6dfc0ac06009e5ef Mon Sep 17 00:00:00 2001 From: Gary Pennington Date: Wed, 27 Sep 2023 13:31:54 +0100 Subject: [PATCH] experimental support for query batching (#3837) Things needing attention - [x] Configuration to match the design - [x] Decide if cloning `parts` is acceptable (in review) It isn't, so not cloned but re-built. - [x] Decide if cloning context for each `SupergraphRequest` is acceptable (in review) - [x] Should we do special handling if `@defer` or `subscription` detected in batch? - [x] Metrics - [x] Find someone happy to test from an appropriately configured Apollo Client and verify it works - [x] Modify Apollo telemetry to create separate root traces for each batch entry Fixes #126 --- **Checklist** Complete the checklist (and note appropriate exceptions) before the PR is marked ready-for-review. - [x] Changes are compatible[^1] - [x] Documentation[^2] completed - [x] Performance impact assessed and acceptable - Tests added and passing[^3] - [x] Unit Tests - [x] Integration Tests - [x] Manual Tests **Exceptions** Manual testing was performed with `curl` and [apollo client](https://www.apollographql.com/docs/react/api/link/apollo-link-batch-http/) **Notes** [^1]: It may be appropriate to bring upcoming changes to the attention of other (impacted) groups. Please endeavour to do this before seeking PR approval. The mechanism for doing this will vary considerably, so use your judgement as to how and when to do this. [^2]: Configuration is an important part of many changes. Where applicable please try to document configuration examples. [^3]: Tick whichever testing boxes are applicable. If you are adding Manual Tests, please document the manual testing (extensively) in the Exceptions. --------- Co-authored-by: Edward Huang Co-authored-by: Geoffroy Couprie Co-authored-by: Maria Elisabeth Schreiber --- .changesets/exp_garypen_126_query_batching.md | 21 + apollo-router/feature_discussions.json | 3 +- apollo-router/src/configuration/metrics.rs | 6 + apollo-router/src/configuration/mod.rs | 30 + ...s__test__metrics@batching.router.yaml.snap | 8 + ...nfiguration__tests__schema_generation.snap | 31 + .../testdata/metrics/batching.router.yaml | 3 + .../telemetry/tracing/apollo_telemetry.rs | 81 +- apollo-router/src/request.rs | 116 +- .../badly_formatted_batch_response.json | 11 + .../batching_not_enabled_response.json | 11 + .../testdata/expected_good_response.json | 98 ++ apollo-router/src/services/router_service.rs | 579 +++++++-- .../src/services/supergraph_service.rs | 24 + apollo-router/tests/apollo_reports.rs | 469 ++++--- .../fixtures/apollo_reports.batch_router.yaml | 28 + .../apollo_reports__batch_send_header.snap | 1095 +++++++++++++++++ .../apollo_reports__batch_trace_id.snap | 1089 ++++++++++++++++ ...ecycle_tests__cli_config_experimental.snap | 1 + docs/source/config.json | 6 + docs/source/configuration/metrics.mdx | 5 + docs/source/configuration/overview.mdx | 4 + docs/source/configuration/traffic-shaping.mdx | 12 + .../executing-operations/query-batching.mdx | 210 ++++ 24 files changed, 3649 insertions(+), 292 deletions(-) create mode 100644 .changesets/exp_garypen_126_query_batching.md create mode 100644 apollo-router/src/configuration/snapshots/apollo_router__configuration__metrics__test__metrics@batching.router.yaml.snap create mode 100644 apollo-router/src/configuration/testdata/metrics/batching.router.yaml create mode 100644 apollo-router/src/services/query_batching/testdata/badly_formatted_batch_response.json create mode 100644 
apollo-router/src/services/query_batching/testdata/batching_not_enabled_response.json
 create mode 100644 apollo-router/src/services/query_batching/testdata/expected_good_response.json
 create mode 100644 apollo-router/tests/fixtures/apollo_reports.batch_router.yaml
 create mode 100644 apollo-router/tests/snapshots/apollo_reports__batch_send_header.snap
 create mode 100644 apollo-router/tests/snapshots/apollo_reports__batch_trace_id.snap
 create mode 100644 docs/source/executing-operations/query-batching.mdx

diff --git a/.changesets/exp_garypen_126_query_batching.md b/.changesets/exp_garypen_126_query_batching.md
new file mode 100644
index 0000000000..7cf1f1ce51
--- /dev/null
+++ b/.changesets/exp_garypen_126_query_batching.md
@@ -0,0 +1,21 @@
+### query batching prototype ([Issue #126](https://github.com/apollographql/router/issues/126))
+
+An experimental implementation of query batching that adds support for client request batching to the Apollo Router.
+
+If you’re using Apollo Client, you can use its built-in support for batching to reduce the number of individual requests sent to the Apollo Router.
+
+Once [configured](https://www.apollographql.com/docs/react/api/link/apollo-link-batch-http/), Apollo Client automatically combines multiple operations into a single HTTP request. Batching is client configurable, including the maximum number of operations in a batch and the maximum duration to wait for operations to accumulate before sending the batch request.
+
+The Apollo Router must be configured to receive batch requests; otherwise, it rejects them. When processing a batch request, the router deserializes and processes each operation of the batch independently, and it responds to the client only after all operations of the batch have completed.
+
+```yaml
+experimental_batching:
+  enabled: true
+  mode: batch_http_link
+```
+
+All operations within a batch execute concurrently with respect to one another.
+
+Do not use subscriptions or `@defer` queries within a batch; they are not supported.
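+
+As a rough illustration (this example payload is ours, not part of the changeset): in `batch_http_link` mode a batch is an ordinary GraphQL HTTP request whose body is a JSON array of request objects, and the router replies with a JSON array of the corresponding responses in the same order:
+
+```json
+[
+  { "query": "query { topProducts { name } }" },
+  { "query": "query { me { name } }" }
+]
+```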
+ +By [@garypen](https://github.com/garypen) in https://github.com/apollographql/router/pull/3837 diff --git a/apollo-router/feature_discussions.json b/apollo-router/feature_discussions.json index a972a3a506..2012721a78 100644 --- a/apollo-router/feature_discussions.json +++ b/apollo-router/feature_discussions.json @@ -3,7 +3,8 @@ "experimental_retry": "https://github.com/apollographql/router/discussions/2241", "experimental_response_trace_id": "https://github.com/apollographql/router/discussions/2147", "experimental_logging": "https://github.com/apollographql/router/discussions/1961", - "experimental_http_max_request_bytes": "https://github.com/apollographql/router/discussions/3220" + "experimental_http_max_request_bytes": "https://github.com/apollographql/router/discussions/3220", + "experimental_batching": "https://github.com/apollographql/router/discussions/3840" }, "preview": { "preview_directives": "https://github.com/apollographql/router/discussions/3754" diff --git a/apollo-router/src/configuration/metrics.rs b/apollo-router/src/configuration/metrics.rs index 9506b1e1f2..014922f912 100644 --- a/apollo-router/src/configuration/metrics.rs +++ b/apollo-router/src/configuration/metrics.rs @@ -313,6 +313,12 @@ impl Metrics { opt.tracing.zipkin, "$.tracing.zipkin[?(@.endpoint)]" ); + log_usage_metrics!( + value.apollo.router.config.batching, + "$.experimental_batching[?(@.enabled == true)]", + opt.mode, + "$.mode" + ); } } diff --git a/apollo-router/src/configuration/mod.rs b/apollo-router/src/configuration/mod.rs index 02c1694051..27b5dcc8e4 100644 --- a/apollo-router/src/configuration/mod.rs +++ b/apollo-router/src/configuration/mod.rs @@ -185,6 +185,10 @@ pub struct Configuration { #[serde(default, skip_serializing, skip_deserializing)] pub(crate) notify: Notify, + + /// Batching configuration. 
+ #[serde(default)] + pub(crate) experimental_batching: Batching, } impl PartialEq for Configuration { @@ -234,6 +238,7 @@ impl<'de> serde::Deserialize<'de> for Configuration { limits: Limits, experimental_chaos: Chaos, experimental_graphql_validation_mode: GraphQLValidationMode, + experimental_batching: Batching, } let ad_hoc: AdHocConfiguration = serde::Deserialize::deserialize(deserializer)?; @@ -252,6 +257,7 @@ impl<'de> serde::Deserialize<'de> for Configuration { .chaos(ad_hoc.experimental_chaos) .uplink(ad_hoc.uplink) .graphql_validation_mode(ad_hoc.experimental_graphql_validation_mode) + .experimental_batching(ad_hoc.experimental_batching) .build() .map_err(|e| serde::de::Error::custom(e.to_string())) } @@ -288,6 +294,7 @@ impl Configuration { chaos: Option, uplink: Option, graphql_validation_mode: Option, + experimental_batching: Option, ) -> Result { #[cfg(not(test))] let notify_queue_cap = match apollo_plugins.get(APOLLO_SUBSCRIPTION_PLUGIN_NAME) { @@ -322,6 +329,7 @@ impl Configuration { }, tls: tls.unwrap_or_default(), uplink, + experimental_batching: experimental_batching.unwrap_or_default(), #[cfg(test)] notify: notify.unwrap_or_default(), #[cfg(not(test))] @@ -360,6 +368,7 @@ impl Configuration { chaos: Option, uplink: Option, graphql_validation_mode: Option, + experimental_batching: Option, ) -> Result { let configuration = Self { validated_yaml: Default::default(), @@ -382,6 +391,7 @@ impl Configuration { apq: apq.unwrap_or_default(), preview_persisted_queries: persisted_query.unwrap_or_default(), uplink, + experimental_batching: experimental_batching.unwrap_or_default(), }; configuration.validate() @@ -1273,3 +1283,23 @@ fn default_graphql_path() -> String { fn default_graphql_introspection() -> bool { false } + +#[derive(Clone, Debug, Default, Error, Display, Serialize, Deserialize, JsonSchema)] +#[serde(deny_unknown_fields, rename_all = "snake_case")] +pub(crate) enum BatchingMode { + /// batch_http_link + #[default] + BatchHttpLink, +} + +/// Configuration for Batching +#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)] +#[serde(deny_unknown_fields)] +pub(crate) struct Batching { + /// Activates Batching (disabled by default) + #[serde(default)] + pub(crate) enabled: bool, + + /// Batching mode + pub(crate) mode: BatchingMode, +} diff --git a/apollo-router/src/configuration/snapshots/apollo_router__configuration__metrics__test__metrics@batching.router.yaml.snap b/apollo-router/src/configuration/snapshots/apollo_router__configuration__metrics__test__metrics@batching.router.yaml.snap new file mode 100644 index 0000000000..4d5d0a7090 --- /dev/null +++ b/apollo-router/src/configuration/snapshots/apollo_router__configuration__metrics__test__metrics@batching.router.yaml.snap @@ -0,0 +1,8 @@ +--- +source: apollo-router/src/configuration/metrics.rs +expression: "&metrics.metrics" +--- +value.apollo.router.config.batching: + - 1 + - opt__mode__: batch_http_link + diff --git a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap index 20fba9012f..68343701f6 100644 --- a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap +++ b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap @@ -1009,6 +1009,37 @@ expression: "&schema" }, "additionalProperties": false }, + "experimental_batching": { + "description": "Batching 
configuration.", + "default": { + "enabled": false, + "mode": "batch_http_link" + }, + "type": "object", + "required": [ + "mode" + ], + "properties": { + "enabled": { + "description": "Activates Batching (disabled by default)", + "default": false, + "type": "boolean" + }, + "mode": { + "description": "Batching mode", + "oneOf": [ + { + "description": "batch_http_link", + "type": "string", + "enum": [ + "batch_http_link" + ] + } + ] + } + }, + "additionalProperties": false + }, "experimental_chaos": { "description": "Configuration for chaos testing, trying to reproduce bugs that require uncommon conditions. You probably don’t want this in production!", "default": { diff --git a/apollo-router/src/configuration/testdata/metrics/batching.router.yaml b/apollo-router/src/configuration/testdata/metrics/batching.router.yaml new file mode 100644 index 0000000000..c177d3f45e --- /dev/null +++ b/apollo-router/src/configuration/testdata/metrics/batching.router.yaml @@ -0,0 +1,3 @@ +experimental_batching: + enabled: true + mode: batch_http_link diff --git a/apollo-router/src/plugins/telemetry/tracing/apollo_telemetry.rs b/apollo-router/src/plugins/telemetry/tracing/apollo_telemetry.rs index 8bf8ab044e..7bf2068416 100644 --- a/apollo-router/src/plugins/telemetry/tracing/apollo_telemetry.rs +++ b/apollo-router/src/plugins/telemetry/tracing/apollo_telemetry.rs @@ -209,11 +209,12 @@ impl Exporter { }) } - fn extract_root_trace( + fn extract_root_traces( &mut self, span: &LightSpanData, child_nodes: Vec, - ) -> Result, Error> { + ) -> Result, Error> { + let mut results: Vec = vec![]; let http = extract_http_data(span); let mut root_trace = proto::reports::Trace { start_time: Some(span.start_time.into()), @@ -236,17 +237,19 @@ impl Exporter { client_version, duration_ns, } => { - if http.method != Method::Unknown as i32 { - let root_http = root_trace - .http - .as_mut() - .expect("http was extracted earlier, qed"); - root_http.request_headers = http.request_headers; - root_http.response_headers = http.response_headers; + for trace in results.iter_mut() { + if http.method != Method::Unknown as i32 { + let root_http = trace + .http + .as_mut() + .expect("http was extracted earlier, qed"); + root_http.request_headers = http.request_headers.clone(); + root_http.response_headers = http.response_headers.clone(); + } + trace.client_name = client_name.clone().unwrap_or_default(); + trace.client_version = client_version.clone().unwrap_or_default(); + trace.duration_ns = duration_ns; } - root_trace.client_name = client_name.unwrap_or_default(); - root_trace.client_version = client_version.unwrap_or_default(); - root_trace.duration_ns = duration_ns; } TreeData::Supergraph { operation_signature, @@ -259,6 +262,7 @@ impl Exporter { variables_json, operation_name, }); + results.push(root_trace.clone()); } TreeData::Execution(operation_type) => { if operation_type == OperationKind::Subscription.as_apollo_operation_type() { @@ -282,21 +286,17 @@ impl Exporter { } } - Ok(Box::new(root_trace)) + Ok(results) } - fn extract_trace(&mut self, span: LightSpanData) -> Result, Error> { - self.extract_data_from_spans(&span)? - .pop() - .and_then(|node| { - match node { - TreeData::Request(trace) | TreeData::SubscriptionEvent(trace) => { - Some(trace) - } - _ => None - } - }) - .expect("root trace must exist because it is constructed on the request or subscription_event span, qed") + fn extract_traces(&mut self, span: LightSpanData) -> Result, Error> { + let mut results = vec![]; + for node in self.extract_data_from_spans(&span)? 
{ + if let TreeData::Request(trace) | TreeData::SubscriptionEvent(trace) = node { + results.push(*trace?) + } + } + Ok(results) } fn extract_data_from_spans(&mut self, span: &LightSpanData) -> Result, Error> { @@ -417,11 +417,11 @@ impl Exporter { }); child_nodes } - _ if span.attributes.get(&APOLLO_PRIVATE_REQUEST).is_some() => { - vec![TreeData::Request( - self.extract_root_trace(span, child_nodes), - )] - } + _ if span.attributes.get(&APOLLO_PRIVATE_REQUEST).is_some() => self + .extract_root_traces(span, child_nodes)? + .into_iter() + .map(|node| TreeData::Request(Ok(Box::new(node)))) + .collect(), ROUTER_SPAN_NAME => { child_nodes.push(TreeData::Router { http: Box::new(extract_http_data(span)), @@ -550,9 +550,10 @@ impl Exporter { .to_string(), )); - vec![TreeData::SubscriptionEvent( - self.extract_root_trace(span, child_nodes), - )] + self.extract_root_traces(span, child_nodes)? + .into_iter() + .map(|node| TreeData::SubscriptionEvent(Ok(Box::new(node)))) + .collect() } _ => child_nodes, }) @@ -705,12 +706,14 @@ impl SpanExporter for Exporter { if span.attributes.get(&APOLLO_PRIVATE_REQUEST).is_some() || span.name == SUBSCRIPTION_EVENT_SPAN_NAME { - match self.extract_trace(span.into()) { - Ok(mut trace) => { - let mut operation_signature = Default::default(); - std::mem::swap(&mut trace.signature, &mut operation_signature); - if !operation_signature.is_empty() { - traces.push((operation_signature, *trace)); + match self.extract_traces(span.into()) { + Ok(extracted_traces) => { + for mut trace in extracted_traces { + let mut operation_signature = Default::default(); + std::mem::swap(&mut trace.signature, &mut operation_signature); + if !operation_signature.is_empty() { + traces.push((operation_signature, trace)); + } } } Err(Error::MultipleErrors(errors)) => { diff --git a/apollo-router/src/request.rs b/apollo-router/src/request.rs index cbd3c3d75c..9812b11d50 100644 --- a/apollo-router/src/request.rs +++ b/apollo-router/src/request.rs @@ -8,6 +8,7 @@ use serde_json_bytes::ByteString; use serde_json_bytes::Map as JsonMap; use serde_json_bytes::Value; +use crate::configuration::BatchingMode; use crate::json_ext::Object; /// A GraphQL `Request` used to represent both supergraph and subgraph requests. @@ -165,27 +166,101 @@ impl Request { /// /// An error will be produced in the event that the query string parameters /// cannot be turned into a valid GraphQL `Request`. - pub fn from_urlencoded_query(url_encoded_query: String) -> Result { - let urldecoded: serde_json::Value = - serde_urlencoded::from_bytes(url_encoded_query.as_bytes()) - .map_err(serde_json::Error::custom)?; + pub(crate) fn batch_from_urlencoded_query( + url_encoded_query: String, + ) -> Result, serde_json::Error> { + let value: serde_json::Value = serde_urlencoded::from_bytes(url_encoded_query.as_bytes()) + .map_err(serde_json::Error::custom)?; - let operation_name = if let Some(serde_json::Value::String(operation_name)) = - urldecoded.get("operationName") - { - Some(operation_name.clone()) + Request::process_query_values(&value) + } + + /// Convert Bytes into a GraphQL [`Request`]. + /// + /// An error will be produced in the event that the query string parameters + /// cannot be turned into a valid GraphQL `Request`. 
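+    ///
+    /// Illustrative example (ours, not part of the original patch): the body may be
+    /// either a single JSON request object or a JSON array of request objects, e.g.
+    /// `[{"query":"{ me { id } }"},{"query":"{ topProducts { name } }"}]`.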
+ pub(crate) fn batch_from_bytes(bytes: &[u8]) -> Result, serde_json::Error> { + let value: serde_json::Value = + serde_json::from_slice(bytes).map_err(serde_json::Error::custom)?; + + Request::process_batch_values(&value) + } + + fn allocate_result_array(value: &serde_json::Value) -> Vec { + match value.as_array() { + Some(array) => Vec::with_capacity(array.len()), + None => Vec::with_capacity(1), + } + } + + fn process_batch_values(value: &serde_json::Value) -> Result, serde_json::Error> { + let mut result = Request::allocate_result_array(value); + + if value.is_array() { + tracing::info!( + histogram.apollo_router.operations.batching.size = result.len() as f64, + mode = %BatchingMode::BatchHttpLink // Only supported mode right now + ); + + tracing::info!( + monotonic_counter.apollo_router.operations.batching = 1u64, + mode = %BatchingMode::BatchHttpLink // Only supported mode right now + ); + for entry in value + .as_array() + .expect("We already checked that it was an array") + { + let bytes = serde_json::to_vec(entry)?; + result.push(Request::deserialize_from_bytes(&bytes.into())?); + } } else { - None - }; + let bytes = serde_json::to_vec(value)?; + result.push(Request::deserialize_from_bytes(&bytes.into())?); + } + Ok(result) + } + + fn process_query_values(value: &serde_json::Value) -> Result, serde_json::Error> { + let mut result = Request::allocate_result_array(value); + + if value.is_array() { + tracing::info!( + histogram.apollo_router.operations.batching.size = result.len() as f64, + mode = "batch_http_link" // Only supported mode right now + ); + + tracing::info!( + monotonic_counter.apollo_router.operations.batching = 1u64, + mode = "batch_http_link" // Only supported mode right now + ); + for entry in value + .as_array() + .expect("We already checked that it was an array") + { + result.push(Request::process_value(entry)?); + } + } else { + result.push(Request::process_value(value)?) + } + Ok(result) + } + + fn process_value(value: &serde_json::Value) -> Result { + let operation_name = + if let Some(serde_json::Value::String(operation_name)) = value.get("operationName") { + Some(operation_name.clone()) + } else { + None + }; - let query = if let Some(serde_json::Value::String(query)) = urldecoded.get("query") { + let query = if let Some(serde_json::Value::String(query)) = value.get("query") { Some(query.as_str()) } else { None }; - let variables: Object = get_from_urldecoded(&urldecoded, "variables")?.unwrap_or_default(); + let variables: Object = get_from_urlencoded_value(value, "variables")?.unwrap_or_default(); let extensions: Object = - get_from_urldecoded(&urldecoded, "extensions")?.unwrap_or_default(); + get_from_urlencoded_value(value, "extensions")?.unwrap_or_default(); let request_builder = Self::builder() .variables(variables) @@ -200,9 +275,22 @@ impl Request { Ok(request) } + + /// Convert encoded URL query string parameters (also known as "search + /// params") into a GraphQL [`Request`]. + /// + /// An error will be produced in the event that the query string parameters + /// cannot be turned into a valid GraphQL `Request`. 
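+    ///
+    /// Illustrative example (ours): the query string
+    /// `query=%7B__typename%7D&operationName=Op&variables=%7B%7D` decodes to a
+    /// request for `{__typename}` with operation name `Op` and empty variables.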
+ pub fn from_urlencoded_query(url_encoded_query: String) -> Result { + let urldecoded: serde_json::Value = + serde_urlencoded::from_bytes(url_encoded_query.as_bytes()) + .map_err(serde_json::Error::custom)?; + + Request::process_value(&urldecoded) + } } -fn get_from_urldecoded<'a, T: Deserialize<'a>>( +fn get_from_urlencoded_value<'a, T: Deserialize<'a>>( object: &'a serde_json::Value, key: &str, ) -> Result, serde_json::Error> { diff --git a/apollo-router/src/services/query_batching/testdata/badly_formatted_batch_response.json b/apollo-router/src/services/query_batching/testdata/badly_formatted_batch_response.json new file mode 100644 index 0000000000..bccd27474a --- /dev/null +++ b/apollo-router/src/services/query_batching/testdata/badly_formatted_batch_response.json @@ -0,0 +1,11 @@ +{ + "errors": [ + { + "message": "Invalid GraphQL request", + "extensions": { + "details": "failed to deserialize the request body into JSON: EOF while parsing a list at line 1 column 784", + "code": "INVALID_GRAPHQL_REQUEST" + } + } + ] +} diff --git a/apollo-router/src/services/query_batching/testdata/batching_not_enabled_response.json b/apollo-router/src/services/query_batching/testdata/batching_not_enabled_response.json new file mode 100644 index 0000000000..2e924b7eff --- /dev/null +++ b/apollo-router/src/services/query_batching/testdata/batching_not_enabled_response.json @@ -0,0 +1,11 @@ +{ + "errors": [ + { + "message": "Invalid GraphQL request", + "extensions": { + "details": "batching not enabled", + "code": "BATCHING_NOT_ENABLED" + } + } + ] +} diff --git a/apollo-router/src/services/query_batching/testdata/expected_good_response.json b/apollo-router/src/services/query_batching/testdata/expected_good_response.json new file mode 100644 index 0000000000..9edfc22538 --- /dev/null +++ b/apollo-router/src/services/query_batching/testdata/expected_good_response.json @@ -0,0 +1,98 @@ +[ + { + "data": { + "topProducts": [ + { + "upc": "1", + "name": "Table", + "reviews": [ + { + "id": "1", + "product": { + "name": "Table" + }, + "author": { + "id": "1", + "name": "Ada Lovelace" + } + }, + { + "id": "4", + "product": { + "name": "Table" + }, + "author": { + "id": "2", + "name": "Alan Turing" + } + } + ] + }, + { + "upc": "2", + "name": "Couch", + "reviews": [ + { + "id": "2", + "product": { + "name": "Couch" + }, + "author": { + "id": "1", + "name": "Ada Lovelace" + } + } + ] + } + ] + } + }, + { + "data": { + "topProducts": [ + { + "upc": "1", + "name": "Table", + "reviews": [ + { + "id": "1", + "product": { + "name": "Table" + }, + "author": { + "id": "1", + "name": "Ada Lovelace" + } + }, + { + "id": "4", + "product": { + "name": "Table" + }, + "author": { + "id": "2", + "name": "Alan Turing" + } + } + ] + }, + { + "upc": "2", + "name": "Couch", + "reviews": [ + { + "id": "2", + "product": { + "name": "Couch" + }, + "author": { + "id": "1", + "name": "Ada Lovelace" + } + } + ] + } + ] + } + } +] diff --git a/apollo-router/src/services/router_service.rs b/apollo-router/src/services/router_service.rs index 879447c4d4..1f0f1de51b 100644 --- a/apollo-router/src/services/router_service.rs +++ b/apollo-router/src/services/router_service.rs @@ -5,6 +5,10 @@ use std::task::Poll; use axum::body::StreamBody; use axum::response::*; +use bytes::BufMut; +use bytes::Bytes; +use bytes::BytesMut; +use futures::future::join_all; use futures::future::ready; use futures::future::BoxFuture; use futures::stream; @@ -12,6 +16,7 @@ use futures::stream::once; use futures::stream::StreamExt; use http::header::CONTENT_TYPE; use 
http::header::VARY; +use http::request::Parts; use http::HeaderMap; use http::HeaderName; use http::HeaderValue; @@ -46,7 +51,10 @@ use super::APPLICATION_JSON_HEADER_VALUE; use super::MULTIPART_DEFER_CONTENT_TYPE; use super::MULTIPART_SUBSCRIPTION_CONTENT_TYPE; use crate::cache::DeduplicatingCache; +use crate::configuration::Batching; +use crate::configuration::BatchingMode; use crate::graphql; +use crate::http_ext; #[cfg(test)] use crate::plugin::test::MockSupergraphService; use crate::protocols::multipart::Multipart; @@ -61,6 +69,7 @@ use crate::services::RouterResponse; use crate::services::SupergraphRequest; use crate::services::SupergraphResponse; use crate::Configuration; +use crate::Context; use crate::Endpoint; use crate::ListenAddr; @@ -80,6 +89,7 @@ pub(crate) struct RouterService { persisted_query_layer: Arc, query_analysis_layer: QueryAnalysisLayer, experimental_http_max_request_bytes: usize, + experimental_batching: Batching, } impl RouterService { @@ -89,6 +99,7 @@ impl RouterService { persisted_query_layer: Arc, query_analysis_layer: QueryAnalysisLayer, experimental_http_max_request_bytes: usize, + experimental_batching: Batching, ) -> Self { RouterService { supergraph_creator, @@ -96,6 +107,7 @@ impl RouterService { persisted_query_layer, query_analysis_layer, experimental_http_max_request_bytes, + experimental_batching, } } } @@ -211,34 +223,10 @@ impl Service for RouterService { } impl RouterService { - async fn call_inner(&self, req: RouterRequest) -> Result { - let context = req.context.clone(); - - let supergraph_request = match self.translate_request(req).await { - Ok(request) => request, - Err((status_code, error, extension_details)) => { - ::tracing::error!( - monotonic_counter.apollo_router_http_requests_total = 1u64, - status = %status_code.as_u16(), - error = %error, - %error - ); - - return router::Response::error_builder() - .error( - graphql::Error::builder() - .message(String::from("Invalid GraphQL request")) - .extension_code("INVALID_GRAPHQL_REQUEST") - .extension("details", extension_details) - .build(), - ) - .status_code(status_code) - .header(CONTENT_TYPE, APPLICATION_JSON.essence_str()) - .context(context) - .build(); - } - }; - + async fn process_supergraph_request( + &self, + supergraph_request: SupergraphRequest, + ) -> Result { let mut request_res = self .persisted_query_layer .supergraph_request(supergraph_request); @@ -348,31 +336,210 @@ impl RouterService { Ok(RouterResponse { response, context }) } else { // this should be unreachable due to a previous check, but just to be sure... - router::Response::error_builder() - .error( - graphql::Error::builder() - .message(format!( - r#"'accept' header must be one of: \"*/*\", {:?}, {:?} or {:?}"#, - APPLICATION_JSON.essence_str(), - GRAPHQL_JSON_RESPONSE_HEADER_VALUE, - MULTIPART_DEFER_CONTENT_TYPE - )) - .extension_code("INVALID_ACCEPT_HEADER") - .build(), - ) - .status_code(StatusCode::NOT_ACCEPTABLE) - .header(CONTENT_TYPE, APPLICATION_JSON.essence_str()) - .context(context) - .build() + Ok(router::Response::error_builder() + .error( + graphql::Error::builder() + .message(format!( + r#"'accept' header must be one of: \"*/*\", {:?}, {:?} or {:?}"#, + APPLICATION_JSON.essence_str(), + GRAPHQL_JSON_RESPONSE_HEADER_VALUE, + MULTIPART_DEFER_CONTENT_TYPE + )) + .extension_code("INVALID_ACCEPT_HEADER") + .build(), + ) + .status_code(StatusCode::NOT_ACCEPTABLE) + .header(CONTENT_TYPE, APPLICATION_JSON.essence_str()) + .context(context) + .build()?) 
} } } } + async fn call_inner(&self, req: RouterRequest) -> Result { + let context = req.context.clone(); + + let supergraph_requests = match self.translate_request(req).await { + Ok(requests) => requests, + Err(err) => { + ::tracing::error!( + monotonic_counter.apollo_router_http_requests_total = 1u64, + status = %err.status.as_u16(), + error = %err.error, + ); + + return router::Response::error_builder() + .error( + graphql::Error::builder() + .message(String::from("Invalid GraphQL request")) + .extension_code(err.extension_code) + .extension("details", err.extension_details) + .build(), + ) + .status_code(err.status) + .header(CONTENT_TYPE, APPLICATION_JSON.essence_str()) + .context(context) + .build(); + } + }; + + let futures = supergraph_requests + .into_iter() + .map(|supergraph_request| self.process_supergraph_request(supergraph_request)); + + // Use join_all to preserve ordering of concurrent operations + // (Short circuit processing and propagate any errors in the batch) + let mut results: Vec = join_all(futures) + .await + .into_iter() + .collect::, BoxError>>()?; + + // If we only have one result, go ahead and return it. Otherwise, create a new result + // which is an array of all results. + if results.len() == 1 { + Ok(results.pop().expect("we should have at least one response")) + } else { + let first = results.pop().expect("we should have at least one response"); + let (parts, body) = first.response.into_parts(); + let context = first.context; + let mut bytes = BytesMut::new(); + bytes.put_u8(b'['); + bytes.extend_from_slice(&hyper::body::to_bytes(body).await?); + for result in results { + bytes.put(&b", "[..]); + bytes.extend_from_slice(&hyper::body::to_bytes(result.response.into_body()).await?); + } + bytes.put_u8(b']'); + + Ok(RouterResponse { + response: http::Response::from_parts(parts, Body::from(bytes.freeze())), + context, + }) + } + } + + async fn translate_query_request( + &self, + parts: &Parts, + ) -> Result, TranslateError> { + parts.uri.query().map(|q| { + let mut result = vec![]; + + match graphql::Request::from_urlencoded_query(q.to_string()) { + Ok(request) => { + result.push(request); + } + Err(err) => { + // It may be a batch of requests, so try that (if config allows) before + // erroring out + if self.experimental_batching.enabled + && matches!(self.experimental_batching.mode, BatchingMode::BatchHttpLink) + { + result = graphql::Request::batch_from_urlencoded_query(q.to_string()) + .map_err(|e| TranslateError { + status: StatusCode::BAD_REQUEST, + error: "failed to decode a valid GraphQL request from path", + extension_code: "INVALID_GRAPHQL_REQUEST", + extension_details: format!( + "failed to decode a valid GraphQL request from path {e}" + ), + })?; + } else if !q.is_empty() && q.as_bytes()[0] == b'[' { + let extension_details = if self.experimental_batching.enabled + && !matches!(self.experimental_batching.mode, BatchingMode::BatchHttpLink) { + format!("batching not supported for mode `{}`", self.experimental_batching.mode) + } else { + "batching not enabled".to_string() + }; + return Err(TranslateError { + status: StatusCode::BAD_REQUEST, + error: "batching not enabled", + extension_code: "BATCHING_NOT_ENABLED", + extension_details, + }); + } else { + return Err(TranslateError { + status: StatusCode::BAD_REQUEST, + error: "failed to decode a valid GraphQL request from path", + extension_code: "INVALID_GRAPHQL_REQUEST", + extension_details: format!( + "failed to decode a valid GraphQL request from path {err}" + ), + }); + } + } + }; + Ok(result) + 
}).unwrap_or_else(|| { + Err(TranslateError { + status: StatusCode::BAD_REQUEST, + error: "There was no GraphQL operation to execute. Use the `query` parameter to send an operation, using either GET or POST.", + extension_code: "INVALID_GRAPHQL_REQUEST", + extension_details: "There was no GraphQL operation to execute. Use the `query` parameter to send an operation, using either GET or POST.".to_string() + }) + }) + } + + fn translate_bytes_request( + &self, + bytes: &Bytes, + ) -> Result, TranslateError> { + let mut result = vec![]; + + match graphql::Request::deserialize_from_bytes(bytes) { + Ok(request) => { + result.push(request); + } + Err(err) => { + if self.experimental_batching.enabled + && matches!(self.experimental_batching.mode, BatchingMode::BatchHttpLink) + { + result = + graphql::Request::batch_from_bytes(bytes).map_err(|e| TranslateError { + status: StatusCode::BAD_REQUEST, + error: "failed to deserialize the request body into JSON", + extension_code: "INVALID_GRAPHQL_REQUEST", + extension_details: format!( + "failed to deserialize the request body into JSON: {e}" + ), + })?; + } else if !bytes.is_empty() && bytes[0] == b'[' { + let extension_details = if self.experimental_batching.enabled + && !matches!(self.experimental_batching.mode, BatchingMode::BatchHttpLink) + { + format!( + "batching not supported for mode `{}`", + self.experimental_batching.mode + ) + } else { + "batching not enabled".to_string() + }; + return Err(TranslateError { + status: StatusCode::BAD_REQUEST, + error: "batching not enabled", + extension_code: "BATCHING_NOT_ENABLED", + extension_details, + }); + } else { + return Err(TranslateError { + status: StatusCode::BAD_REQUEST, + error: "failed to deserialize the request body into JSON", + extension_code: "INVALID_GRAPHQL_REQUEST", + extension_details: format!( + "failed to deserialize the request body into JSON: {err}" + ), + }); + } + } + }; + Ok(result) + } + async fn translate_request( &self, req: RouterRequest, - ) -> Result { + ) -> Result, TranslateError> { let RouterRequest { router_request, context, @@ -380,26 +547,10 @@ impl RouterService { let (parts, body) = router_request.into_parts(); - let graphql_request = if parts.method == Method::GET { - parts - .uri - .query() - .map(|q| { - graphql::Request::from_urlencoded_query(q.to_string()).map_err(|e| { - ( - StatusCode::BAD_REQUEST, - "failed to decode a valid GraphQL request from path", - format!("failed to decode a valid GraphQL request from path {e}"), - ) - }) - }) - .unwrap_or_else(|| { - Err(( - StatusCode::BAD_REQUEST, - "There was no GraphQL operation to execute. Use the `query` parameter to send an operation, using either GET or POST.", - "There was no GraphQL operation to execute. 
Use the `query` parameter to send an operation, using either GET or POST.".to_string() - )) - }) + let graphql_requests: Result, TranslateError> = if parts.method + == Method::GET + { + self.translate_query_request(&parts).await } else { // FIXME: use a try block when available: https://github.com/rust-lang/rust/issues/31436 let content_length = (|| { @@ -412,11 +563,12 @@ impl RouterService { .ok() })(); if content_length.unwrap_or(0) > self.experimental_http_max_request_bytes { - Err(( - StatusCode::PAYLOAD_TOO_LARGE, - "payload too large for the `experimental_http_max_request_bytes` configuration", - "payload too large".to_string(), - )) + Err(TranslateError { + status: StatusCode::PAYLOAD_TOO_LARGE, + error: "payload too large for the `experimental_http_max_request_bytes` configuration", + extension_code: "INVALID_GRAPHQL_REQUEST", + extension_details: "payload too large".to_string(), + }) } else { let body = http_body::Limited::new(body, self.experimental_http_max_request_bytes); hyper::body::to_bytes(body) @@ -424,40 +576,98 @@ impl RouterService { .await .map_err(|e| { if e.is::() { - ( - StatusCode::PAYLOAD_TOO_LARGE, - "payload too large for the `experimental_http_max_request_bytes` configuration", - "payload too large".to_string(), - ) + TranslateError { + status: StatusCode::PAYLOAD_TOO_LARGE, + error: "payload too large for the `experimental_http_max_request_bytes` configuration", + extension_code: "INVALID_GRAPHQL_REQUEST", + extension_details: "payload too large".to_string(), + } } else { - ( - StatusCode::BAD_REQUEST, - "failed to get the request body", - format!("failed to get the request body: {e}"), - ) + TranslateError { + status: StatusCode::BAD_REQUEST, + error: "failed to get the request body", + extension_code: "INVALID_GRAPHQL_REQUEST", + extension_details: format!("failed to get the request body: {e}"), + } } }) .and_then(|bytes| { - graphql::Request::deserialize_from_bytes(&bytes).map_err(|err| { - ( - StatusCode::BAD_REQUEST, - "failed to deserialize the request body into JSON", - format!( - "failed to deserialize the request body into JSON: {err}" - ), - ) - }) + self.translate_bytes_request(&bytes) }) } }; - Ok(SupergraphRequest { - supergraph_request: http::Request::from_parts(parts, graphql_request?), - context, - }) + let mut ok_results = graphql_requests?; + let mut results = Vec::with_capacity(ok_results.len()); + let first = ok_results + .pop() + .expect("We must have at least one response"); + let sg = http::Request::from_parts(parts, first); + + context + .private_entries + .lock() + .insert(self.experimental_batching.clone()); + // Building up the batch of supergraph requests is tricky. + // Firstly note that any http extensions are only propagated for the first request sent + // through the pipeline. This is because there is simply no way to clone http + // extensions. + // + // Secondly, we can't clone private_entries, but we need to propagate at least + // ClientRequestAccepts to ensure correct processing of the response. We do that manually, + // but the concern is that there may be other private_entries that wish to propagate into + // each request or we may add them in future and not know about it here... + // + // (Technically we could clone private entries, since it is held under an `Arc`, but that + // would mean all the requests in a batch shared the same set of private entries and review + // comments expressed the sentiment that this may be a bad thing...) + // + for graphql_request in ok_results { + // XXX Lose http extensions, is that ok? 
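+            // (Grounded in the comment above: `http::Extensions` cannot be cloned, so
+            // only the first request keeps the original extensions; the rebuilt
+            // requests below are assumed to start with empty extensions.)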
+ let mut new = http_ext::clone_http_request(&sg); + *new.body_mut() = graphql_request; + // XXX Lose some private entries, is that ok? + let new_context = Context::new(); + new_context.extend(&context); + let client_request_accepts_opt = context + .private_entries + .lock() + .get::() + .cloned(); + if let Some(client_request_accepts) = client_request_accepts_opt { + new_context + .private_entries + .lock() + .insert(client_request_accepts); + } + new_context + .private_entries + .lock() + .insert(self.experimental_batching.clone()); + results.push(SupergraphRequest { + supergraph_request: new, + // Build a new context. Cloning would cause issues. + context: new_context, + }); + } + results.insert( + 0, + SupergraphRequest { + supergraph_request: sg, + context, + }, + ); + Ok(results) } } +struct TranslateError<'a> { + status: StatusCode, + error: &'a str, + extension_code: &'a str, + extension_details: String, +} + // Process the headers to make sure that `VARY` is set correctly fn process_vary_header(headers: &mut HeaderMap) { if headers.get(VARY).is_none() { @@ -475,6 +685,7 @@ pub(crate) struct RouterCreator { pub(crate) persisted_query_layer: Arc, query_analysis_layer: QueryAnalysisLayer, experimental_http_max_request_bytes: usize, + experimental_batching: Batching, } impl ServiceFactory for RouterCreator { @@ -527,6 +738,7 @@ impl RouterCreator { .limits .experimental_http_max_request_bytes, persisted_query_layer, + experimental_batching: configuration.experimental_batching.clone(), }) } @@ -544,6 +756,7 @@ impl RouterCreator { self.persisted_query_layer.clone(), self.query_analysis_layer.clone(), self.experimental_http_max_request_bytes, + self.experimental_batching.clone(), )); ServiceBuilder::new() @@ -781,4 +994,184 @@ mod tests { let response = with_config(CANNED_REQUEST_LEN - 1).await.response; assert_eq!(response.status(), http::StatusCode::PAYLOAD_TOO_LARGE); } + + // Test query batching + + #[tokio::test] + async fn it_only_accepts_batch_http_link_mode_for_query_batch() { + let expected_response: serde_json::Value = serde_json::from_str(include_str!( + "query_batching/testdata/batching_not_enabled_response.json" + )) + .unwrap(); + + async fn with_config() -> router::Response { + let http_request = supergraph::Request::canned_builder() + .build() + .unwrap() + .supergraph_request + .map(|req: crate::request::Request| { + // Modify the request so that it is a valid array of requests. 
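+                    // (i.e. serialize the canned request twice and join the copies
+                    // into a two-element JSON array: `{...}` becomes `[{...},{...}]`)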
+ let mut json_bytes = serde_json::to_vec(&req).unwrap(); + let mut result = vec![b'[']; + result.append(&mut json_bytes.clone()); + result.push(b','); + result.append(&mut json_bytes); + result.push(b']'); + hyper::Body::from(result) + }); + let config = serde_json::json!({}); + crate::TestHarness::builder() + .configuration_json(config) + .unwrap() + .build_router() + .await + .unwrap() + .oneshot(router::Request::from(http_request)) + .await + .unwrap() + } + // Send a request + let response = with_config().await.response; + assert_eq!(response.status(), http::StatusCode::BAD_REQUEST); + let data: serde_json::Value = + serde_json::from_slice(&hyper::body::to_bytes(response.into_body()).await.unwrap()) + .unwrap(); + assert_eq!(expected_response, data); + } + + #[tokio::test] + async fn it_processes_a_valid_query_batch() { + let expected_response: serde_json::Value = serde_json::from_str(include_str!( + "query_batching/testdata/expected_good_response.json" + )) + .unwrap(); + + async fn with_config() -> router::Response { + let http_request = supergraph::Request::canned_builder() + .build() + .unwrap() + .supergraph_request + .map(|req: crate::request::Request| { + // Modify the request so that it is a valid array of requests. + let mut json_bytes = serde_json::to_vec(&req).unwrap(); + let mut result = vec![b'[']; + result.append(&mut json_bytes.clone()); + result.push(b','); + result.append(&mut json_bytes); + result.push(b']'); + hyper::Body::from(result) + }); + let config = serde_json::json!({ + "experimental_batching": { + "enabled": true, + "mode" : "batch_http_link" + } + }); + crate::TestHarness::builder() + .configuration_json(config) + .unwrap() + .build_router() + .await + .unwrap() + .oneshot(router::Request::from(http_request)) + .await + .unwrap() + } + // Send a request + let response = with_config().await.response; + assert_eq!(response.status(), http::StatusCode::OK); + let data: serde_json::Value = + serde_json::from_slice(&hyper::body::to_bytes(response.into_body()).await.unwrap()) + .unwrap(); + assert_eq!(expected_response, data); + } + + #[tokio::test] + async fn it_will_not_process_a_query_batch_without_enablement() { + let expected_response: serde_json::Value = serde_json::from_str(include_str!( + "query_batching/testdata/batching_not_enabled_response.json" + )) + .unwrap(); + + async fn with_config() -> router::Response { + let http_request = supergraph::Request::canned_builder() + .build() + .unwrap() + .supergraph_request + .map(|req: crate::request::Request| { + // Modify the request so that it is a valid array of requests. 
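+                    // Same two-copy array construction as above, but batching is
+                    // deliberately left unconfigured, so the router should reject it.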
+ let mut json_bytes = serde_json::to_vec(&req).unwrap(); + let mut result = vec![b'[']; + result.append(&mut json_bytes.clone()); + result.push(b','); + result.append(&mut json_bytes); + result.push(b']'); + hyper::Body::from(result) + }); + let config = serde_json::json!({}); + crate::TestHarness::builder() + .configuration_json(config) + .unwrap() + .build_router() + .await + .unwrap() + .oneshot(router::Request::from(http_request)) + .await + .unwrap() + } + // Send a request + let response = with_config().await.response; + assert_eq!(response.status(), http::StatusCode::BAD_REQUEST); + let data: serde_json::Value = + serde_json::from_slice(&hyper::body::to_bytes(response.into_body()).await.unwrap()) + .unwrap(); + assert_eq!(expected_response, data); + } + + #[tokio::test] + async fn it_will_not_process_a_poorly_formatted_query_batch() { + let expected_response: serde_json::Value = serde_json::from_str(include_str!( + "query_batching/testdata/badly_formatted_batch_response.json" + )) + .unwrap(); + + async fn with_config() -> router::Response { + let http_request = supergraph::Request::canned_builder() + .build() + .unwrap() + .supergraph_request + .map(|req: crate::request::Request| { + // Modify the request so that it is a valid array of requests. + let mut json_bytes = serde_json::to_vec(&req).unwrap(); + let mut result = vec![b'[']; + result.append(&mut json_bytes.clone()); + result.push(b','); + result.append(&mut json_bytes); + // Deliberately omit the required trailing ] + hyper::Body::from(result) + }); + let config = serde_json::json!({ + "experimental_batching": { + "enabled": true, + "mode" : "batch_http_link" + } + }); + crate::TestHarness::builder() + .configuration_json(config) + .unwrap() + .build_router() + .await + .unwrap() + .oneshot(router::Request::from(http_request)) + .await + .unwrap() + } + // Send a request + let response = with_config().await.response; + assert_eq!(response.status(), http::StatusCode::BAD_REQUEST); + let data: serde_json::Value = + serde_json::from_slice(&hyper::body::to_bytes(response.into_body()).await.unwrap()) + .unwrap(); + assert_eq!(expected_response, data); + } } diff --git a/apollo-router/src/services/supergraph_service.rs b/apollo-router/src/services/supergraph_service.rs index 207605ef55..b7acb05fba 100644 --- a/apollo-router/src/services/supergraph_service.rs +++ b/apollo-router/src/services/supergraph_service.rs @@ -37,6 +37,7 @@ use super::subgraph_service::MakeSubgraphService; use super::subgraph_service::SubgraphServiceFactory; use super::ExecutionServiceFactory; use super::QueryPlannerContent; +use crate::configuration::Batching; use crate::context::OPERATION_NAME; use crate::error::CacheResolverError; use crate::graphql; @@ -229,6 +230,29 @@ async fn service_call( let is_deferred = plan.is_deferred(operation_name.as_deref(), &variables); let is_subscription = plan.is_subscription(operation_name.as_deref()); + if let Some(batching) = context.private_entries.clone().lock().get::() { + if batching.enabled && (is_deferred || is_subscription) { + let message = if is_deferred { + "BATCHING_DEFER_UNSUPPORTED" + } else { + "BATCHING_SUBSCRIPTION_UNSUPPORTED" + }; + let mut response = SupergraphResponse::new_from_graphql_response( + graphql::Response::builder() + .errors(vec![crate::error::Error::builder() + .message(String::from( + "Deferred responses and subscriptions aren't supported in batches", + )) + .extension_code(message) + .build()]) + .build(), + context, + ); + *response.response.status_mut() = 
StatusCode::NOT_ACCEPTABLE; + return Ok(response); + } + } + let ClientRequestAccepts { multipart_defer: accepts_multipart_defer, multipart_subscription: accepts_multipart_subscription, diff --git a/apollo-router/tests/apollo_reports.rs b/apollo-router/tests/apollo_reports.rs index 9e30cf1ed6..941482cf8e 100644 --- a/apollo-router/tests/apollo_reports.rs +++ b/apollo-router/tests/apollo_reports.rs @@ -1,3 +1,23 @@ +//! Be aware that this test file contains some fairly flaky tests which embed a number of +//! assumptions about how traces and stats are reported to Apollo Studio. +//! +//! In particular: +//! - There are timings (sleeps) which work as things are implemented right now, but +//! may be sources of problems in the future. +//! +//! - There is a global TEST lock which forces these tests to execute serially to stop router +//! global tracing effect from breaking the tests. DO NOT BE TEMPTED to remove this TEST lock to +//! try and speed things up (unless you have time and patience to re-work a lot of test code). +//! +//! - There are assumptions about the different ways in which traces and metrics work. The main +//! limitation with these tests is that you are unlikely to get a single report containing all the +//! metrics that you need to make a test assertion. You might, but raciness in the way metrics are +//! generated in the router means you probably won't. That's why the test `test_batch_stats` has +//! its own stack of functions for testing and only tests that the total number of requests match. +//! +//! Summary: The dragons here are ancient and very evil. Do not attempt to take their treasure. +//! +use std::future::Future; use std::io::Read; use std::sync::Arc; use std::time::Duration; @@ -23,7 +43,7 @@ use prost_types::Timestamp; use proto::reports::trace::Node; use serde_json::json; use tokio::sync::Mutex; -use tokio::sync::OnceCell; +use tokio::task::JoinHandle; use tower::Service; use tower::ServiceExt; use tower_http::decompression::DecompressionLayer; @@ -33,85 +53,93 @@ use crate::proto::reports::trace::node::Id::ResponseName; use crate::proto::reports::Report; use crate::proto::reports::Trace; -static REPORTS: Lazy>>> = Lazy::new(Default::default); -static TEST: Lazy>> = Lazy::new(Default::default); static ROUTER_SERVICE_RUNTIME: Lazy> = Lazy::new(|| { Arc::new(tokio::runtime::Runtime::new().expect("must be able to create tokio runtime")) }); -static CONFIG: OnceCell = OnceCell::const_new(); -static ROUTER_SERVICE: Lazy>>> = Lazy::new(Default::default); -static MOCKED_ROUTER_SERVICE: Lazy>>> = - Lazy::new(Default::default); - -async fn config() -> serde_json::Value { - CONFIG - .get_or_init(|| async { - std::env::set_var("APOLLO_KEY", "test"); - std::env::set_var("APOLLO_GRAPH_REF", "test"); - - let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); - let addr = listener.local_addr().unwrap(); - let app = axum::Router::new() - .route("/", post(report)) - .layer(DecompressionLayer::new()) - .layer(tower_http::add_extension::AddExtensionLayer::new( - (*REPORTS).clone(), - )); - - drop(ROUTER_SERVICE_RUNTIME.spawn(async move { - axum::Server::from_tcp(listener) - .expect("mut be able to crete report receiver") - .serve(app.into_make_service()) - .await - .expect("could not start axum server") - })); - - let mut config: serde_json::Value = - serde_yaml::from_str(include_str!("fixtures/apollo_reports.router.yaml")) - .expect("apollo_reports.router.yaml was invalid"); - config = jsonpath_lib::replace_with(config, "$.telemetry.apollo.endpoint", &mut |_| { - 
Some(serde_json::Value::String(format!("http://{addr}"))) - }) - .expect("Could not sub in endpoint"); - config - }) - .await - .clone() +static TEST: Lazy>> = Lazy::new(Default::default); + +async fn config( + batch: bool, + reports: Arc>>, +) -> (JoinHandle<()>, serde_json::Value) { + std::env::set_var("APOLLO_KEY", "test"); + std::env::set_var("APOLLO_GRAPH_REF", "test"); + + let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = listener.local_addr().unwrap(); + let app = axum::Router::new() + .route("/", post(report)) + .layer(DecompressionLayer::new()) + .layer(tower_http::add_extension::AddExtensionLayer::new(reports)); + + let task = ROUTER_SERVICE_RUNTIME.spawn(async move { + axum::Server::from_tcp(listener) + .expect("mut be able to create report receiver") + .serve(app.into_make_service()) + .await + .expect("could not start axum server") + }); + + let mut config: serde_json::Value = if batch { + serde_yaml::from_str(include_str!("fixtures/apollo_reports.batch_router.yaml")) + .expect("apollo_reports.router.yaml was invalid") + } else { + serde_yaml::from_str(include_str!("fixtures/apollo_reports.router.yaml")) + .expect("apollo_reports.router.yaml was invalid") + }; + config = jsonpath_lib::replace_with(config, "$.telemetry.apollo.endpoint", &mut |_| { + Some(serde_json::Value::String(format!("http://{addr}"))) + }) + .expect("Could not sub in endpoint"); + (task, config) } -async fn get_router_service(mocked: bool) -> BoxCloneService { - let lazy = if mocked { - &MOCKED_ROUTER_SERVICE +async fn get_router_service( + reports: Arc>>, + mocked: bool, +) -> (JoinHandle<()>, BoxCloneService) { + let (task, config) = config(false, reports).await; + let builder = TestHarness::builder() + .try_log_level("INFO") + .configuration_json(config) + .expect("test harness had config errors") + .schema(include_str!("fixtures/supergraph.graphql")); + let builder = if mocked { + builder.subgraph_hook(|subgraph, _service| subgraph_mocks(subgraph)) } else { - &ROUTER_SERVICE + builder.with_subgraph_network_requests() }; - let mut router_service = lazy.lock().await; - if router_service.is_none() { - let config = config().await; - let service = ROUTER_SERVICE_RUNTIME - .spawn(async move { - let builder = TestHarness::builder() - .try_log_level("INFO") - .configuration_json(config) - .expect("test harness had config errors") - .schema(include_str!("fixtures/supergraph.graphql")); - let builder = if mocked { - builder.subgraph_hook(|subgraph, _service| subgraph_mocks(subgraph)) - } else { - builder.with_subgraph_network_requests() - }; - builder - .build_router() - .await - .expect("could create router test harness") - }) + ( + task, + builder + .build_router() .await - .expect("must be able to create router"); - *router_service = Some(service); - } - router_service - .clone() - .expect("router service must have got created") + .expect("could create router test harness"), + ) +} + +async fn get_batch_router_service( + reports: Arc>>, + mocked: bool, +) -> (JoinHandle<()>, BoxCloneService) { + let (task, config) = config(true, reports).await; + let builder = TestHarness::builder() + .try_log_level("INFO") + .configuration_json(config) + .expect("test harness had config errors") + .schema(include_str!("fixtures/supergraph.graphql")); + let builder = if mocked { + builder.subgraph_hook(|subgraph, _service| subgraph_mocks(subgraph)) + } else { + builder.with_subgraph_network_requests() + }; + ( + task, + builder + .build_router() + .await + .expect("could create router test 
harness"), + ) } fn encode_ftv1(trace: Trace) -> String { @@ -191,16 +219,29 @@ async fn report( Ok(Json(())) } -async fn get_trace_report(request: supergraph::Request) -> Report { - get_report(false, request, |r| { - let r = !r - .traces_per_query +async fn get_trace_report(reports: Arc>>, request: router::Request) -> Report { + get_report(get_router_service, reports, false, request, |r| { + !r.traces_per_query .values() .next() .expect("traces and stats required") .trace - .is_empty(); - r + .is_empty() + }) + .await +} + +async fn get_batch_trace_report( + reports: Arc>>, + request: router::Request, +) -> Report { + get_report(get_batch_router_service, reports, false, request, |r| { + !r.traces_per_query + .values() + .next() + .expect("traces and stats required") + .trace + .is_empty() }) .await } @@ -214,81 +255,129 @@ fn has_metrics(r: &&Report) -> bool { .is_empty() } -async fn get_metrics_report(request: supergraph::Request) -> Report { - get_report(false, request, has_metrics).await +async fn get_metrics_report(reports: Arc>>, request: router::Request) -> Report { + get_report(get_router_service, reports, false, request, has_metrics).await } -async fn get_metrics_report_mocked(request: supergraph::Request) -> Report { - get_report(true, request, has_metrics).await +async fn get_batch_metrics_report( + reports: Arc>>, + request: router::Request, +) -> u64 { + get_batch_stats_report(reports, false, request, has_metrics).await +} + +async fn get_metrics_report_mocked( + reports: Arc>>, + request: router::Request, +) -> Report { + get_report(get_router_service, reports, true, request, has_metrics).await } -async fn get_report bool + Send + Sync + Copy + 'static>( +async fn get_report bool + Send + Sync + Copy + 'static>( + service_fn: impl FnOnce(Arc>>, bool) -> Fut, + reports: Arc>>, mocked: bool, - request: supergraph::Request, + request: router::Request, filter: T, -) -> Report { - ROUTER_SERVICE_RUNTIME - .spawn(async move { - let mut found_report; - { - let _test_guard = TEST.lock().await; - { - REPORTS.clone().lock().await.clear(); - } - let req: router::Request = request.try_into().expect("could not convert request"); - - let response = get_router_service(mocked) - .await - .ready() - .await - .expect("router service was never ready") - .call(req) - .await - .expect("router service call failed"); - - // Drain the response - found_report = match hyper::body::to_bytes(response.response.into_body()) - .await - .map(|b| String::from_utf8(b.to_vec())) - { - Ok(Ok(response)) => { - if response.contains("errors") { - eprintln!("response had errors {response}"); - } - Ok(None) - } - _ => Err(anyhow!("error retrieving response")), - }; - - // We must always try to find the report regardless of if the response had failures - for _ in 0..10 { - let reports = REPORTS.lock().await; - let report = reports.iter().find(filter); - if report.is_some() { - if matches!(found_report, Ok(None)) { - found_report = Ok(report.cloned()); - } - break; - } - drop(reports); - tokio::time::sleep(Duration::from_millis(100)).await; - } - } +) -> Report +where + Fut: Future, BoxCloneService)>, +{ + let _guard = TEST.lock().await; + reports.lock().await.clear(); + let (task, mut service) = service_fn(reports.clone(), mocked).await; + let response = service + .ready() + .await + .expect("router service was never ready") + .call(request) + .await + .expect("router service call failed"); - found_report - }) + // Drain the response + let mut found_report = match hyper::body::to_bytes(response.response.into_body()) 
        .await
-        .expect("failed to get report")
+        .map(|b| String::from_utf8(b.to_vec()))
+    {
+        Ok(Ok(response)) => {
+            if response.contains("errors") {
+                eprintln!("response had errors {response}");
+            }
+            Ok(None)
+        }
+        _ => Err(anyhow!("error retrieving response")),
+    };
+
+    // We must always try to find the report regardless of if the response had failures
+    for _ in 0..10 {
+        let my_reports = reports.lock().await;
+        let report = my_reports.iter().find(filter);
+        if report.is_some() && matches!(found_report, Ok(None)) {
+            found_report = Ok(report.cloned());
+            break;
+        }
+        drop(my_reports);
+        tokio::time::sleep(Duration::from_millis(100)).await;
+    }
+    task.abort();
+
+    found_report
         .expect("failed to get report")
         .expect("failed to find report")
 }
+
+async fn get_batch_stats_report<T: Fn(&&Report) -> bool + Send + Sync + Copy + 'static>(
+    reports: Arc<Mutex<Vec<Report>>>,
+    mocked: bool,
+    request: router::Request,
+    filter: T,
+) -> u64 {
+    let _guard = TEST.lock().await;
+    reports.lock().await.clear();
+    let (task, mut service) = get_batch_router_service(reports.clone(), mocked).await;
+    let response = service
+        .ready()
+        .await
+        .expect("router service was never ready")
+        .call(request)
+        .await
+        .expect("router service call failed");
+
+    // Drain the response (and throw it away)
+    let _found_report = hyper::body::to_bytes(response.response.into_body()).await;
+
+    // Give the server a little time to export something.
+    // If this test fails, consider increasing this time.
+    tokio::time::sleep(Duration::from_millis(500)).await;
+
+    let mut request_count = 0;
+
+    // In a more ideal world we would have an implementation of `AddAssign<&reports::Report>`.
+    // However, we don't. Let's do the minimal amount of checking and ensure that at least the
+    // number of requests can be tested. Clearly, this doesn't test all of the stats, but it's a
+    // fairly reliable check and at least we are testing something.
+ for report in reports.lock().await.iter().filter(filter) { + let stats = &report + .traces_per_query + .values() + .next() + .expect("has something") + .stats_with_context; + request_count += stats[0].query_latency_stats.as_ref().unwrap().request_count; + } + task.abort(); + request_count +} + #[tokio::test(flavor = "multi_thread")] async fn non_defer() { let request = supergraph::Request::fake_builder() .query("query{topProducts{name reviews {author{name}} reviews{author{name}}}}") .build() .unwrap(); - let report = get_trace_report(request).await; + let req: router::Request = request.try_into().expect("could not convert request"); + let reports = Arc::new(Mutex::new(vec![])); + let report = get_trace_report(reports, req).await; assert_report!(report); } @@ -300,7 +389,9 @@ async fn test_condition_if() { .header(ACCEPT, "multipart/mixed; deferSpec=20220824") .build() .unwrap(); - let report = get_trace_report(request).await; + let req: router::Request = request.try_into().expect("could not convert request"); + let reports = Arc::new(Mutex::new(vec![])); + let report = get_trace_report(reports, req).await; assert_report!(report); } @@ -312,7 +403,9 @@ async fn test_condition_else() { .header(ACCEPT, "multipart/mixed; deferSpec=20220824") .build() .unwrap(); - let report = get_trace_report(request).await; + let req: router::Request = request.try_into().expect("could not convert request"); + let reports = Arc::new(Mutex::new(vec![])); + let report = get_trace_report(reports, req).await; assert_report!(report); } @@ -322,7 +415,31 @@ async fn test_trace_id() { .query("query{topProducts{name reviews {author{name}} reviews{author{name}}}}") .build() .unwrap(); - let report = get_trace_report(request).await; + let req: router::Request = request.try_into().expect("could not convert request"); + let reports = Arc::new(Mutex::new(vec![])); + let report = get_trace_report(reports, req).await; + assert_report!(report); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_batch_trace_id() { + let request = supergraph::Request::fake_builder() + .query("query{topProducts{name reviews {author{name}} reviews{author{name}}}}") + .build() + .unwrap() + .supergraph_request + .map(|req| { + // Modify the request so that it is a valid array of requests. 
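+            // Duplicating the serialized request and wrapping both copies in
+            // square brackets yields `[{...},{...}]`: a batch of two identical
+            // operations in a single HTTP body.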
+ let mut json_bytes = serde_json::to_vec(&req).unwrap(); + let mut result = vec![b'[']; + result.append(&mut json_bytes.clone()); + result.push(b','); + result.append(&mut json_bytes); + result.push(b']'); + hyper::Body::from(result) + }); + let reports = Arc::new(Mutex::new(vec![])); + let report = get_batch_trace_report(reports, request.into()).await; assert_report!(report); } @@ -333,7 +450,9 @@ async fn test_client_name() { .header("apollographql-client-name", "my client") .build() .unwrap(); - let report = get_trace_report(request).await; + let req: router::Request = request.try_into().expect("could not convert request"); + let reports = Arc::new(Mutex::new(vec![])); + let report = get_trace_report(reports, req).await; assert_report!(report); } @@ -344,7 +463,9 @@ async fn test_client_version() { .header("apollographql-client-version", "my client version") .build() .unwrap(); - let report = get_trace_report(request).await; + let req: router::Request = request.try_into().expect("could not convert request"); + let reports = Arc::new(Mutex::new(vec![])); + let report = get_trace_report(reports, req).await; assert_report!(report); } @@ -356,7 +477,33 @@ async fn test_send_header() { .header("dont-send-header", "Header value") .build() .unwrap(); - let report = get_trace_report(request).await; + let req: router::Request = request.try_into().expect("could not convert request"); + let reports = Arc::new(Mutex::new(vec![])); + let report = get_trace_report(reports, req).await; + assert_report!(report); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_batch_send_header() { + let request = supergraph::Request::fake_builder() + .query("query{topProducts{name reviews {author{name}} reviews{author{name}}}}") + .header("send-header", "Header value") + .header("dont-send-header", "Header value") + .build() + .unwrap() + .supergraph_request + .map(|req| { + // Modify the request so that it is a valid array of requests. + let mut json_bytes = serde_json::to_vec(&req).unwrap(); + let mut result = vec![b'[']; + result.append(&mut json_bytes.clone()); + result.push(b','); + result.append(&mut json_bytes); + result.push(b']'); + hyper::Body::from(result) + }); + let reports = Arc::new(Mutex::new(vec![])); + let report = get_batch_trace_report(reports, request.into()).await; assert_report!(report); } @@ -368,7 +515,9 @@ async fn test_send_variable_value() { .variable("dontSendValue", true) .build() .unwrap(); - let report = get_trace_report(request).await; + let req: router::Request = request.try_into().expect("could not convert request"); + let reports = Arc::new(Mutex::new(vec![])); + let report = get_trace_report(reports, req).await; assert_report!(report); } @@ -378,17 +527,47 @@ async fn test_stats() { .query("query{topProducts{name reviews {author{name}} reviews{author{name}}}}") .build() .unwrap(); - let report = get_metrics_report(request).await; + let req: router::Request = request.try_into().expect("could not convert request"); + let reports = Arc::new(Mutex::new(vec![])); + let report = get_metrics_report(reports, req).await; assert_report!(report); } +#[tokio::test(flavor = "multi_thread")] +async fn test_batch_stats() { + let request = supergraph::Request::fake_builder() + .query("query{topProducts{name reviews {author{name}} reviews{author{name}}}}") + .build() + .unwrap() + .supergraph_request + .map(|req| { + // Modify the request so that it is a valid array containing 2 requests. 
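+            // Same duplication trick as above: the two copies of the operation
+            // become a two-entry batch, which the stats assertion below relies
+            // on when it checks for a request_count of 2.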
+ let mut json_bytes = serde_json::to_vec(&req).unwrap(); + let mut result = vec![b'[']; + result.append(&mut json_bytes.clone()); + result.push(b','); + result.append(&mut json_bytes); + result.push(b']'); + hyper::Body::from(result) + }); + let reports = Arc::new(Mutex::new(vec![])); + // We can't do a report assert here because we will probably have multiple reports which we + // can't merge... + // Let's call a function that enables us to at least assert that we received the correct number + // of requests. + let request_count = get_batch_metrics_report(reports, request.into()).await; + assert_eq!(2, request_count); +} + #[tokio::test(flavor = "multi_thread")] async fn test_stats_mocked() { let request = supergraph::Request::fake_builder() .query("query{topProducts{name reviews {author{name}} reviews{author{name}}}}") .build() .unwrap(); - let report = get_metrics_report_mocked(request).await; + let req: router::Request = request.try_into().expect("could not convert request"); + let reports = Arc::new(Mutex::new(vec![])); + let report = get_metrics_report_mocked(reports, req).await; let per_query = report.traces_per_query.values().next().unwrap(); let stats = per_query.stats_with_context.first().unwrap(); insta::with_settings!({sort_maps => true}, { diff --git a/apollo-router/tests/fixtures/apollo_reports.batch_router.yaml b/apollo-router/tests/fixtures/apollo_reports.batch_router.yaml new file mode 100644 index 0000000000..bf22811ba2 --- /dev/null +++ b/apollo-router/tests/fixtures/apollo_reports.batch_router.yaml @@ -0,0 +1,28 @@ +experimental_batching: + enabled: true + mode: batch_http_link +include_subgraph_errors: + all: true +telemetry: + tracing: + experimental_response_trace_id: + enabled: true + header_name: "my_trace_id" + trace_config: + sampler: always_on + + apollo: + client_name_header: apollographql-client-name + client_version_header: apollographql-client-version + endpoint: ENDPOINT + batch_processor: + scheduled_delay: 10ms + field_level_instrumentation_sampler: always_on + send_headers: + only: + - "send-header" + send_variable_values: + only: + - "sendValue" + + diff --git a/apollo-router/tests/snapshots/apollo_reports__batch_send_header.snap b/apollo-router/tests/snapshots/apollo_reports__batch_send_header.snap new file mode 100644 index 0000000000..308c1bdd13 --- /dev/null +++ b/apollo-router/tests/snapshots/apollo_reports__batch_send_header.snap @@ -0,0 +1,1095 @@ +--- +source: apollo-router/tests/apollo_reports.rs +expression: report +--- +header: + graph_ref: test + hostname: "[hostname]" + agent_version: "[agent_version]" + service_version: "" + runtime_version: rust + uname: "[uname]" + executable_schema_id: "[executable_schema_id]" +traces_per_query: + "# -\n{topProducts{name reviews{author{name}}reviews{author{name}}}}": + trace: + - start_time: + seconds: "[seconds]" + nanos: "[nanos]" + end_time: + seconds: "[seconds]" + nanos: "[nanos]" + duration_ns: "[duration_ns]" + root: ~ + is_incomplete: false + signature: "" + unexecuted_operation_body: "" + unexecuted_operation_name: "" + details: + variables_json: {} + operation_name: "" + client_name: "" + client_version: "" + operation_type: query + operation_subtype: "" + http: + method: 4 + request_headers: + send-header: + value: + - Header value + response_headers: + my_trace_id: "[my_trace_id]" + status_code: 0 + cache_policy: ~ + query_plan: + node: + Sequence: + nodes: + - node: + Fetch: + service_name: products + trace_parsing_failed: false + trace: + start_time: + seconds: "[seconds]" + nanos: 
"[nanos]" + end_time: + seconds: "[seconds]" + nanos: "[nanos]" + duration_ns: "[duration_ns]" + root: + original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: 0 + end_time: 0 + error: [] + child: + - original_field_name: "" + type: "[Product]" + parent_type: Query + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: String! + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: upc + - original_field_name: "" + type: String + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: name + id: + Index: 0 + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: String! + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: upc + - original_field_name: "" + type: String + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: name + id: + Index: 1 + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: String! + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: upc + - original_field_name: "" + type: String + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: name + id: + Index: 2 + id: + ResponseName: topProducts + id: ~ + is_incomplete: false + signature: "" + unexecuted_operation_body: "" + unexecuted_operation_name: "" + details: ~ + client_name: "" + client_version: "" + operation_type: "" + operation_subtype: "" + http: ~ + cache_policy: ~ + query_plan: ~ + full_query_cache_hit: false + persisted_query_hit: false + persisted_query_register: false + registered_operation: false + forbidden_operation: false + field_execution_weight: 1 + sent_time_offset: "[sent_time_offset]" + sent_time: + seconds: "[seconds]" + nanos: "[nanos]" + received_time: + seconds: "[seconds]" + nanos: "[nanos]" + - node: + Flatten: + response_path: + - id: + FieldName: topProducts + node: + node: + Fetch: + service_name: reviews + trace_parsing_failed: false + trace: + start_time: + seconds: "[seconds]" + nanos: "[nanos]" + end_time: + seconds: "[seconds]" + nanos: "[nanos]" + duration_ns: "[duration_ns]" + root: + original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: 0 + end_time: 0 + error: [] + child: + - original_field_name: "" + type: "[_Entity]!" 
+ parent_type: Query + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "[Review]" + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: User + parent_type: Review + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: ID! + parent_type: User + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: id + id: + ResponseName: author + id: + Index: 0 + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: User + parent_type: Review + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: ID! + parent_type: User + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: id + id: + ResponseName: author + id: + Index: 1 + id: + ResponseName: reviews + id: + Index: 0 + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "[Review]" + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: User + parent_type: Review + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: ID! + parent_type: User + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: id + id: + ResponseName: author + id: + Index: 0 + id: + ResponseName: reviews + id: + Index: 1 + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "[Review]" + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: User + parent_type: Review + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: ID! 
+ parent_type: User + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: id + id: + ResponseName: author + id: + Index: 0 + id: + ResponseName: reviews + id: + Index: 2 + id: + ResponseName: _entities + id: ~ + is_incomplete: false + signature: "" + unexecuted_operation_body: "" + unexecuted_operation_name: "" + details: ~ + client_name: "" + client_version: "" + operation_type: "" + operation_subtype: "" + http: ~ + cache_policy: ~ + query_plan: ~ + full_query_cache_hit: false + persisted_query_hit: false + persisted_query_register: false + registered_operation: false + forbidden_operation: false + field_execution_weight: 1 + sent_time_offset: "[sent_time_offset]" + sent_time: + seconds: "[seconds]" + nanos: "[nanos]" + received_time: + seconds: "[seconds]" + nanos: "[nanos]" + - node: + Flatten: + response_path: + - id: + FieldName: topProducts + - id: + FieldName: reviews + - id: + FieldName: author + node: + node: + Fetch: + service_name: accounts + trace_parsing_failed: false + trace: + start_time: + seconds: "[seconds]" + nanos: "[nanos]" + end_time: + seconds: "[seconds]" + nanos: "[nanos]" + duration_ns: "[duration_ns]" + root: + original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: 0 + end_time: 0 + error: [] + child: + - original_field_name: "" + type: "[_Entity]!" + parent_type: Query + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: String + parent_type: User + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: name + id: + Index: 0 + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: String + parent_type: User + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: name + id: + Index: 1 + id: + ResponseName: _entities + id: ~ + is_incomplete: false + signature: "" + unexecuted_operation_body: "" + unexecuted_operation_name: "" + details: ~ + client_name: "" + client_version: "" + operation_type: "" + operation_subtype: "" + http: ~ + cache_policy: ~ + query_plan: ~ + full_query_cache_hit: false + persisted_query_hit: false + persisted_query_register: false + registered_operation: false + forbidden_operation: false + field_execution_weight: 1 + sent_time_offset: "[sent_time_offset]" + sent_time: + seconds: "[seconds]" + nanos: "[nanos]" + received_time: + seconds: "[seconds]" + nanos: "[nanos]" + full_query_cache_hit: false + persisted_query_hit: false + persisted_query_register: false + registered_operation: false + forbidden_operation: false + field_execution_weight: 1 + - start_time: + seconds: "[seconds]" + nanos: "[nanos]" + end_time: + seconds: "[seconds]" + nanos: "[nanos]" + duration_ns: "[duration_ns]" + root: ~ + is_incomplete: false + signature: "" + unexecuted_operation_body: "" + unexecuted_operation_name: "" + details: + variables_json: {} + operation_name: "" + client_name: "" + client_version: "" + operation_type: query + operation_subtype: "" + http: + method: 4 + request_headers: + send-header: + value: + - Header value + response_headers: + my_trace_id: "[my_trace_id]" + status_code: 0 
+ cache_policy: ~ + query_plan: + node: + Sequence: + nodes: + - node: + Fetch: + service_name: products + trace_parsing_failed: false + trace: + start_time: + seconds: "[seconds]" + nanos: "[nanos]" + end_time: + seconds: "[seconds]" + nanos: "[nanos]" + duration_ns: "[duration_ns]" + root: + original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: 0 + end_time: 0 + error: [] + child: + - original_field_name: "" + type: "[Product]" + parent_type: Query + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: String! + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: upc + - original_field_name: "" + type: String + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: name + id: + Index: 0 + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: String! + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: upc + - original_field_name: "" + type: String + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: name + id: + Index: 1 + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: String! + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: upc + - original_field_name: "" + type: String + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: name + id: + Index: 2 + id: + ResponseName: topProducts + id: ~ + is_incomplete: false + signature: "" + unexecuted_operation_body: "" + unexecuted_operation_name: "" + details: ~ + client_name: "" + client_version: "" + operation_type: "" + operation_subtype: "" + http: ~ + cache_policy: ~ + query_plan: ~ + full_query_cache_hit: false + persisted_query_hit: false + persisted_query_register: false + registered_operation: false + forbidden_operation: false + field_execution_weight: 1 + sent_time_offset: "[sent_time_offset]" + sent_time: + seconds: "[seconds]" + nanos: "[nanos]" + received_time: + seconds: "[seconds]" + nanos: "[nanos]" + - node: + Flatten: + response_path: + - id: + FieldName: topProducts + node: + node: + Fetch: + service_name: reviews + trace_parsing_failed: false + trace: + start_time: + seconds: "[seconds]" + nanos: "[nanos]" + end_time: + seconds: "[seconds]" + nanos: "[nanos]" + duration_ns: "[duration_ns]" + root: + original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: 0 + end_time: 0 + error: [] + child: + - original_field_name: "" + type: "[_Entity]!" 
+ parent_type: Query + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "[Review]" + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: User + parent_type: Review + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: ID! + parent_type: User + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: id + id: + ResponseName: author + id: + Index: 0 + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: User + parent_type: Review + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: ID! + parent_type: User + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: id + id: + ResponseName: author + id: + Index: 1 + id: + ResponseName: reviews + id: + Index: 0 + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "[Review]" + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: User + parent_type: Review + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: ID! + parent_type: User + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: id + id: + ResponseName: author + id: + Index: 0 + id: + ResponseName: reviews + id: + Index: 1 + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "[Review]" + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: User + parent_type: Review + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: ID! 
+ parent_type: User + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: id + id: + ResponseName: author + id: + Index: 0 + id: + ResponseName: reviews + id: + Index: 2 + id: + ResponseName: _entities + id: ~ + is_incomplete: false + signature: "" + unexecuted_operation_body: "" + unexecuted_operation_name: "" + details: ~ + client_name: "" + client_version: "" + operation_type: "" + operation_subtype: "" + http: ~ + cache_policy: ~ + query_plan: ~ + full_query_cache_hit: false + persisted_query_hit: false + persisted_query_register: false + registered_operation: false + forbidden_operation: false + field_execution_weight: 1 + sent_time_offset: "[sent_time_offset]" + sent_time: + seconds: "[seconds]" + nanos: "[nanos]" + received_time: + seconds: "[seconds]" + nanos: "[nanos]" + - node: + Flatten: + response_path: + - id: + FieldName: topProducts + - id: + FieldName: reviews + - id: + FieldName: author + node: + node: + Fetch: + service_name: accounts + trace_parsing_failed: false + trace: + start_time: + seconds: "[seconds]" + nanos: "[nanos]" + end_time: + seconds: "[seconds]" + nanos: "[nanos]" + duration_ns: "[duration_ns]" + root: + original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: 0 + end_time: 0 + error: [] + child: + - original_field_name: "" + type: "[_Entity]!" + parent_type: Query + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: String + parent_type: User + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: name + id: + Index: 0 + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: String + parent_type: User + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: name + id: + Index: 1 + id: + ResponseName: _entities + id: ~ + is_incomplete: false + signature: "" + unexecuted_operation_body: "" + unexecuted_operation_name: "" + details: ~ + client_name: "" + client_version: "" + operation_type: "" + operation_subtype: "" + http: ~ + cache_policy: ~ + query_plan: ~ + full_query_cache_hit: false + persisted_query_hit: false + persisted_query_register: false + registered_operation: false + forbidden_operation: false + field_execution_weight: 1 + sent_time_offset: "[sent_time_offset]" + sent_time: + seconds: "[seconds]" + nanos: "[nanos]" + received_time: + seconds: "[seconds]" + nanos: "[nanos]" + full_query_cache_hit: false + persisted_query_hit: false + persisted_query_register: false + registered_operation: false + forbidden_operation: false + field_execution_weight: 1 + stats_with_context: [] + referenced_fields_by_type: {} + internal_traces_contributing_to_stats: [] +end_time: "[end_time]" +operation_count: 0 +operation_count_by_type: [] +traces_pre_aggregated: true + diff --git a/apollo-router/tests/snapshots/apollo_reports__batch_trace_id.snap b/apollo-router/tests/snapshots/apollo_reports__batch_trace_id.snap new file mode 100644 index 0000000000..097326a64c --- /dev/null +++ b/apollo-router/tests/snapshots/apollo_reports__batch_trace_id.snap @@ -0,0 +1,1089 @@ +--- +source: 
apollo-router/tests/apollo_reports.rs +expression: report +--- +header: + graph_ref: test + hostname: "[hostname]" + agent_version: "[agent_version]" + service_version: "" + runtime_version: rust + uname: "[uname]" + executable_schema_id: "[executable_schema_id]" +traces_per_query: + "# -\n{topProducts{name reviews{author{name}}reviews{author{name}}}}": + trace: + - start_time: + seconds: "[seconds]" + nanos: "[nanos]" + end_time: + seconds: "[seconds]" + nanos: "[nanos]" + duration_ns: "[duration_ns]" + root: ~ + is_incomplete: false + signature: "" + unexecuted_operation_body: "" + unexecuted_operation_name: "" + details: + variables_json: {} + operation_name: "" + client_name: "" + client_version: "" + operation_type: query + operation_subtype: "" + http: + method: 4 + request_headers: {} + response_headers: + my_trace_id: "[my_trace_id]" + status_code: 0 + cache_policy: ~ + query_plan: + node: + Sequence: + nodes: + - node: + Fetch: + service_name: products + trace_parsing_failed: false + trace: + start_time: + seconds: "[seconds]" + nanos: "[nanos]" + end_time: + seconds: "[seconds]" + nanos: "[nanos]" + duration_ns: "[duration_ns]" + root: + original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: 0 + end_time: 0 + error: [] + child: + - original_field_name: "" + type: "[Product]" + parent_type: Query + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: String! + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: upc + - original_field_name: "" + type: String + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: name + id: + Index: 0 + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: String! + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: upc + - original_field_name: "" + type: String + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: name + id: + Index: 1 + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: String! 
+ parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: upc + - original_field_name: "" + type: String + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: name + id: + Index: 2 + id: + ResponseName: topProducts + id: ~ + is_incomplete: false + signature: "" + unexecuted_operation_body: "" + unexecuted_operation_name: "" + details: ~ + client_name: "" + client_version: "" + operation_type: "" + operation_subtype: "" + http: ~ + cache_policy: ~ + query_plan: ~ + full_query_cache_hit: false + persisted_query_hit: false + persisted_query_register: false + registered_operation: false + forbidden_operation: false + field_execution_weight: 1 + sent_time_offset: "[sent_time_offset]" + sent_time: + seconds: "[seconds]" + nanos: "[nanos]" + received_time: + seconds: "[seconds]" + nanos: "[nanos]" + - node: + Flatten: + response_path: + - id: + FieldName: topProducts + node: + node: + Fetch: + service_name: reviews + trace_parsing_failed: false + trace: + start_time: + seconds: "[seconds]" + nanos: "[nanos]" + end_time: + seconds: "[seconds]" + nanos: "[nanos]" + duration_ns: "[duration_ns]" + root: + original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: 0 + end_time: 0 + error: [] + child: + - original_field_name: "" + type: "[_Entity]!" + parent_type: Query + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "[Review]" + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: User + parent_type: Review + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: ID! + parent_type: User + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: id + id: + ResponseName: author + id: + Index: 0 + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: User + parent_type: Review + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: ID! 
+ parent_type: User + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: id + id: + ResponseName: author + id: + Index: 1 + id: + ResponseName: reviews + id: + Index: 0 + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "[Review]" + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: User + parent_type: Review + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: ID! + parent_type: User + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: id + id: + ResponseName: author + id: + Index: 0 + id: + ResponseName: reviews + id: + Index: 1 + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "[Review]" + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: User + parent_type: Review + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: ID! + parent_type: User + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: id + id: + ResponseName: author + id: + Index: 0 + id: + ResponseName: reviews + id: + Index: 2 + id: + ResponseName: _entities + id: ~ + is_incomplete: false + signature: "" + unexecuted_operation_body: "" + unexecuted_operation_name: "" + details: ~ + client_name: "" + client_version: "" + operation_type: "" + operation_subtype: "" + http: ~ + cache_policy: ~ + query_plan: ~ + full_query_cache_hit: false + persisted_query_hit: false + persisted_query_register: false + registered_operation: false + forbidden_operation: false + field_execution_weight: 1 + sent_time_offset: "[sent_time_offset]" + sent_time: + seconds: "[seconds]" + nanos: "[nanos]" + received_time: + seconds: "[seconds]" + nanos: "[nanos]" + - node: + Flatten: + response_path: + - id: + FieldName: topProducts + - id: + FieldName: reviews + - id: + FieldName: author + node: + node: + Fetch: + service_name: accounts + trace_parsing_failed: false + trace: + start_time: + seconds: "[seconds]" + nanos: "[nanos]" + end_time: + seconds: "[seconds]" + nanos: "[nanos]" + duration_ns: "[duration_ns]" + root: + original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: 0 + end_time: 0 + error: [] + child: + - original_field_name: "" + type: "[_Entity]!" 
+ parent_type: Query + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: String + parent_type: User + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: name + id: + Index: 0 + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: String + parent_type: User + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: name + id: + Index: 1 + id: + ResponseName: _entities + id: ~ + is_incomplete: false + signature: "" + unexecuted_operation_body: "" + unexecuted_operation_name: "" + details: ~ + client_name: "" + client_version: "" + operation_type: "" + operation_subtype: "" + http: ~ + cache_policy: ~ + query_plan: ~ + full_query_cache_hit: false + persisted_query_hit: false + persisted_query_register: false + registered_operation: false + forbidden_operation: false + field_execution_weight: 1 + sent_time_offset: "[sent_time_offset]" + sent_time: + seconds: "[seconds]" + nanos: "[nanos]" + received_time: + seconds: "[seconds]" + nanos: "[nanos]" + full_query_cache_hit: false + persisted_query_hit: false + persisted_query_register: false + registered_operation: false + forbidden_operation: false + field_execution_weight: 1 + - start_time: + seconds: "[seconds]" + nanos: "[nanos]" + end_time: + seconds: "[seconds]" + nanos: "[nanos]" + duration_ns: "[duration_ns]" + root: ~ + is_incomplete: false + signature: "" + unexecuted_operation_body: "" + unexecuted_operation_name: "" + details: + variables_json: {} + operation_name: "" + client_name: "" + client_version: "" + operation_type: query + operation_subtype: "" + http: + method: 4 + request_headers: {} + response_headers: + my_trace_id: "[my_trace_id]" + status_code: 0 + cache_policy: ~ + query_plan: + node: + Sequence: + nodes: + - node: + Fetch: + service_name: products + trace_parsing_failed: false + trace: + start_time: + seconds: "[seconds]" + nanos: "[nanos]" + end_time: + seconds: "[seconds]" + nanos: "[nanos]" + duration_ns: "[duration_ns]" + root: + original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: 0 + end_time: 0 + error: [] + child: + - original_field_name: "" + type: "[Product]" + parent_type: Query + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: String! + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: upc + - original_field_name: "" + type: String + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: name + id: + Index: 0 + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: String! 
+ parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: upc + - original_field_name: "" + type: String + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: name + id: + Index: 1 + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: String! + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: upc + - original_field_name: "" + type: String + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: name + id: + Index: 2 + id: + ResponseName: topProducts + id: ~ + is_incomplete: false + signature: "" + unexecuted_operation_body: "" + unexecuted_operation_name: "" + details: ~ + client_name: "" + client_version: "" + operation_type: "" + operation_subtype: "" + http: ~ + cache_policy: ~ + query_plan: ~ + full_query_cache_hit: false + persisted_query_hit: false + persisted_query_register: false + registered_operation: false + forbidden_operation: false + field_execution_weight: 1 + sent_time_offset: "[sent_time_offset]" + sent_time: + seconds: "[seconds]" + nanos: "[nanos]" + received_time: + seconds: "[seconds]" + nanos: "[nanos]" + - node: + Flatten: + response_path: + - id: + FieldName: topProducts + node: + node: + Fetch: + service_name: reviews + trace_parsing_failed: false + trace: + start_time: + seconds: "[seconds]" + nanos: "[nanos]" + end_time: + seconds: "[seconds]" + nanos: "[nanos]" + duration_ns: "[duration_ns]" + root: + original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: 0 + end_time: 0 + error: [] + child: + - original_field_name: "" + type: "[_Entity]!" + parent_type: Query + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "[Review]" + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: User + parent_type: Review + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: ID! + parent_type: User + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: id + id: + ResponseName: author + id: + Index: 0 + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: User + parent_type: Review + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: ID! 
+ parent_type: User + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: id + id: + ResponseName: author + id: + Index: 1 + id: + ResponseName: reviews + id: + Index: 0 + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "[Review]" + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: User + parent_type: Review + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: ID! + parent_type: User + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: id + id: + ResponseName: author + id: + Index: 0 + id: + ResponseName: reviews + id: + Index: 1 + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "[Review]" + parent_type: Product + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: User + parent_type: Review + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: ID! + parent_type: User + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: id + id: + ResponseName: author + id: + Index: 0 + id: + ResponseName: reviews + id: + Index: 2 + id: + ResponseName: _entities + id: ~ + is_incomplete: false + signature: "" + unexecuted_operation_body: "" + unexecuted_operation_name: "" + details: ~ + client_name: "" + client_version: "" + operation_type: "" + operation_subtype: "" + http: ~ + cache_policy: ~ + query_plan: ~ + full_query_cache_hit: false + persisted_query_hit: false + persisted_query_register: false + registered_operation: false + forbidden_operation: false + field_execution_weight: 1 + sent_time_offset: "[sent_time_offset]" + sent_time: + seconds: "[seconds]" + nanos: "[nanos]" + received_time: + seconds: "[seconds]" + nanos: "[nanos]" + - node: + Flatten: + response_path: + - id: + FieldName: topProducts + - id: + FieldName: reviews + - id: + FieldName: author + node: + node: + Fetch: + service_name: accounts + trace_parsing_failed: false + trace: + start_time: + seconds: "[seconds]" + nanos: "[nanos]" + end_time: + seconds: "[seconds]" + nanos: "[nanos]" + duration_ns: "[duration_ns]" + root: + original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: 0 + end_time: 0 + error: [] + child: + - original_field_name: "" + type: "[_Entity]!" 
+ parent_type: Query + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: String + parent_type: User + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: name + id: + Index: 0 + - original_field_name: "" + type: "" + parent_type: "" + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: + - original_field_name: "" + type: String + parent_type: User + cache_policy: ~ + start_time: "[start_time]" + end_time: "[end_time]" + error: [] + child: [] + id: + ResponseName: name + id: + Index: 1 + id: + ResponseName: _entities + id: ~ + is_incomplete: false + signature: "" + unexecuted_operation_body: "" + unexecuted_operation_name: "" + details: ~ + client_name: "" + client_version: "" + operation_type: "" + operation_subtype: "" + http: ~ + cache_policy: ~ + query_plan: ~ + full_query_cache_hit: false + persisted_query_hit: false + persisted_query_register: false + registered_operation: false + forbidden_operation: false + field_execution_weight: 1 + sent_time_offset: "[sent_time_offset]" + sent_time: + seconds: "[seconds]" + nanos: "[nanos]" + received_time: + seconds: "[seconds]" + nanos: "[nanos]" + full_query_cache_hit: false + persisted_query_hit: false + persisted_query_register: false + registered_operation: false + forbidden_operation: false + field_execution_weight: 1 + stats_with_context: [] + referenced_fields_by_type: {} + internal_traces_contributing_to_stats: [] +end_time: "[end_time]" +operation_count: 0 +operation_count_by_type: [] +traces_pre_aggregated: true + diff --git a/apollo-router/tests/snapshots/lifecycle_tests__cli_config_experimental.snap b/apollo-router/tests/snapshots/lifecycle_tests__cli_config_experimental.snap index 8f377f6c81..268e046ec1 100644 --- a/apollo-router/tests/snapshots/lifecycle_tests__cli_config_experimental.snap +++ b/apollo-router/tests/snapshots/lifecycle_tests__cli_config_experimental.snap @@ -9,6 +9,7 @@ stderr: stdout: List of all experimental configurations with related GitHub discussions: + - experimental_batching: https://github.com/apollographql/router/discussions/3840 - experimental_http_max_request_bytes: https://github.com/apollographql/router/discussions/3220 - experimental_logging: https://github.com/apollographql/router/discussions/1961 - experimental_response_trace_id: https://github.com/apollographql/router/discussions/2147 diff --git a/docs/source/config.json b/docs/source/config.json index 73d6f03a0c..a51d0d9ecc 100644 --- a/docs/source/config.json +++ b/docs/source/config.json @@ -70,6 +70,12 @@ "Build and run queries": "/executing-operations/build-run-queries", "@defer support": "/executing-operations/defer-support", "Request format": "/executing-operations/requests", + "Query batching": [ + "/executing-operations/query-batching", + [ + "experimental" + ] + ], "GraphQL Subscriptions": { "Subscriptions setup": [ "/executing-operations/subscription-support", diff --git a/docs/source/configuration/metrics.mdx b/docs/source/configuration/metrics.mdx index bf6ed273ac..0df2494879 100644 --- a/docs/source/configuration/metrics.mdx +++ b/docs/source/configuration/metrics.mdx @@ -128,6 +128,11 @@ Note that the initial call to uplink during router startup will not be reflected - 
`apollo_router_deduplicated_subscriptions_total` - Number of subscriptions that has been deduplicated
 - `apollo_router_skipped_event_count` - Number of subscription events that has been skipped because too many events have been received from the subgraph but not yet sent to the client.

+#### Batching
+
+- `apollo.router.operations.batching` - A counter of the number of query batches received by the router.
+- `apollo.router.operations.batching.size` - A histogram tracking the number of queries contained within a query batch.
+
 ## Using OpenTelemetry Collector

 You can send metrics to [OpenTelemetry Collector](https://opentelemetry.io/docs/collector/) for processing and reporting metrics.
diff --git a/docs/source/configuration/overview.mdx b/docs/source/configuration/overview.mdx
index b96f4fcc2a..eb4544f028 100644
--- a/docs/source/configuration/overview.mdx
+++ b/docs/source/configuration/overview.mdx
@@ -509,6 +509,10 @@ See [Configuring CORS in the Apollo Router](./cors).

 See [Apollo Router support for `@defer`](../executing-operations/defer-support/#disabling-defer).

+### Query batching support
+
+See [Apollo Router's _experimental_ support for query batching](../executing-operations/query-batching).
+
 ### Subscription support

 See [GraphQL subscriptions in the Apollo Router](../executing-operations/subscription-support/#router-setup).
diff --git a/docs/source/configuration/traffic-shaping.mdx b/docs/source/configuration/traffic-shaping.mdx
index 52b428781b..2c8abdd663 100644
--- a/docs/source/configuration/traffic-shaping.mdx
+++ b/docs/source/configuration/traffic-shaping.mdx
@@ -82,6 +82,18 @@ traffic_shaping:

 Compression is automatically supported on the client side, depending on the `Accept-Encoding` header provided by the client.

+### Query batching
+
+The Apollo Router has _experimental_ support for receiving client query batches:
+
+```yaml title="router.yaml"
+experimental_batching:
+  enabled: true
+  mode: batch_http_link
+```
+
+For details, see [query batching for the router](/executing-operations/query-batching).
+
 ## Subgraph traffic shaping

 The Apollo Router supports various options affecting traffic destined for subgraphs, that can either be defined for all subgraphs, or overriden per subgraph:
diff --git a/docs/source/executing-operations/query-batching.mdx b/docs/source/executing-operations/query-batching.mdx
new file mode 100644
index 0000000000..3d83e4a6eb
--- /dev/null
+++ b/docs/source/executing-operations/query-batching.mdx
@@ -0,0 +1,210 @@
+---
+title: Query batching
+description: Receive query batches with the Apollo Router
+---
+
+> **Experimental Feature**
+> The Apollo Router provides [experimental](/resources/product-launch-stages/#experimental-features) support for client query batching.
+
+Learn about query batching and how to configure the Apollo Router to receive query batches.
+
+## About query batching
+
+Modern applications often require several requests to render a single page. This is usually the result of a component-based architecture where individual micro-frontends (MFE) make requests separately to fetch data relevant to them. Not only does this cause a performance overhead—different components may be requesting the same data—it can also cause a consistency issue. To combat this, MFE-based UIs batch multiple client operations, issued close together, into a single HTTP request.
+
+The Apollo Router supports client query batching.
If you’re using Apollo Client, you can leverage the built-in support for batching to reduce the number of individual operations sent to the router.
+
+Once configured, Apollo Client automatically combines multiple operations into a single HTTP request. The number of operations within a batch is client-configurable, including the maximum number of operations in a batch and the maximum duration to wait for operations to accumulate before sending the batch.
+
+The Apollo Router must be configured to receive query batches; otherwise, it rejects them. When processing a batch, the router deserializes and processes each operation of a batch independently, and it responds to the client only after all operations of the batch have been completed. Each operation executes concurrently with respect to the other operations in the batch.
+
+## Configure query batching
+
+Both the Apollo Router and the client need to be configured to support query batching.
+
+### Configure router
+
+By default, receiving client query batches is _not_ enabled in the Apollo Router.
+
+To enable query batching, set the following fields in your `router.yaml` configuration file:
+
+```yaml title="router.yaml"
+experimental_batching:
+  enabled: true
+  mode: batch_http_link
+```
+
+| Attribute | Description | Valid Values | Default Value |
+| :-- | :-- | :-- | :-- |
+| `enabled` | Flag to enable reception of client query batches | boolean | `false` |
+| `mode` | Supported client batching mode | `batch_http_link`: the client uses Apollo Link and its [`BatchHttpLink`](/react/api/link/apollo-link-batch-http) link. | No Default |
+
+### Configure client
+
+To enable batching in Apollo Client, configure `BatchHttpLink`. For details on implementing `BatchHttpLink`, see [batching operations](/react/api/link/apollo-link-batch-http/).
+
+### Configuration compatibility
+
+If the router receives a query batch from a client and batching is *not* enabled, the router sends a `BATCHING_NOT_ENABLED` error to the client.
+
+## Metrics for query batching
+
+The Apollo Router provides the following metrics for query batching:
+
+| Name | Attributes | Description |
+| :-- | :-- | :-- |
+| `apollo.router.operations.batching` | `mode` | Counter for the number of received batches. |
+| `apollo.router.operations.batching.size` | `mode` | Histogram for the size of received batches. |
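+
+To inspect these instruments during development, one option is to expose the router's Prometheus endpoint alongside batching. The following is a minimal sketch; the `listen` address and `path` values are illustrative and should be adjusted for your deployment:
+
+```yaml title="router.yaml"
+experimental_batching:
+  enabled: true
+  mode: batch_http_link
+telemetry:
+  metrics:
+    prometheus:
+      enabled: true
+      listen: 127.0.0.1:9090
+      path: /metrics
+```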
+
+## Query batch formats
+
+### Request format
+
+A query batch is an array of operations.
+
+```graphql
+[
+query MyFirstQuery {
+  me {
+    id
+  }
+},
+query MySecondQuery {
+  me {
+    name
+  }
+}
+]
+```
+
+### Response format
+
+Responses are returned in a JSON array, with the order of responses matching the order of operations in the query batch.
+
+```json
+[
+  {"data":{"me":{"id":"1"}}},
+  {"data":{"me":{"name":"Ada Lovelace"}}}
+]
+```
+
+## Error handling for query batching
+
+### Batch error
+
+If a batch of queries cannot be processed, the entire batch fails.
+
+For example, this batch request is invalid because it has two commas between the constituent queries:
+
+```graphql
+[
+query MyFirstQuery {
+  me {
+    id
+  }
+},,
+query MySecondQuery {
+  me {
+    name
+  }
+}
+]
+```
+
+As a result, the router returns an invalid batch error:
+
+```json
+{"errors":
+  [
+    {"message":"Invalid GraphQL request","extensions":{"details":"failed to deserialize the request body into JSON: expected value at line 1 column 54","code":"INVALID_GRAPHQL_REQUEST"}}
+  ]
+}
+```
+
+### Individual query error
+
+If a single query in a batch cannot be processed, an error is returned for that query alone.
+
+For example, the query `MyFirstQuery` requests a field that doesn't exist, while the rest of the batch is valid.
+
+```graphql
+[
+query MyFirstQuery {
+  me {
+    thisfielddoesnotexist
+  }
+},
+query MySecondQuery {
+  me {
+    name
+  }
+}
+]
+```
+
+As a result, an error is returned for the individual invalid query and the other (valid) query returns a response.
+
+```json
+[
+  {"errors":
+    [
+      {"message":"cannot query field 'thisfielddoesnotexist' on type 'User'",
+       "extensions":{"type":"User","field":"thisfielddoesnotexist","code":"INVALID_FIELD"}
+      }
+    ]
+  },
+  {"data":{"me":{"name":"Ada Lovelace"}}}
+]
+```
+
+## Known limitations
+
+### Unsupported query modes
+
+When batching is enabled, any batch operation that results in a stream of responses is unsupported, including:
+
+- [`@defer`](/graphos/operations/defer/)
+- [subscriptions](/graphos/operations/subscriptions/)