From ca1d1ef61f2b713f4683dcb1c97e5d78c090d51b Mon Sep 17 00:00:00 2001 From: Gary Pennington Date: Tue, 16 Apr 2024 12:20:48 +0100 Subject: [PATCH] Subgraph support for query batching (#4661) This project is an extension of the existing work to support [client side batching in the router](https://github.com/apollographql/router/issues/126). The current implementation is experimental and is publicly [documented](https://www.apollographql.com/docs/router/executing-operations/query-batching/). The additional work to enable batching requests to subgraphs is captured in [this issue](https://github.com/apollographql/router/issues/2002). Currently the concept of a batch is preserved until the end of the RouterRequest processing. At this point we convert each batch request item into a separate SupergraphRequest. These are then planned and executed concurrently within the router and re-assembled into a batch when they complete. It's important to note that, with this implementation, the concept of a batch, from the perspective of an executing router, now disappears and each request is planned and executed separately. This extension will modify the router so that the concept of a batch is preserved, at least outwardly, so that multiple subgraph requests are "batched" (in exactly the same format as a client batch request) for onward transmission to subgraphs. The goal of this work is to provide an optimisation by reducing the number of round-trips to a subgraph from the router. Additionally, the work will address an [unresolved issue](https://github.com/apollographql/router/issues/4019) from the existing experimental implementation and promote the existing implementation from experimental to fully supported. Fixes #2002 --- **Review Guidance** This is a fairly big PR, so I've written these notes to help make the review more approachable. 1. The most important files to review are (in order): - [.changesets/feat_garypen_2002_subgraph_batching.md](https://github.com/apollographql/router/pull/4661/files#diff-6376c91cfdd47332a662c760ac849bb5449a1b6df6891b30b72b43f041bd836f) - [docs/source/executing-operations/query-batching.mdx](https://github.com/apollographql/router/pull/4661/files#diff-617468db3057857f71c387eaa0d1a6161e3c1b8bf9fcb2de6fc6eafedc147277) - [apollo-router/src/services/router/service.rs](https://github.com/apollographql/router/pull/4661/files#diff-544579a213fda1bff6313834d30fe1746a8a28ffd7c0d6dfa1081fa36a487355) - [apollo-router/src/services/supergraph/service.rs](https://github.com/apollographql/router/pull/4661/files#diff-5d72a88a68962a5926fb5bb115ea3efc186904612f74e697d72e3f009669c733) - [apollo-router/src/query_planner/plan.rs](https://github.com/apollographql/router/pull/4661/files#diff-21a82d277d12e8f21b6b71398d62e95303a117130cc4a27510b85ebfceeb8208) - [apollo-router/src/services/subgraph_service.rs](https://github.com/apollographql/router/pull/4661/files#diff-6ef5a208ca8622f30eef88f75c18566e0304d59856b66293dcd6811555e6382e) - [apollo-router/src/batching.rs](https://github.com/apollographql/router/pull/4661/files#diff-3e884074ecad8176341159a2382aa81c49d74b851894b8ade9fa4718c434dec6) First read the documentation. Hopefully that will make clear how this feature works. I've picked these files as being most important (and ordered them for review) because: router service => This is where we spot incoming batches and create context `BatchQuery` items to manage them through the router. We also re-assemble them on the way back and identify any batches which may need to be cancelled. 
supergraph service => Here we pick up the information about how many fetches we believe each `BatchQuery` will need to make.

plan => The new `query_hashes()` does this fetch identification for us. This is the most important function in this feature.

subgraph service => Here is where we intercept the calls to subgraphs and park tasks to wait for batch execution to be performed. We do a lot of work here, so this is where most of the intrusive changes are: assembling and disassembling batches and managing the co-ordination between a number of parked tasks.

batching => This is the implementation of batch co-ordination. Each batch has a task which manages a variety of channels to facilitate communication between the incoming batches, waiting tasks and outgoing (to subgraph) batches. I'm suggesting reading this *after* reading through the service changes because it should mainly just be implementation details and you will be able to follow what is happening without knowing all this detail initially. Once you understand the changes to the services, you will need to read this code. Feel free to peek ahead though if that's how you like to review stuff.

2. There are still a couple of TODOs which will be resolved early next week. They are both related to how we handle context cloning, so a decision is still pending there.

Obviously all the files need to be reviewed, but the remaining files should be fairly mechanical/straightforward.

---

**Checklist**

Complete the checklist (and note appropriate exceptions) before the PR is marked ready-for-review.

- [x] Changes are compatible[^1]
- [x] Documentation[^2] completed
- [x] Performance impact assessed and acceptable
- Tests added and passing[^3]
  - [x] Unit Tests
  - [x] Integration Tests
  - ~[ ] Manual Tests~

**Exceptions**

*Note any exceptions here*

**Notes**

[^1]: It may be appropriate to bring upcoming changes to the attention of other (impacted) groups. Please endeavour to do this before seeking PR approval. The mechanism for doing this will vary considerably, so use your judgement as to how and when to do this.
[^2]: Configuration is an important part of many changes. Where applicable please try to document configuration examples.
[^3]: Tick whichever testing boxes are applicable. If you are adding Manual Tests, please document the manual testing (extensively) in the Exceptions.
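For reviewers who haven't worked with client batching before, it may help to keep the wire format in mind: a batch is simply a JSON array of regular GraphQL requests, and the batches this PR sends to subgraphs use exactly the same shape. A minimal sketch (the operations shown are illustrative, not taken from the test fixtures):

```json
[
  { "query": "query A { me { id } }" },
  { "query": "query B { topProducts { name } }" }
]
```

A batching-enabled subgraph replies with a matching JSON array of GraphQL responses, one entry per request, which the router then splits back out to the waiting batch items.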
--------- Co-authored-by: Nicholas Cioli Co-authored-by: Edward Huang --- .../feat_garypen_2002_subgraph_batching.md | 41 + apollo-router/feature_discussions.json | 5 +- apollo-router/src/batching.rs | 712 ++++++++++++ apollo-router/src/configuration/metrics.rs | 2 +- .../migrations/0023-batching.yaml | 5 + apollo-router/src/configuration/mod.rs | 52 +- ...nfiguration__tests__schema_generation.snap | 104 +- ...grade_old_configuration@batching.yaml.snap | 8 + .../testdata/metrics/batching.router.yaml | 2 +- .../testdata/migrations/batching.yaml | 3 + apollo-router/src/configuration/tests.rs | 126 ++ apollo-router/src/error.rs | 42 +- apollo-router/src/json_ext.rs | 9 + apollo-router/src/lib.rs | 1 + .../plugins/traffic_shaping/deduplication.rs | 13 + apollo-router/src/query_planner/plan.rs | 61 + apollo-router/src/request.rs | 4 +- apollo-router/src/response.rs | 8 +- apollo-router/src/services/router/service.rs | 179 ++- apollo-router/src/services/router/tests.rs | 8 +- .../src/services/subgraph_service.rs | 690 +++++++++-- .../src/services/supergraph/service.rs | 21 +- .../src/uplink/license_enforcement.rs | 4 + .../fixtures/apollo_reports_batch.router.yaml | 4 +- .../fixtures/batching/all_enabled.router.yaml | 11 + .../fixtures/batching/block_request.rhai | 10 + .../fixtures/batching/coprocessor.router.yaml | 19 + .../fixtures/batching/rhai_script.router.yaml | 15 + .../tests/fixtures/batching/schema.graphql | 56 + .../batching/short_timeouts.router.yaml | 14 + apollo-router/tests/integration/batching.rs | 1026 +++++++++++++++++ apollo-router/tests/integration/mod.rs | 1 + ...n__lifecycle__cli_config_experimental.snap | 1 - docs/source/config.json | 2 +- docs/source/configuration/traffic-shaping.mdx | 6 +- .../executing-operations/query-batching.mdx | 169 ++- 36 files changed, 3216 insertions(+), 218 deletions(-) create mode 100644 .changesets/feat_garypen_2002_subgraph_batching.md create mode 100644 apollo-router/src/batching.rs create mode 100644 apollo-router/src/configuration/migrations/0023-batching.yaml create mode 100644 apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__upgrade_old_configuration@batching.yaml.snap create mode 100644 apollo-router/src/configuration/testdata/migrations/batching.yaml create mode 100644 apollo-router/tests/fixtures/batching/all_enabled.router.yaml create mode 100644 apollo-router/tests/fixtures/batching/block_request.rhai create mode 100644 apollo-router/tests/fixtures/batching/coprocessor.router.yaml create mode 100644 apollo-router/tests/fixtures/batching/rhai_script.router.yaml create mode 100644 apollo-router/tests/fixtures/batching/schema.graphql create mode 100644 apollo-router/tests/fixtures/batching/short_timeouts.router.yaml create mode 100644 apollo-router/tests/integration/batching.rs diff --git a/.changesets/feat_garypen_2002_subgraph_batching.md b/.changesets/feat_garypen_2002_subgraph_batching.md new file mode 100644 index 0000000000..7a275093b9 --- /dev/null +++ b/.changesets/feat_garypen_2002_subgraph_batching.md @@ -0,0 +1,41 @@ +### Subgraph support for query batching ([Issue #2002](https://github.com/apollographql/router/issues/2002)) + +As an extension to the ongoing work to support [client-side query batching in the router](https://github.com/apollographql/router/issues/126), the router now supports batching of subgraph requests. Each subgraph batch request retains the same external format as a client batch request. 
This optimization reduces the number of round-trip requests from the router to subgraphs.
+
+Also, batching in the router is now a generally available feature: the `experimental_batching` router configuration option has been deprecated and is replaced by the `batching` option.
+
+Previously, the router preserved the concept of a batch until a `RouterRequest` finished processing. From that point, the router converted each batch request item into a separate `SupergraphRequest`, and the router planned and executed those requests concurrently within the router, then reassembled them into a batch of `RouterResponse` to return to the client. Now with the implementation in this release, the concept of a batch is extended so that batches are issued to configured subgraphs (all or named). Each batch request item is planned and executed separately, but the queries issued to subgraphs are optimally assembled into batches which observe the query constraints of the various batch items.
+
+To configure subgraph batching, you can enable `batching.subgraph.all` for all subgraphs. You can also enable batching per subgraph with `batching.subgraph.subgraphs.*`. For example:
+
+```yaml
+batching:
+  enabled: true
+  mode: batch_http_link
+  subgraph:
+    # Enable batching on all subgraphs
+    all:
+      enabled: true
+```
+
+```yaml
+batching:
+  enabled: true
+  mode: batch_http_link
+  subgraph:
+    # Disable batching on all subgraphs
+    all:
+      enabled: false
+    # Configure (override) batching support per subgraph
+    subgraphs:
+      subgraph_1:
+        enabled: true
+      subgraph_2:
+        enabled: true
+```
+
+Note: `all` may be overridden by `subgraphs`. This applies in general for all router subgraph configuration options.
+
+To learn more, see [query batching in Apollo docs](https://www.apollographql.com/docs/router/executing-operations/query-batching/).
+
+By [@garypen](https://github.com/garypen) in https://github.com/apollographql/router/pull/4661
diff --git a/apollo-router/feature_discussions.json b/apollo-router/feature_discussions.json
index 59f5a84608..446162650a 100644
--- a/apollo-router/feature_discussions.json
+++ b/apollo-router/feature_discussions.json
@@ -2,10 +2,9 @@
     "experimental": {
         "experimental_retry": "https://github.com/apollographql/router/discussions/2241",
         "experimental_response_trace_id": "https://github.com/apollographql/router/discussions/2147",
-        "experimental_when_header": "https://github.com/apollographql/router/discussions/1961",
-        "experimental_batching": "https://github.com/apollographql/router/discussions/3840"
+        "experimental_when_header": "https://github.com/apollographql/router/discussions/1961"
    },
    "preview": {
        "preview_entity_cache": "https://github.com/apollographql/router/discussions/4592"
    }
-}
\ No newline at end of file
+}
diff --git a/apollo-router/src/batching.rs b/apollo-router/src/batching.rs
new file mode 100644
index 0000000000..79a7e29f83
--- /dev/null
+++ b/apollo-router/src/batching.rs
@@ -0,0 +1,712 @@
+//! Various utility functions and core structures used to implement batching support within
+//! the router.
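+//!
+//! In outline: a `Batch` spawns a co-ordination task, and each batch item carries a
+//! `BatchQuery` in its request context. The `BatchQuery` signals `Begin` (after query
+//! planning, with the set of expected fetch hashes), `Progress` (from the subgraph service,
+//! with the actual request) and `Cancel` events to that task. Once every registered fetch
+//! has either committed or been cancelled, the accumulated requests are grouped by subgraph
+//! and dispatched as batches via `process_batches`.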
+
+use std::collections::HashMap;
+use std::collections::HashSet;
+use std::fmt;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+
+use hyper::Body;
+use opentelemetry::trace::TraceContextExt;
+use opentelemetry::Context as otelContext;
+use parking_lot::Mutex as PMutex;
+use tokio::sync::mpsc;
+use tokio::sync::oneshot;
+use tokio::sync::Mutex;
+use tokio::task::JoinHandle;
+use tower::BoxError;
+use tracing::Instrument;
+use tracing::Span;
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+
+use crate::error::FetchError;
+use crate::error::SubgraphBatchingError;
+use crate::graphql;
+use crate::query_planner::fetch::QueryHash;
+use crate::services::http::HttpClientServiceFactory;
+use crate::services::process_batches;
+use crate::services::SubgraphRequest;
+use crate::services::SubgraphResponse;
+use crate::Context;
+
+/// A query that is part of a batch.
+/// Note: It's ok to make transient clones of this struct, but *do not* store clones anywhere apart
+/// from the single copy in the extensions. The batching co-ordinator relies on the fact that all
+/// senders are dropped to know when to finish processing.
+#[derive(Clone, Debug)]
+pub(crate) struct BatchQuery {
+    /// The index of this query relative to the entire batch
+    index: usize,
+
+    /// A channel sender for sending updates to the entire batch
+    sender: Arc<Mutex<Option<mpsc::Sender<BatchHandlerMessage>>>>,
+
+    /// How many more progress updates are we expecting to send?
+    remaining: Arc<AtomicUsize>,
+
+    /// Batch to which this BatchQuery belongs
+    batch: Arc<Batch>,
+}
+
+impl fmt::Display for BatchQuery {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "index: {}, ", self.index)?;
+        write!(f, "remaining: {}, ", self.remaining.load(Ordering::Acquire))?;
+        write!(f, "sender: {:?}, ", self.sender)?;
+        write!(f, "batch: {:?}, ", self.batch)?;
+        Ok(())
+    }
+}
+
+impl BatchQuery {
+    /// Is this BatchQuery finished?
+    pub(crate) fn finished(&self) -> bool {
+        self.remaining.load(Ordering::Acquire) == 0
+    }
+
+    /// Inform the batch of query hashes representing fetches needed by this element of the batch query
+    pub(crate) async fn set_query_hashes(
+        &self,
+        query_hashes: Vec<Arc<QueryHash>>,
+    ) -> Result<(), BoxError> {
+        self.remaining.store(query_hashes.len(), Ordering::Release);
+
+        self.sender
+            .lock()
+            .await
+            .as_ref()
+            .ok_or(SubgraphBatchingError::SenderUnavailable)?
+            .send(BatchHandlerMessage::Begin {
+                index: self.index,
+                query_hashes,
+            })
+            .await?;
+        Ok(())
+    }
+
+    /// Signal to the batch handler that this specific batch query has made some progress.
+    ///
+    /// The returned channel can be awaited to receive the GraphQL response, when ready.
+    pub(crate) async fn signal_progress(
+        &self,
+        client_factory: HttpClientServiceFactory,
+        request: SubgraphRequest,
+        gql_request: graphql::Request,
+    ) -> Result<oneshot::Receiver<Result<SubgraphResponse, BoxError>>, BoxError> {
+        // Create a receiver for this query so that it can eventually get the request meant for it
+        let (tx, rx) = oneshot::channel();
+
+        tracing::debug!(
+            "index: {}, REMAINING: {}",
+            self.index,
+            self.remaining.load(Ordering::Acquire)
+        );
+        self.sender
+            .lock()
+            .await
+            .as_ref()
+            .ok_or(SubgraphBatchingError::SenderUnavailable)?
+            .send(BatchHandlerMessage::Progress {
+                index: self.index,
+                client_factory,
+                request,
+                gql_request,
+                response_sender: tx,
+                span_context: Span::current().context(),
+            })
+            .await?;
+
+        if !self.finished() {
+            self.remaining.fetch_sub(1, Ordering::AcqRel);
+        }
+
+        // May now be finished
+        if self.finished() {
+            let mut sender = self.sender.lock().await;
+            *sender = None;
+        }
+
+        Ok(rx)
+    }
+
+    /// Signal to the batch handler that this specific batch query is cancelled
+    pub(crate) async fn signal_cancelled(&self, reason: String) -> Result<(), BoxError> {
+        self.sender
+            .lock()
+            .await
+            .as_ref()
+            .ok_or(SubgraphBatchingError::SenderUnavailable)?
+            .send(BatchHandlerMessage::Cancel {
+                index: self.index,
+                reason,
+            })
+            .await?;
+
+        if !self.finished() {
+            self.remaining.fetch_sub(1, Ordering::AcqRel);
+        }
+
+        // May now be finished
+        if self.finished() {
+            let mut sender = self.sender.lock().await;
+            *sender = None;
+        }
+
+        Ok(())
+    }
+}
+
+// #[derive(Debug)]
+enum BatchHandlerMessage {
+    /// Cancel one of the batch items
+    Cancel { index: usize, reason: String },
+
+    /// A query has reached the subgraph service and we should update its state
+    Progress {
+        index: usize,
+        client_factory: HttpClientServiceFactory,
+        request: SubgraphRequest,
+        gql_request: graphql::Request,
+        response_sender: oneshot::Sender<Result<SubgraphResponse, BoxError>>,
+        span_context: otelContext,
+    },
+
+    /// A query has passed query planning and knows how many fetches are needed
+    /// to complete.
+    Begin {
+        index: usize,
+        query_hashes: Vec<Arc<QueryHash>>,
+    },
+}
+
+/// Collection of info needed to resolve a batch query
+pub(crate) struct BatchQueryInfo {
+    /// The owning subgraph request
+    request: SubgraphRequest,
+
+    /// The GraphQL request tied to this subgraph request
+    gql_request: graphql::Request,
+
+    /// Notifier for the subgraph service handler
+    ///
+    /// Note: This must be used or else the subgraph request will time out
+    sender: oneshot::Sender<Result<SubgraphResponse, BoxError>>,
+}
+
+// TODO: Do we want to generate a UUID for a batch for observability reasons?
+// TODO: Do we want to track the size of a batch?
+#[derive(Debug)]
+pub(crate) struct Batch {
+    /// A sender channel to communicate with the batching handler
+    senders: PMutex<Vec<Option<mpsc::Sender<BatchHandlerMessage>>>>,
+
+    /// The spawned batching handler task handle
+    ///
+    /// Note: We keep this as a failsafe. If the task doesn't terminate _before_ the batch is
+    /// dropped, then we will abort() the task on drop.
+    spawn_handle: JoinHandle<Result<(), BoxError>>,
+
+    /// What is the size (number of input operations) of the batch?
+    #[allow(dead_code)]
+    size: usize,
+}
+
+impl Batch {
+    /// Creates a new batch, spawning an async task for handling updates to the
+    /// batch lifecycle.
+    pub(crate) fn spawn_handler(size: usize) -> Self {
+        tracing::debug!("New batch created with size {size}");
+
+        // Create the message channel pair for sending update events to the spawned task
+        let (spawn_tx, mut rx) = mpsc::channel(size);
+
+        // Populate Senders
+        let mut senders = vec![];
+
+        for _ in 0..size {
+            senders.push(Some(spawn_tx.clone()));
+        }
+
+        let spawn_handle = tokio::spawn(async move {
+            /// Helper struct for keeping track of the state of each individual BatchQuery
+            ///
+            #[derive(Debug)]
+            struct BatchQueryState {
+                registered: HashSet<Arc<QueryHash>>,
+                committed: HashSet<Arc<QueryHash>>,
+                cancelled: HashSet<Arc<QueryHash>>,
+            }
+
+            impl BatchQueryState {
+                // We are ready when everything we registered is in either cancelled or
+                // committed.
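+                // That is: every hash in `registered` also appears in either `committed`
+                // or `cancelled`.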
+                fn is_ready(&self) -> bool {
+                    self.registered.difference(&self.committed.union(&self.cancelled).cloned().collect()).collect::<Vec<_>>().is_empty()
+                }
+            }
+
+            // Progressively track the state of the various batch fetches that we expect to see. Keys are batch
+            // indices.
+            let mut batch_state: HashMap<usize, BatchQueryState> = HashMap::with_capacity(size);
+
+            // We also need to keep track of all requests we need to make and their send handles
+            let mut requests: Vec<Vec<BatchQueryInfo>> =
+                Vec::from_iter((0..size).map(|_| Vec::new()));
+
+            let mut master_client_factory = None;
+            tracing::debug!("Batch about to await messages...");
+            // Start handling messages from various portions of the request lifecycle
+            // When recv() returns None, we want to stop processing messages
+            while let Some(msg) = rx.recv().await {
+                match msg {
+                    BatchHandlerMessage::Cancel { index, reason } => {
+                        // Log the reason for cancelling, update the state
+                        tracing::debug!("Cancelling index: {index}, {reason}");
+
+                        if let Some(state) = batch_state.get_mut(&index) {
+                            // Short-circuit any requests that are waiting for this cancelled request to complete.
+                            let cancelled_requests = std::mem::take(&mut requests[index]);
+                            for BatchQueryInfo {
+                                request, sender, ..
+                            } in cancelled_requests
+                            {
+                                let subgraph_name = request.subgraph_name.ok_or(SubgraphBatchingError::MissingSubgraphName)?;
+                                if let Err(log_error) = sender.send(Err(Box::new(FetchError::SubrequestBatchingError {
+                                    service: subgraph_name.clone(),
+                                    reason: format!("request cancelled: {reason}"),
+                                }))) {
+                                    tracing::error!(service=subgraph_name, error=?log_error, "failed to notify waiter that request is cancelled");
+                                }
+                            }
+
+                            // Clear out everything that has committed, now that they are cancelled, and
+                            // mark everything as having been cancelled.
+                            state.committed.clear();
+                            state.cancelled = state.registered.clone();
+                        }
+                    }
+
+                    BatchHandlerMessage::Begin {
+                        index,
+                        query_hashes,
+                    } => {
+                        tracing::debug!("Beginning batch for index {index} with {query_hashes:?}");
+
+                        batch_state.insert(
+                            index,
+                            BatchQueryState {
+                                cancelled: HashSet::with_capacity(query_hashes.len()),
+                                committed: HashSet::with_capacity(query_hashes.len()),
+                                registered: HashSet::from_iter(query_hashes),
+                            },
+                        );
+                    }
+
+                    BatchHandlerMessage::Progress {
+                        index,
+                        client_factory,
+                        request,
+                        gql_request,
+                        response_sender,
+                        span_context,
+                    } => {
+                        // Progress the index
+
+                        tracing::debug!("Progress index: {index}");
+
+                        if let Some(state) = batch_state.get_mut(&index) {
+                            state.committed.insert(request.query_hash.clone());
+                        }
+
+                        if master_client_factory.is_none() {
+                            master_client_factory = Some(client_factory);
+                        }
+                        Span::current().add_link(span_context.span().span_context().clone());
+                        requests[index].push(BatchQueryInfo {
+                            request,
+                            gql_request,
+                            sender: response_sender,
+                        })
+                    }
+                }
+            }
+
+            // Make sure that we are actually ready and haven't forgotten to update something somewhere
+            if batch_state.values().any(|f| !f.is_ready()) {
+                tracing::error!("All senders for the batch have dropped before reaching the ready state: {batch_state:#?}");
+                // There's not much else we can do, so perform an early return
+                return Err(SubgraphBatchingError::ProcessingFailed("batch senders not ready when required".to_string()).into());
+            }
+
+            tracing::debug!("Assembling {size} requests into batches");
+
+            // We now have a bunch of requests which are organised by index and we would like to
+            // convert them into a bunch of requests organised by service...
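+            // i.e. turn the per-index `Vec<Vec<BatchQueryInfo>>` into a per-service
+            // `HashMap<String, Vec<BatchQueryInfo>>`, keyed by subgraph name.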
+
+            let all_in_one: Vec<_> = requests.into_iter().flatten().collect();
+
+            // Now build up a Service oriented view to use in constructing our batches
+            let mut svc_map: HashMap<String, Vec<BatchQueryInfo>> = HashMap::new();
+            for BatchQueryInfo {
+                request: sg_request,
+                gql_request,
+                sender: tx,
+            } in all_in_one
+            {
+                let subgraph_name = sg_request.subgraph_name.clone().ok_or(SubgraphBatchingError::MissingSubgraphName)?;
+                let value = svc_map
+                    .entry(subgraph_name)
+                    .or_default();
+                value.push(BatchQueryInfo {
+                    request: sg_request,
+                    gql_request,
+                    sender: tx,
+                });
+            }
+
+            // If we don't have a master_client_factory, we can't do anything.
+            if let Some(client_factory) = master_client_factory {
+                process_batches(client_factory, svc_map).await?;
+            }
+            Ok(())
+        }.instrument(tracing::info_span!("batch_request", size)));
+
+        Self {
+            senders: PMutex::new(senders),
+            spawn_handle,
+            size,
+        }
+    }
+
+    /// Create a batch query for a specific index in this batch
+    ///
+    /// This function may fail if the index doesn't exist or has already been taken
+    pub(crate) fn query_for_index(
+        batch: Arc<Batch>,
+        index: usize,
+    ) -> Result<BatchQuery, SubgraphBatchingError> {
+        let mut guard = batch.senders.lock();
+        // It's a serious error if we try to get a query at an index which doesn't exist or which has already been taken
+        if index >= guard.len() {
+            return Err(SubgraphBatchingError::ProcessingFailed(format!(
+                "tried to retrieve sender for index: {index} which does not exist"
+            )));
+        }
+        let opt_sender = std::mem::take(&mut guard[index]);
+        if opt_sender.is_none() {
+            return Err(SubgraphBatchingError::ProcessingFailed(format!(
+                "tried to retrieve sender for index: {index} which has already been taken"
+            )));
+        }
+        drop(guard);
+        Ok(BatchQuery {
+            index,
+            sender: Arc::new(Mutex::new(opt_sender)),
+            remaining: Arc::new(AtomicUsize::new(0)),
+            batch,
+        })
+    }
+}
+
+impl Drop for Batch {
+    fn drop(&mut self) {
+        // Failsafe: make sure that we kill the background task if the batch itself is dropped
+        self.spawn_handle.abort();
+    }
+}
+
+// Assemble a single batch request to a subgraph
+pub(crate) async fn assemble_batch(
+    requests: Vec<BatchQueryInfo>,
+) -> Result<
+    (
+        String,
+        Context,
+        http::Request<Body>,
+        Vec<oneshot::Sender<Result<SubgraphResponse, BoxError>>>,
+    ),
+    BoxError,
+> {
+    // Extract the collection of parts from the requests
+    let (txs, request_pairs): (Vec<_>, Vec<_>) = requests
+        .into_iter()
+        .map(|r| (r.sender, (r.request, r.gql_request)))
+        .unzip();
+    let (requests, gql_requests): (Vec<_>, Vec<_>) = request_pairs.into_iter().unzip();
+
+    // Construct the actual byte body of the batched request
+    let bytes = hyper::body::to_bytes(serde_json::to_string(&gql_requests)?).await?;
+
+    // Grab the common info from the first request
+    let context = requests
+        .first()
+        .ok_or(SubgraphBatchingError::RequestsIsEmpty)?
+        .context
+        .clone();
+    let first_request = requests
+        .into_iter()
+        .next()
+        .ok_or(SubgraphBatchingError::RequestsIsEmpty)?
+ .subgraph_request; + let operation_name = first_request + .body() + .operation_name + .clone() + .unwrap_or_default(); + let (parts, _) = first_request.into_parts(); + + // Generate the final request and pass it up + let request = http::Request::from_parts(parts, Body::from(bytes)); + Ok((operation_name, context, request, txs)) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::time::Duration; + + use hyper::body::to_bytes; + use tokio::sync::oneshot; + + use super::assemble_batch; + use super::Batch; + use super::BatchQueryInfo; + use crate::graphql; + use crate::plugins::traffic_shaping::Http2Config; + use crate::query_planner::fetch::QueryHash; + use crate::services::http::HttpClientServiceFactory; + use crate::services::SubgraphRequest; + use crate::services::SubgraphResponse; + use crate::Configuration; + use crate::Context; + + #[tokio::test(flavor = "multi_thread")] + async fn it_assembles_batch() { + // Assemble a list of requests for testing + let (receivers, requests): (Vec<_>, Vec<_>) = (0..2) + .map(|index| { + let (tx, rx) = oneshot::channel(); + let gql_request = graphql::Request::fake_builder() + .operation_name(format!("batch_test_{index}")) + .query(format!("query batch_test {{ slot{index} }}")) + .build(); + + ( + rx, + BatchQueryInfo { + request: SubgraphRequest::fake_builder() + .subgraph_request( + http::Request::builder().body(gql_request.clone()).unwrap(), + ) + .subgraph_name(format!("slot{index}")) + .build(), + gql_request, + sender: tx, + }, + ) + }) + .unzip(); + + // Assemble them + let (op_name, _context, request, txs) = assemble_batch(requests) + .await + .expect("it can assemble a batch"); + + // Make sure that the name of the entire batch is that of the first + assert_eq!(op_name, "batch_test_0"); + + // We should see the aggregation of all of the requests + let actual: Vec = serde_json::from_str( + &String::from_utf8(to_bytes(request.into_body()).await.unwrap().to_vec()).unwrap(), + ) + .unwrap(); + + let expected: Vec<_> = (0..2) + .map(|index| { + graphql::Request::fake_builder() + .operation_name(format!("batch_test_{index}")) + .query(format!("query batch_test {{ slot{index} }}")) + .build() + }) + .collect(); + assert_eq!(actual, expected); + + // We should also have all of the correct senders and they should be linked to the correct waiter + // Note: We reverse the senders since they should be in reverse order when assembled + assert_eq!(txs.len(), receivers.len()); + for (index, (tx, rx)) in Iterator::zip(txs.into_iter(), receivers).enumerate() { + let data = serde_json_bytes::json!({ + "data": { + format!("slot{index}"): "valid" + } + }); + let response = SubgraphResponse { + response: http::Response::builder() + .body(graphql::Response::builder().data(data.clone()).build()) + .unwrap(), + context: Context::new(), + }; + + tx.send(Ok(response)).unwrap(); + + // We want to make sure that we don't hang the test if we don't get the correct message + let received = tokio::time::timeout(Duration::from_millis(10), rx) + .await + .unwrap() + .unwrap() + .unwrap(); + + assert_eq!(received.response.into_body().data, Some(data)); + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn it_rejects_index_out_of_bounds() { + let batch = Arc::new(Batch::spawn_handler(2)); + + assert!(Batch::query_for_index(batch.clone(), 2).is_err()); + } + + #[tokio::test(flavor = "multi_thread")] + async fn it_rejects_duplicated_index_get() { + let batch = Arc::new(Batch::spawn_handler(2)); + + assert!(Batch::query_for_index(batch.clone(), 0).is_ok()); + 
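+        // Taking the sender for the same index a second time must fail, because
+        // `query_for_index` removes the sender from the batch.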
assert!(Batch::query_for_index(batch.clone(), 0).is_err()); + } + + #[tokio::test(flavor = "multi_thread")] + async fn it_limits_the_number_of_cancelled_sends() { + let batch = Arc::new(Batch::spawn_handler(2)); + + let bq = Batch::query_for_index(batch.clone(), 0).expect("its a valid index"); + + assert!(bq + .set_query_hashes(vec![Arc::new(QueryHash::default())]) + .await + .is_ok()); + assert!(!bq.finished()); + assert!(bq.signal_cancelled("why not?".to_string()).await.is_ok()); + assert!(bq.finished()); + assert!(bq + .signal_cancelled("only once though".to_string()) + .await + .is_err()); + } + + #[tokio::test(flavor = "multi_thread")] + async fn it_limits_the_number_of_progressed_sends() { + let batch = Arc::new(Batch::spawn_handler(2)); + + let bq = Batch::query_for_index(batch.clone(), 0).expect("its a valid index"); + + let factory = HttpClientServiceFactory::from_config( + "testbatch", + &Configuration::default(), + Http2Config::Disable, + ); + let request = SubgraphRequest::fake_builder() + .subgraph_request( + http::Request::builder() + .body(graphql::Request::default()) + .unwrap(), + ) + .subgraph_name("whatever".to_string()) + .build(); + assert!(bq + .set_query_hashes(vec![Arc::new(QueryHash::default())]) + .await + .is_ok()); + assert!(!bq.finished()); + assert!(bq + .signal_progress( + factory.clone(), + request.clone(), + graphql::Request::default() + ) + .await + .is_ok()); + assert!(bq.finished()); + assert!(bq + .signal_progress(factory, request, graphql::Request::default()) + .await + .is_err()); + } + + #[tokio::test(flavor = "multi_thread")] + async fn it_limits_the_number_of_mixed_sends() { + let batch = Arc::new(Batch::spawn_handler(2)); + + let bq = Batch::query_for_index(batch.clone(), 0).expect("its a valid index"); + + let factory = HttpClientServiceFactory::from_config( + "testbatch", + &Configuration::default(), + Http2Config::Disable, + ); + let request = SubgraphRequest::fake_builder() + .subgraph_request( + http::Request::builder() + .body(graphql::Request::default()) + .unwrap(), + ) + .subgraph_name("whatever".to_string()) + .build(); + assert!(bq + .set_query_hashes(vec![Arc::new(QueryHash::default())]) + .await + .is_ok()); + assert!(!bq.finished()); + assert!(bq + .signal_progress(factory, request, graphql::Request::default()) + .await + .is_ok()); + assert!(bq.finished()); + assert!(bq + .signal_cancelled("only once though".to_string()) + .await + .is_err()); + } + + #[tokio::test(flavor = "multi_thread")] + async fn it_limits_the_number_of_mixed_sends_two_query_hashes() { + let batch = Arc::new(Batch::spawn_handler(2)); + + let bq = Batch::query_for_index(batch.clone(), 0).expect("its a valid index"); + + let factory = HttpClientServiceFactory::from_config( + "testbatch", + &Configuration::default(), + Http2Config::Disable, + ); + let request = SubgraphRequest::fake_builder() + .subgraph_request( + http::Request::builder() + .body(graphql::Request::default()) + .unwrap(), + ) + .subgraph_name("whatever".to_string()) + .build(); + let qh = Arc::new(QueryHash::default()); + assert!(bq.set_query_hashes(vec![qh.clone(), qh]).await.is_ok()); + assert!(!bq.finished()); + assert!(bq + .signal_progress(factory, request, graphql::Request::default()) + .await + .is_ok()); + assert!(!bq.finished()); + assert!(bq + .signal_cancelled("only twice though".to_string()) + .await + .is_ok()); + assert!(bq.finished()); + assert!(bq + .signal_cancelled("only twice though".to_string()) + .await + .is_err()); + } +} diff --git 
a/apollo-router/src/configuration/metrics.rs b/apollo-router/src/configuration/metrics.rs index fa9a4d4a67..9e3f894970 100644 --- a/apollo-router/src/configuration/metrics.rs +++ b/apollo-router/src/configuration/metrics.rs @@ -336,7 +336,7 @@ impl InstrumentData { populate_config_instrument!( apollo.router.config.batching, - "$.experimental_batching[?(@.enabled == true)]", + "$.batching[?(@.enabled == true)]", opt.mode, "$.mode" ); diff --git a/apollo-router/src/configuration/migrations/0023-batching.yaml b/apollo-router/src/configuration/migrations/0023-batching.yaml new file mode 100644 index 0000000000..7457467524 --- /dev/null +++ b/apollo-router/src/configuration/migrations/0023-batching.yaml @@ -0,0 +1,5 @@ +description: Batching is no longer experimental +actions: + - type: move + from: experimental_batching + to: batching diff --git a/apollo-router/src/configuration/mod.rs b/apollo-router/src/configuration/mod.rs index ae5674b665..561b62e998 100644 --- a/apollo-router/src/configuration/mod.rs +++ b/apollo-router/src/configuration/mod.rs @@ -190,7 +190,7 @@ pub struct Configuration { /// Batching configuration. #[serde(default)] - pub(crate) experimental_batching: Batching, + pub(crate) batching: Batching, } impl PartialEq for Configuration { @@ -254,7 +254,7 @@ impl<'de> serde::Deserialize<'de> for Configuration { uplink: UplinkConfig, limits: Limits, experimental_chaos: Chaos, - experimental_batching: Batching, + batching: Batching, experimental_apollo_metrics_generation_mode: ApolloMetricsGenerationMode, } let ad_hoc: AdHocConfiguration = serde::Deserialize::deserialize(deserializer)?; @@ -273,7 +273,7 @@ impl<'de> serde::Deserialize<'de> for Configuration { .operation_limits(ad_hoc.limits) .chaos(ad_hoc.experimental_chaos) .uplink(ad_hoc.uplink) - .experimental_batching(ad_hoc.experimental_batching) + .batching(ad_hoc.batching) .experimental_apollo_metrics_generation_mode( ad_hoc.experimental_apollo_metrics_generation_mode, ) @@ -313,8 +313,8 @@ impl Configuration { chaos: Option, uplink: Option, experimental_api_schema_generation_mode: Option, + batching: Option, experimental_apollo_metrics_generation_mode: Option, - experimental_batching: Option, ) -> Result { #[cfg(not(test))] let notify_queue_cap = match apollo_plugins.get(APOLLO_SUBSCRIPTION_PLUGIN_NAME) { @@ -350,7 +350,7 @@ impl Configuration { }, tls: tls.unwrap_or_default(), uplink, - experimental_batching: experimental_batching.unwrap_or_default(), + batching: batching.unwrap_or_default(), #[cfg(test)] notify: notify.unwrap_or_default(), #[cfg(not(test))] @@ -388,7 +388,7 @@ impl Configuration { operation_limits: Option, chaos: Option, uplink: Option, - experimental_batching: Option, + batching: Option, experimental_api_schema_generation_mode: Option, experimental_apollo_metrics_generation_mode: Option, ) -> Result { @@ -416,7 +416,7 @@ impl Configuration { apq: apq.unwrap_or_default(), persisted_queries: persisted_query.unwrap_or_default(), uplink, - experimental_batching: experimental_batching.unwrap_or_default(), + batching: batching.unwrap_or_default(), }; configuration.validate() @@ -1573,4 +1573,42 @@ pub(crate) struct Batching { /// Batching mode pub(crate) mode: BatchingMode, + + /// Subgraph options for batching + pub(crate) subgraph: Option>, +} + +/// Common options for configuring subgraph batching +#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)] +pub(crate) struct CommonBatchingConfig { + /// Whether this batching config should be enabled + pub(crate) enabled: bool, +} + +impl 
Batching { + // Check if we should enable batching for a particular subgraph (service_name) + pub(crate) fn batch_include(&self, service_name: &str) -> bool { + match &self.subgraph { + Some(subgraph_batching_config) => { + // Override by checking if all is enabled + if subgraph_batching_config.all.enabled { + // If it is, require: + // - no subgraph entry OR + // - an enabled subgraph entry + subgraph_batching_config + .subgraphs + .get(service_name) + .map_or(true, |x| x.enabled) + } else { + // If it isn't, require: + // - an enabled subgraph entry + subgraph_batching_config + .subgraphs + .get(service_name) + .is_some_and(|x| x.enabled) + } + } + None => false, + } + } } diff --git a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap index 687f514a2a..3d3d7fd527 100644 --- a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap +++ b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap @@ -742,6 +742,79 @@ expression: "&schema" } } }, + "batching": { + "description": "Batching configuration.", + "default": { + "enabled": false, + "mode": "batch_http_link", + "subgraph": null + }, + "type": "object", + "required": [ + "mode" + ], + "properties": { + "enabled": { + "description": "Activates Batching (disabled by default)", + "default": false, + "type": "boolean" + }, + "mode": { + "description": "Batching mode", + "oneOf": [ + { + "description": "batch_http_link", + "type": "string", + "enum": [ + "batch_http_link" + ] + } + ] + }, + "subgraph": { + "description": "Subgraph options for batching", + "type": "object", + "properties": { + "all": { + "description": "options applying to all subgraphs", + "default": { + "enabled": false + }, + "type": "object", + "required": [ + "enabled" + ], + "properties": { + "enabled": { + "description": "Whether this batching config should be enabled", + "type": "boolean" + } + } + }, + "subgraphs": { + "description": "per subgraph options", + "default": {}, + "type": "object", + "additionalProperties": { + "description": "Common options for configuring subgraph batching", + "type": "object", + "required": [ + "enabled" + ], + "properties": { + "enabled": { + "description": "Whether this batching config should be enabled", + "type": "boolean" + } + } + } + } + }, + "nullable": true + } + }, + "additionalProperties": false + }, "coprocessor": { "description": "Configures the externalization plugin", "type": "object", @@ -1375,37 +1448,6 @@ expression: "&schema" } ] }, - "experimental_batching": { - "description": "Batching configuration.", - "default": { - "enabled": false, - "mode": "batch_http_link" - }, - "type": "object", - "required": [ - "mode" - ], - "properties": { - "enabled": { - "description": "Activates Batching (disabled by default)", - "default": false, - "type": "boolean" - }, - "mode": { - "description": "Batching mode", - "oneOf": [ - { - "description": "batch_http_link", - "type": "string", - "enum": [ - "batch_http_link" - ] - } - ] - } - }, - "additionalProperties": false - }, "experimental_chaos": { "description": "Configuration for chaos testing, trying to reproduce bugs that require uncommon conditions. 
You probably don’t want this in production!", "default": { diff --git a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__upgrade_old_configuration@batching.yaml.snap b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__upgrade_old_configuration@batching.yaml.snap new file mode 100644 index 0000000000..daec7b3f14 --- /dev/null +++ b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__upgrade_old_configuration@batching.yaml.snap @@ -0,0 +1,8 @@ +--- +source: apollo-router/src/configuration/tests.rs +expression: new_config +--- +--- +batching: + enabled: true + mode: batch_http_link diff --git a/apollo-router/src/configuration/testdata/metrics/batching.router.yaml b/apollo-router/src/configuration/testdata/metrics/batching.router.yaml index c177d3f45e..169f3824a9 100644 --- a/apollo-router/src/configuration/testdata/metrics/batching.router.yaml +++ b/apollo-router/src/configuration/testdata/metrics/batching.router.yaml @@ -1,3 +1,3 @@ -experimental_batching: +batching: enabled: true mode: batch_http_link diff --git a/apollo-router/src/configuration/testdata/migrations/batching.yaml b/apollo-router/src/configuration/testdata/migrations/batching.yaml new file mode 100644 index 0000000000..c177d3f45e --- /dev/null +++ b/apollo-router/src/configuration/testdata/migrations/batching.yaml @@ -0,0 +1,3 @@ +experimental_batching: + enabled: true + mode: batch_http_link diff --git a/apollo-router/src/configuration/tests.rs b/apollo-router/src/configuration/tests.rs index f619a5018c..ffefd4ad54 100644 --- a/apollo-router/src/configuration/tests.rs +++ b/apollo-router/src/configuration/tests.rs @@ -968,6 +968,132 @@ fn it_adds_slash_to_custom_health_check_path_if_missing() { assert_eq!(&conf.health_check.path, "/healthz"); } +#[test] +fn it_processes_batching_subgraph_all_enabled_correctly() { + let json_config = json!({ + "enabled": true, + "mode": "batch_http_link", + "subgraph": { + "all": { + "enabled": true + } + } + }); + + let config: Batching = serde_json::from_value(json_config).unwrap(); + + assert!(config.batch_include("anything")); +} + +#[test] +fn it_processes_batching_subgraph_all_disabled_correctly() { + let json_config = json!({ + "enabled": true, + "mode": "batch_http_link", + "subgraph": { + "all": { + "enabled": false + } + } + }); + + let config: Batching = serde_json::from_value(json_config).unwrap(); + + assert!(!config.batch_include("anything")); +} + +#[test] +fn it_processes_batching_subgraph_accounts_enabled_correctly() { + let json_config = json!({ + "enabled": true, + "mode": "batch_http_link", + "subgraph": { + "all": { + "enabled": false + }, + "subgraphs": { + "accounts": { + "enabled": true + } + } + } + }); + + let config: Batching = serde_json::from_value(json_config).unwrap(); + + assert!(!config.batch_include("anything")); + assert!(config.batch_include("accounts")); +} + +#[test] +fn it_processes_batching_subgraph_accounts_disabled_correctly() { + let json_config = json!({ + "enabled": true, + "mode": "batch_http_link", + "subgraph": { + "all": { + "enabled": false + }, + "subgraphs": { + "accounts": { + "enabled": false + } + } + } + }); + + let config: Batching = serde_json::from_value(json_config).unwrap(); + + assert!(!config.batch_include("anything")); + assert!(!config.batch_include("accounts")); +} + +#[test] +fn it_processes_batching_subgraph_accounts_override_disabled_correctly() { + let json_config = json!({ + "enabled": true, + "mode": "batch_http_link", + "subgraph": { + 
"all": { + "enabled": true + }, + "subgraphs": { + "accounts": { + "enabled": false + } + } + } + }); + + let config: Batching = serde_json::from_value(json_config).unwrap(); + + assert!(config.batch_include("anything")); + assert!(!config.batch_include("accounts")); +} + +#[test] +fn it_processes_batching_subgraph_accounts_override_enabled_correctly() { + let json_config = json!({ + "enabled": true, + "mode": "batch_http_link", + "subgraph": { + "all": { + "enabled": false + }, + "subgraphs": { + "accounts": { + "enabled": true + } + } + } + }); + + let config: Batching = serde_json::from_value(json_config).unwrap(); + + assert!(!config.batch_include("anything")); + assert!(config.batch_include("accounts")); +} + fn has_field_level_serde_defaults(lines: &[&str], line_number: usize) -> bool { let serde_field_default = Regex::new( r#"^\s*#[\s\n]*\[serde\s*\((.*,)?\s*default\s*=\s*"[a-zA-Z0-9_:]+"\s*(,.*)?\)\s*\]\s*$"#, diff --git a/apollo-router/src/error.rs b/apollo-router/src/error.rs index c102059d8c..7fd226c12c 100644 --- a/apollo-router/src/error.rs +++ b/apollo-router/src/error.rs @@ -100,6 +100,15 @@ pub(crate) enum FetchError { /// could not find path: {reason} ExecutionPathNotFound { reason: String }, + + /// Batching error for '{service}': {reason} + SubrequestBatchingError { + /// The service for which batch processing failed. + service: String, + + /// The reason batch processing failed. + reason: String, + }, } impl FetchError { @@ -173,6 +182,7 @@ impl ErrorExtension for FetchError { FetchError::ExecutionPathNotFound { .. } => "EXECUTION_PATH_NOT_FOUND", FetchError::MalformedRequest { .. } => "MALFORMED_REQUEST", FetchError::MalformedResponse { .. } => "MALFORMED_RESPONSE", + FetchError::SubrequestBatchingError { .. } => "SUBREQUEST_BATCHING_ERROR", } .to_string() } @@ -191,16 +201,23 @@ impl From for FetchError { pub(crate) enum CacheResolverError { /// value retrieval failed: {0} RetrievalError(Arc), + /// batch processing failed: {0} + BatchingError(String), } impl IntoGraphQLErrors for CacheResolverError { fn into_graphql_errors(self) -> Result, Self> { - let CacheResolverError::RetrievalError(retrieval_error) = self; - retrieval_error - .deref() - .clone() - .into_graphql_errors() - .map_err(|_err| CacheResolverError::RetrievalError(retrieval_error)) + match self { + CacheResolverError::RetrievalError(retrieval_error) => retrieval_error + .deref() + .clone() + .into_graphql_errors() + .map_err(|_err| CacheResolverError::RetrievalError(retrieval_error)), + CacheResolverError::BatchingError(msg) => Ok(vec![Error::builder() + .message(msg) + .extension_code("BATCH_PROCESSING_FAILED") + .build()]), + } } } @@ -650,6 +667,19 @@ impl std::fmt::Display for ValidationErrors { } } +/// Error during subgraph batch processing +#[derive(Debug, Error, Display)] +pub(crate) enum SubgraphBatchingError { + /// Sender unavailable + SenderUnavailable, + /// Request does not have a subgraph name + MissingSubgraphName, + /// Requests is empty + RequestsIsEmpty, + /// Batch processing failed: {0} + ProcessingFailed(String), +} + #[cfg(test)] mod tests { use super::*; diff --git a/apollo-router/src/json_ext.rs b/apollo-router/src/json_ext.rs index 9967114741..b8cf588187 100644 --- a/apollo-router/src/json_ext.rs +++ b/apollo-router/src/json_ext.rs @@ -37,6 +37,15 @@ macro_rules! extract_key_value_from_object { }}; } +macro_rules! 
ensure_array { + ($value:expr) => {{ + match $value { + crate::json_ext::Value::Array(a) => Ok(a), + _ => Err("invalid type, expected an array"), + } + }}; +} + macro_rules! ensure_object { ($value:expr) => {{ match $value { diff --git a/apollo-router/src/lib.rs b/apollo-router/src/lib.rs index f10afe636a..4dc56b4ea9 100644 --- a/apollo-router/src/lib.rs +++ b/apollo-router/src/lib.rs @@ -52,6 +52,7 @@ pub(crate) mod metrics; mod apollo_studio_interop; pub(crate) mod axum_factory; +mod batching; mod cache; mod configuration; mod context; diff --git a/apollo-router/src/plugins/traffic_shaping/deduplication.rs b/apollo-router/src/plugins/traffic_shaping/deduplication.rs index b7f9ac6bf5..bae3f620bc 100644 --- a/apollo-router/src/plugins/traffic_shaping/deduplication.rs +++ b/apollo-router/src/plugins/traffic_shaping/deduplication.rs @@ -15,6 +15,7 @@ use tower::BoxError; use tower::Layer; use tower::ServiceExt; +use crate::batching::BatchQuery; use crate::graphql::Request; use crate::http_ext; use crate::plugins::authorization::CacheKeyMetadata; @@ -73,6 +74,18 @@ where wait_map: WaitMap, request: SubgraphRequest, ) -> Result { + // Check if the request is part of a batch. If it is, completely bypass dedup since it + // will break any request batches which this request is part of. + // This check is what enables Batching and Dedup to work together, so be very careful + // before making any changes to it. + if request + .context + .extensions() + .lock() + .contains_key::() + { + return service.ready_oneshot().await?.call(request).await; + } loop { let mut locked_wait_map = wait_map.lock().await; let authorization_cache_key = request.authorization.clone(); diff --git a/apollo-router/src/query_planner/plan.rs b/apollo-router/src/query_planner/plan.rs index 06e3519484..3ff16b0f5d 100644 --- a/apollo-router/src/query_planner/plan.rs +++ b/apollo-router/src/query_planner/plan.rs @@ -10,10 +10,12 @@ use serde::Serialize; pub(crate) use self::fetch::OperationKind; use super::fetch; use super::subscription::SubscriptionNode; +use crate::error::CacheResolverError; use crate::json_ext::Object; use crate::json_ext::Path; use crate::json_ext::Value; use crate::plugins::authorization::CacheKeyMetadata; +use crate::query_planner::fetch::QueryHash; use crate::spec::Query; /// A planner key. @@ -192,6 +194,65 @@ impl PlanNode { } } + /// Iteratively populate a Vec of QueryHashes representing Fetches in this plan. + /// + /// Do not include any operations which contain "requires" elements. + /// + /// This function is specifically designed to be used within the context of simple batching. It + /// explicitly fails if nodes which should *not* be encountered within that context are + /// encountered. e.g.: PlanNode::Defer + /// + /// It's unlikely/impossible that PlanNode::Defer or PlanNode::Subscription will ever be + /// supported, but it may be that PlanNode::Condition must eventually be supported (or other + /// new nodes types that are introduced). Explicitly fail each type to provide extra error + /// details and don't use _ so that future node types must be handled here. 
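+    ///
+    /// Illustrative example (not from the source): for a plan shaped like
+    /// `Sequence [ Fetch(a), Flatten(Fetch(b, requires: [...])), Parallel [ Fetch(c) ] ]`,
+    /// this returns the hashes of `a` and `c`; `b` is excluded because it has "requires"
+    /// entries and so cannot take part in a simple batch.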
+    pub(crate) fn query_hashes(&self) -> Result<Vec<Arc<QueryHash>>, CacheResolverError> {
+        let mut query_hashes = vec![];
+        let mut new_targets = vec![self];
+
+        loop {
+            let targets = new_targets;
+            if targets.is_empty() {
+                break;
+            }
+
+            new_targets = vec![];
+            for target in targets {
+                match target {
+                    PlanNode::Sequence { nodes } | PlanNode::Parallel { nodes } => {
+                        new_targets.extend(nodes);
+                    }
+                    PlanNode::Fetch(node) => {
+                        // If requires.is_empty() we can batch it!
+                        if node.requires.is_empty() {
+                            query_hashes.push(node.schema_aware_hash.clone());
+                        }
+                    }
+                    PlanNode::Flatten(node) => new_targets.push(&node.node),
+                    PlanNode::Defer { .. } => {
+                        return Err(CacheResolverError::BatchingError(
+                            "unexpected defer node encountered during query_hash processing"
+                                .to_string(),
+                        ))
+                    }
+                    PlanNode::Subscription { .. } => {
+                        return Err(CacheResolverError::BatchingError(
+                            "unexpected subscription node encountered during query_hash processing"
+                                .to_string(),
+                        ))
+                    }
+                    PlanNode::Condition { .. } => {
+                        return Err(CacheResolverError::BatchingError(
+                            "unexpected condition node encountered during query_hash processing"
+                                .to_string(),
+                        ))
+                    }
+                }
+            }
+        }
+        Ok(query_hashes)
+    }
+
     pub(crate) fn subgraph_fetches(&self) -> usize {
         match self {
             PlanNode::Sequence { nodes } => nodes.iter().map(|n| n.subgraph_fetches()).sum(),
diff --git a/apollo-router/src/request.rs b/apollo-router/src/request.rs
index eba779575b..1e51262dbf 100644
--- a/apollo-router/src/request.rs
+++ b/apollo-router/src/request.rs
@@ -177,8 +177,8 @@ impl Request {
     /// Convert Bytes into a GraphQL [`Request`].
     ///
-    /// An error will be produced in the event that the query string parameters
-    /// cannot be turned into a valid GraphQL `Request`.
+    /// An error will be produced in the event that the byte array cannot be
+    /// turned into a valid GraphQL `Request`.
pub(crate) fn batch_from_bytes(bytes: &[u8]) -> Result, serde_json::Error> { let value: serde_json::Value = serde_json::from_slice(bytes).map_err(serde_json::Error::custom)?; diff --git a/apollo-router/src/response.rs b/apollo-router/src/response.rs index ad0da7f268..320b4c849d 100644 --- a/apollo-router/src/response.rs +++ b/apollo-router/src/response.rs @@ -102,12 +102,18 @@ impl Response { service: service_name.to_string(), reason: error.to_string(), })?; - let mut object = + let object = ensure_object!(value).map_err(|error| FetchError::SubrequestMalformedResponse { service: service_name.to_string(), reason: error.to_string(), })?; + Response::from_object(service_name, object) + } + pub(crate) fn from_object( + service_name: &str, + mut object: Object, + ) -> Result { let data = object.remove("data"); let errors = extract_key_value_from_object!(object, "errors", Value::Array(v) => v) .map_err(|err| FetchError::SubrequestMalformedResponse { diff --git a/apollo-router/src/services/router/service.rs b/apollo-router/src/services/router/service.rs index 2591fc3aca..837b14056f 100644 --- a/apollo-router/src/services/router/service.rs +++ b/apollo-router/src/services/router/service.rs @@ -36,6 +36,8 @@ use tracing::Instrument; use super::ClientRequestAccepts; use crate::axum_factory::CanceledRequest; +use crate::batching::Batch; +use crate::batching::BatchQuery; use crate::cache::DeduplicatingCache; use crate::configuration::Batching; use crate::configuration::BatchingMode; @@ -91,7 +93,7 @@ pub(crate) struct RouterService { persisted_query_layer: Arc, query_analysis_layer: QueryAnalysisLayer, http_max_request_bytes: usize, - experimental_batching: Batching, + batching: Batching, } impl RouterService { @@ -101,7 +103,7 @@ impl RouterService { persisted_query_layer: Arc, query_analysis_layer: QueryAnalysisLayer, http_max_request_bytes: usize, - experimental_batching: Batching, + batching: Batching, ) -> Self { RouterService { supergraph_creator, @@ -109,7 +111,7 @@ impl RouterService { persisted_query_layer, query_analysis_layer, http_max_request_bytes, - experimental_batching, + batching, } } } @@ -398,7 +400,7 @@ impl RouterService { async fn call_inner(&self, req: RouterRequest) -> Result { let context = req.context.clone(); - let supergraph_requests = match self.translate_request(req).await { + let (supergraph_requests, is_batch) = match self.translate_request(req).await { Ok(requests) => requests, Err(err) => { u64_counter!( @@ -424,22 +426,47 @@ impl RouterService { } }; + // We need to handle cases where a failure is part of a batch and thus must be cancelled. + // Requests can be cancelled at any point of the router pipeline, but all failures bubble back + // up through here, so we can catch them without having to specially handle batch queries in + // other portions of the codebase. let futures = supergraph_requests .into_iter() - .map(|supergraph_request| self.process_supergraph_request(supergraph_request)); + .map(|supergraph_request| async { + // We clone the context here, because if the request results in an Err, the + // response context will no longer exist. + let context = supergraph_request.context.clone(); + let result = self.process_supergraph_request(supergraph_request).await; + + // Regardless of the result, we need to make sure that we cancel any potential batch queries. This is because + // custom rust plugins, rhai scripts, and coprocessors can cancel requests at any time and return a GraphQL + // error wrapped in an `Ok` or in a `BoxError` wrapped in an `Err`. 
+ let batch_query_opt = context.extensions().lock().remove::(); + if let Some(batch_query) = batch_query_opt { + // Only proceed with signalling cancelled if the batch_query is not finished + if !batch_query.finished() { + tracing::debug!("cancelling batch query in supergraph response"); + batch_query + .signal_cancelled("request terminated by user".to_string()) + .await?; + } + } + + result + }); // Use join_all to preserve ordering of concurrent operations // (Short circuit processing and propagate any errors in the batch) + // Note: We use `join_all` here since it awaits all futures before returning, thus allowing us to + // handle cancellation logic without fear of the other futures getting killed. let mut results: Vec = join_all(futures) .await .into_iter() .collect::, BoxError>>()?; - // If we only have one result, go ahead and return it. Otherwise, create a new result - // which is an array of all results. - if results.len() == 1 { - Ok(results.pop().expect("we should have at least one response")) - } else { + // If we detected we are processing a batch, return an array of results even if there is only + // one result + if is_batch { let mut results_it = results.into_iter(); let first = results_it .next() @@ -459,13 +486,16 @@ impl RouterService { response: http::Response::from_parts(parts, Body::from(bytes.freeze())), context, }) + } else { + Ok(results.pop().expect("we should have at least one response")) } } async fn translate_query_request( &self, parts: &Parts, - ) -> Result, TranslateError> { + ) -> Result<(Vec, bool), TranslateError> { + let mut is_batch = false; parts.uri.query().map(|q| { let mut result = vec![]; @@ -476,8 +506,8 @@ impl RouterService { Err(err) => { // It may be a batch of requests, so try that (if config allows) before // erroring out - if self.experimental_batching.enabled - && matches!(self.experimental_batching.mode, BatchingMode::BatchHttpLink) + if self.batching.enabled + && matches!(self.batching.mode, BatchingMode::BatchHttpLink) { result = graphql::Request::batch_from_urlencoded_query(q.to_string()) .map_err(|e| TranslateError { @@ -488,10 +518,11 @@ impl RouterService { "failed to decode a valid GraphQL request from path {e}" ), })?; + is_batch = true; } else if !q.is_empty() && q.as_bytes()[0] == b'[' { - let extension_details = if self.experimental_batching.enabled - && !matches!(self.experimental_batching.mode, BatchingMode::BatchHttpLink) { - format!("batching not supported for mode `{}`", self.experimental_batching.mode) + let extension_details = if self.batching.enabled + && !matches!(self.batching.mode, BatchingMode::BatchHttpLink) { + format!("batching not supported for mode `{}`", self.batching.mode) } else { "batching not enabled".to_string() }; @@ -513,7 +544,7 @@ impl RouterService { } } }; - Ok(result) + Ok((result, is_batch)) }).unwrap_or_else(|| { Err(TranslateError { status: StatusCode::BAD_REQUEST, @@ -527,16 +558,17 @@ impl RouterService { fn translate_bytes_request( &self, bytes: &Bytes, - ) -> Result, TranslateError> { + ) -> Result<(Vec, bool), TranslateError> { let mut result = vec![]; + let mut is_batch = false; match graphql::Request::deserialize_from_bytes(bytes) { Ok(request) => { result.push(request); } Err(err) => { - if self.experimental_batching.enabled - && matches!(self.experimental_batching.mode, BatchingMode::BatchHttpLink) + if self.batching.enabled + && matches!(self.batching.mode, BatchingMode::BatchHttpLink) { result = graphql::Request::batch_from_bytes(bytes).map_err(|e| TranslateError { @@ -547,14 +579,12 
@@ impl RouterService { "failed to deserialize the request body into JSON: {e}" ), })?; + is_batch = true; } else if !bytes.is_empty() && bytes[0] == b'[' { - let extension_details = if self.experimental_batching.enabled - && !matches!(self.experimental_batching.mode, BatchingMode::BatchHttpLink) + let extension_details = if self.batching.enabled + && !matches!(self.batching.mode, BatchingMode::BatchHttpLink) { - format!( - "batching not supported for mode `{}`", - self.experimental_batching.mode - ) + format!("batching not supported for mode `{}`", self.batching.mode) } else { "batching not enabled".to_string() }; @@ -576,13 +606,13 @@ impl RouterService { } } }; - Ok(result) + Ok((result, is_batch)) } async fn translate_request( &self, req: RouterRequest, - ) -> Result, TranslateError> { + ) -> Result<(Vec, bool), TranslateError> { let RouterRequest { router_request, context, @@ -590,7 +620,8 @@ impl RouterService { let (parts, body) = router_request.into_parts(); - let graphql_requests: Result, TranslateError> = if parts.method + let graphql_requests: Result<(Vec, bool), TranslateError> = if parts + .method == Method::GET { self.translate_query_request(&parts).await @@ -640,15 +671,28 @@ impl RouterService { } }; - let ok_results = graphql_requests?; + let (ok_results, is_batch) = graphql_requests?; let mut results = Vec::with_capacity(ok_results.len()); + let batch_size = ok_results.len(); - if ok_results.len() > 1 { - context - .extensions() - .lock() - .insert(self.experimental_batching.clone()); - } + // Modifying our Context extensions. + // If we are processing a batch (is_batch == true), insert our batching configuration. + // If subgraph batching configuration exists and is enabled for any of our subgraphs, we create our shared batch details + let shared_batch_details = (is_batch) + .then(|| { + context.extensions().lock().insert(self.batching.clone()); + + self.batching.subgraph.as_ref() + }) + .flatten() + .map(|subgraph_batching_config| { + subgraph_batching_config.all.enabled + || subgraph_batching_config + .subgraphs + .values() + .any(|v| v.enabled) + }) + .and_then(|a| a.then_some(Arc::new(Batch::spawn_handler(batch_size)))); let mut ok_results_it = ok_results.into_iter(); let first = ok_results_it @@ -661,16 +705,17 @@ impl RouterService { // through the pipeline. This is because there is simply no way to clone http // extensions. // - // Secondly, we can't clone private_entries, but we need to propagate at least + // Secondly, we can't clone extensions, but we need to propagate at least // ClientRequestAccepts to ensure correct processing of the response. We do that manually, - // but the concern is that there may be other private_entries that wish to propagate into + // but the concern is that there may be other extensions that wish to propagate into // each request or we may add them in future and not know about it here... // - // (Technically we could clone private entries, since it is held under an `Arc`, but that - // would mean all the requests in a batch shared the same set of private entries and review + // (Technically we could clone extensions, since it is held under an `Arc`, but that + // would mean all the requests in a batch shared the same set of extensions and review // comments expressed the sentiment that this may be a bad thing...) // - for graphql_request in ok_results_it { + // Note: If we enter this loop, then we must be processing a batch. + for (index, graphql_request) in ok_results_it.enumerate() { // XXX Lose http extensions, is that ok? 
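// Batch index bookkeeping: entry 0 is reserved for the original request, whose
// context is registered with the Batch after this loop; each clone created here
// registers as entry `index + 1`, so the Batch sees exactly `batch_size` participants.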
let mut new = http_ext::clone_http_request(&sg); *new.body_mut() = graphql_request; @@ -682,22 +727,45 @@ impl RouterService { .lock() .get::() .cloned(); - if let Some(client_request_accepts) = client_request_accepts_opt { - new_context - .extensions() - .lock() - .insert(client_request_accepts); + // Sub-scope so that new_context_guard is dropped before pushing into the new + // SupergraphRequest + { + let mut new_context_guard = new_context.extensions().lock(); + if let Some(client_request_accepts) = client_request_accepts_opt { + new_context_guard.insert(client_request_accepts); + } + new_context_guard.insert(self.batching.clone()); + // We are only going to insert a BatchQuery if Subgraph processing is enabled + if let Some(shared_batch_details) = &shared_batch_details { + new_context_guard.insert( + Batch::query_for_index(shared_batch_details.clone(), index + 1).map_err( + |err| TranslateError { + status: StatusCode::INTERNAL_SERVER_ERROR, + error: "failed to create batch", + extension_code: "BATCHING_ERROR", + extension_details: format!("failed to create batch entry: {err}"), + }, + )?, + ); + } } - new_context - .extensions() - .lock() - .insert(self.experimental_batching.clone()); results.push(SupergraphRequest { supergraph_request: new, - // Build a new context. Cloning would cause issues. context: new_context, }); } + + if let Some(shared_batch_details) = shared_batch_details { + context.extensions().lock().insert( + Batch::query_for_index(shared_batch_details, 0).map_err(|err| TranslateError { + status: StatusCode::INTERNAL_SERVER_ERROR, + error: "failed to create batch", + extension_code: "BATCHING_ERROR", + extension_details: format!("failed to create batch entry: {err}"), + })?, + ); + } + results.insert( 0, SupergraphRequest { @@ -705,7 +773,8 @@ impl RouterService { context, }, ); - Ok(results) + + Ok((results, is_batch)) } fn count_errors(errors: &[graphql::Error]) { @@ -756,7 +825,7 @@ pub(crate) struct RouterCreator { pub(crate) persisted_query_layer: Arc, query_analysis_layer: QueryAnalysisLayer, http_max_request_bytes: usize, - experimental_batching: Batching, + batching: Batching, } impl ServiceFactory for RouterCreator { @@ -807,7 +876,7 @@ impl RouterCreator { query_analysis_layer, http_max_request_bytes: configuration.limits.http_max_request_bytes, persisted_query_layer, - experimental_batching: configuration.experimental_batching.clone(), + batching: configuration.batching.clone(), }) } @@ -825,7 +894,7 @@ impl RouterCreator { self.persisted_query_layer.clone(), self.query_analysis_layer.clone(), self.http_max_request_bytes, - self.experimental_batching.clone(), + self.batching.clone(), )); ServiceBuilder::new() diff --git a/apollo-router/src/services/router/tests.rs b/apollo-router/src/services/router/tests.rs index 884b5d0c3a..3e58e50b44 100644 --- a/apollo-router/src/services/router/tests.rs +++ b/apollo-router/src/services/router/tests.rs @@ -305,7 +305,7 @@ async fn it_processes_a_valid_query_batch() { hyper::Body::from(result) }); let config = serde_json::json!({ - "experimental_batching": { + "batching": { "enabled": true, "mode" : "batch_http_link" } @@ -394,7 +394,7 @@ async fn it_will_not_process_a_poorly_formatted_query_batch() { hyper::Body::from(result) }); let config = serde_json::json!({ - "experimental_batching": { + "batching": { "enabled": true, "mode" : "batch_http_link" } @@ -448,7 +448,7 @@ async fn it_will_process_a_non_batched_defered_query() { hyper::Body::from(bytes) }); let config = serde_json::json!({ - "experimental_batching": { + 
"batching": { "enabled": true, "mode" : "batch_http_link" } @@ -508,7 +508,7 @@ async fn it_will_not_process_a_batched_deferred_query() { hyper::Body::from(result) }); let config = serde_json::json!({ - "experimental_batching": { + "batching": { "enabled": true, "mode" : "batch_http_link" } diff --git a/apollo-router/src/services/subgraph_service.rs b/apollo-router/src/services/subgraph_service.rs index 048541c32e..e5666544f4 100644 --- a/apollo-router/src/services/subgraph_service.rs +++ b/apollo-router/src/services/subgraph_service.rs @@ -18,12 +18,14 @@ use http::HeaderValue; use http::Request; use hyper::Body; use hyper_rustls::ConfigBuilderExt; +use itertools::Itertools; use mediatype::names::APPLICATION; use mediatype::names::JSON; use mediatype::MediaType; use mime::APPLICATION_JSON; use rustls::RootCertStore; use serde::Serialize; +use tokio::sync::oneshot; use tokio_tungstenite::connect_async; use tokio_tungstenite::connect_async_tls_with_config; use tokio_tungstenite::tungstenite::client::IntoClientRequest; @@ -31,6 +33,7 @@ use tower::util::BoxService; use tower::BoxError; use tower::Service; use tower::ServiceExt; +use tracing::instrument; use tracing::Instrument; use uuid::Uuid; @@ -38,8 +41,14 @@ use super::http::HttpClientServiceFactory; use super::http::HttpRequest; use super::layers::content_negotiation::GRAPHQL_JSON_RESPONSE_HEADER_VALUE; use super::Plugins; +use crate::batching::assemble_batch; +use crate::batching::BatchQuery; +use crate::batching::BatchQueryInfo; +use crate::configuration::Batching; +use crate::configuration::BatchingMode; use crate::configuration::TlsClientAuth; use crate::error::FetchError; +use crate::error::SubgraphBatchingError; use crate::graphql; use crate::json_ext::Object; use crate::plugins::authentication::subgraph::SigningParamsConfig; @@ -122,7 +131,7 @@ impl SubgraphService { service: impl Into, configuration: &Configuration, subscription_config: Option, - client_factory: crate::services::http::HttpClientServiceFactory, + client_factory: HttpClientServiceFactory, ) -> Result { let name: String = service.into(); @@ -233,6 +242,7 @@ impl tower::Service for SubgraphService { let arc_apq_enabled = self.apq.clone(); let mut notify = self.notify.clone(); + let make_calls = async move { // Subscription handling if request.operation_kind == OperationKind::Subscription @@ -355,13 +365,18 @@ impl tower::Service for SubgraphService { } } - let client = client_factory.create(&service_name); - // If APQ is not enabled, simply make the graphql call // with the same request body. 
let apq_enabled = arc_apq_enabled.as_ref(); if !apq_enabled.load(Relaxed) { - return call_http(request, body, context, client, &service_name).await; + return call_http( + request, + body, + context, + client_factory.clone(), + &service_name, + ) + .await; } // Else, if APQ is enabled, @@ -395,7 +410,7 @@ impl tower::Service for SubgraphService { request.clone(), apq_body.clone(), context.clone(), - client_factory.create(&service_name), + client_factory.clone(), &service_name, ) .await?; @@ -408,11 +423,25 @@ impl tower::Service for SubgraphService { match get_apq_error(gql_response) { APQError::PersistedQueryNotSupported => { apq_enabled.store(false, Relaxed); - call_http(request, body, context, client, &service_name).await + call_http( + request, + body, + context, + client_factory.clone(), + &service_name, + ) + .await } APQError::PersistedQueryNotFound => { apq_body.query = query; - call_http(request, apq_body, context, client, &service_name).await + call_http( + request, + apq_body, + context, + client_factory.clone(), + &service_name, + ) + .await } _ => Ok(response), } @@ -607,39 +636,10 @@ async fn call_websocket( )) } -/// call_http makes http calls with modified graphql::Request (body) -async fn call_http( - request: SubgraphRequest, - body: graphql::Request, - context: Context, - client: crate::services::http::BoxService, - service_name: &str, -) -> Result { - let SubgraphRequest { - subgraph_request, .. - } = request; - - let operation_name = subgraph_request - .body() - .operation_name - .clone() - .unwrap_or_default(); - - let (parts, _) = subgraph_request.into_parts(); - let body = serde_json::to_string(&body).expect("JSON serialization should not fail"); - let mut request = http::Request::from_parts(parts, Body::from(body)); - - request - .headers_mut() - .insert(CONTENT_TYPE, APPLICATION_JSON_HEADER_VALUE.clone()); - request - .headers_mut() - .append(ACCEPT, ACCEPT_GRAPHQL_JSON.clone()); - - let schema_uri = request.uri(); - let host = schema_uri.host().unwrap_or_default(); - let port = schema_uri.port_u16().unwrap_or_else(|| { - let scheme = schema_uri.scheme_str(); +// Utility function to extract uri details. +fn get_uri_details(uri: &hyper::Uri) -> (&str, u16, &str) { + let port = uri.port_u16().unwrap_or_else(|| { + let scheme = uri.scheme_str(); if scheme == Some("https") { 443 } else if scheme == Some("http") { @@ -649,52 +649,16 @@ async fn call_http( } }); - let path = schema_uri.path(); - - let subgraph_req_span = tracing::info_span!("subgraph_request", - "otel.kind" = "CLIENT", - "net.peer.name" = %host, - "net.peer.port" = %port, - "http.route" = %path, - "http.url" = %schema_uri, - "net.transport" = "ip_tcp", - "apollo.subgraph.name" = %service_name, - "graphql.operation.name" = %operation_name, - ); - - // The graphql spec is lax about what strategy to use for processing responses: https://github.com/graphql/graphql-over-http/blob/main/spec/GraphQLOverHTTP.md#processing-the-response - // - // "If the response uses a non-200 status code and the media type of the response payload is application/json - // then the client MUST NOT rely on the body to be a well-formed GraphQL response since the source of the response - // may not be the server but instead some intermediary such as API gateways, proxies, firewalls, etc." - // - // The TLDR of this is that it's really asking us to do the best we can with whatever information we have with some modifications depending on content type. 
- // Our goal is to give the user the most relevant information possible in the response errors - // - // Rules: - // 1. If the content type of the response is not `application/json` or `application/graphql-response+json` then we won't try to parse. - // 2. If an HTTP status is not 2xx it will always be attached as a graphql error. - // 3. If the response type is `application/json` and status is not 2xx and the body the entire body will be output if the response is not valid graphql. - - let display_body = context.contains_key(LOGGING_DISPLAY_BODY); - - // TODO: Temporary solution to plug FileUploads plugin until 'http_client' will be fixed https://github.com/apollographql/router/pull/4666 - let request = file_uploads::http_request_wrapper(request).await; - - // Perform the actual fetch. If this fails then we didn't manage to make the call at all, so we can't do anything with it. - let (parts, content_type, body) = - do_fetch(client, &context, service_name, request, display_body) - .instrument(subgraph_req_span) - .await?; - - if display_body { - if let Some(Ok(b)) = &body { - tracing::info!( - response.body = %String::from_utf8_lossy(b), apollo.subgraph.name = %service_name, "Raw response body from subgraph {service_name:?} received" - ); - } - } + (uri.host().unwrap_or_default(), port, uri.path()) +} +// Utility function to create a graphql response from HTTP response components +fn http_response_to_graphql_response( + service_name: &str, + content_type: Result, + body: Option>, + parts: &Parts, +) -> graphql::Response { let mut graphql_response = match (content_type, body, parts.status.is_success()) { (Ok(ContentType::ApplicationGraphqlResponseJson), Some(Ok(body)), _) | (Ok(ContentType::ApplicationJson), Some(Ok(body)), true) => { @@ -761,11 +725,409 @@ async fn call_http( .to_graphql_error(None), ) } + graphql_response +} + +/// Process a single subgraph batch request +#[instrument(skip(client_factory, context, request))] +pub(crate) async fn process_batch( + client_factory: HttpClientServiceFactory, + service: String, + context: Context, + mut request: http::Request, + listener_count: usize, +) -> Result, FetchError> { + // Now we need to "batch up" our data and send it to our subgraphs + request + .headers_mut() + .insert(CONTENT_TYPE, APPLICATION_JSON_HEADER_VALUE.clone()); + request + .headers_mut() + .append(ACCEPT, ACCEPT_GRAPHQL_JSON.clone()); + + let schema_uri = request.uri(); + let (host, port, path) = get_uri_details(schema_uri); + + // We can't provide a single operation name in the span (since we may be processing multiple + // operations). Product decision, use the hard coded value "batch". + let subgraph_req_span = tracing::info_span!("subgraph_request", + "otel.kind" = "CLIENT", + "net.peer.name" = %host, + "net.peer.port" = %port, + "http.route" = %path, + "http.url" = %schema_uri, + "net.transport" = "ip_tcp", + "apollo.subgraph.name" = %&service, + "graphql.operation.name" = "batch" + ); + + // The graphql spec is lax about what strategy to use for processing responses: https://github.com/graphql/graphql-over-http/blob/main/spec/GraphQLOverHTTP.md#processing-the-response + // + // "If the response uses a non-200 status code and the media type of the response payload is application/json + // then the client MUST NOT rely on the body to be a well-formed GraphQL response since the source of the response + // may not be the server but instead some intermediary such as API gateways, proxies, firewalls, etc." 
+ // + // The TLDR of this is that it's really asking us to do the best we can with whatever information we have with some modifications depending on content type. + // Our goal is to give the user the most relevant information possible in the response errors + // + // Rules: + // 1. If the content type of the response is not `application/json` or `application/graphql-response+json` then we won't try to parse. + // 2. If an HTTP status is not 2xx it will always be attached as a graphql error. + // 3. If the response type is `application/json` and the status is not 2xx, the entire body will be output if the response is not valid graphql. + + let display_body = context.contains_key(LOGGING_DISPLAY_BODY); + let client = client_factory.create(&service); + + // Update our batching metrics (just before we fetch) + tracing::info!(histogram.apollo.router.operations.batching.size = listener_count as f64, + mode = %BatchingMode::BatchHttpLink, // Only supported mode right now + subgraph = &service + ); + + tracing::info!(monotonic_counter.apollo.router.operations.batching = 1u64, + mode = %BatchingMode::BatchHttpLink, // Only supported mode right now + subgraph = &service + ); + + // Perform the actual fetch. If this fails then we didn't manage to make the call at all, so we can't do anything with it. + tracing::debug!("fetching from subgraph: {service}"); + let (parts, content_type, body) = do_fetch(client, &context, &service, request, display_body) + .instrument(subgraph_req_span) + .await?; + + if display_body { + if let Some(Ok(b)) = &body { + tracing::info!( + response.body = %String::from_utf8_lossy(b), apollo.subgraph.name = %&service, "Raw response body from subgraph {service:?} received" + ); + } + } + + tracing::debug!("parts: {parts:?}, content_type: {content_type:?}, body: {body:?}"); + let value = + serde_json::from_slice(&body.ok_or(FetchError::SubrequestMalformedResponse { + service: service.to_string(), + reason: "no body in response".to_string(), + })??)
+ .map_err(|error| FetchError::SubrequestMalformedResponse { + service: service.to_string(), + reason: error.to_string(), + })?; + + tracing::debug!("json value from body is: {value:?}"); + + let array = ensure_array!(value).map_err(|error| FetchError::SubrequestMalformedResponse { + service: service.to_string(), + reason: error.to_string(), + })?; + let mut graphql_responses = Vec::with_capacity(array.len()); + for value in array { + let object = + ensure_object!(value).map_err(|error| FetchError::SubrequestMalformedResponse { + service: service.to_string(), + reason: error.to_string(), + })?; + + // Map our Vec into Bytes + // Map our serde conversion error to a FetchError + let body = Some( + serde_json::to_vec(&object) + .map(|v| v.into()) + .map_err(|error| FetchError::SubrequestMalformedResponse { + service: service.to_string(), + reason: error.to_string(), + }), + ); + + let graphql_response = + http_response_to_graphql_response(&service, content_type.clone(), body, &parts); + graphql_responses.push(graphql_response); + } + + tracing::debug!("we have a vec of graphql_responses: {graphql_responses:?}"); + // Build an http Response for each graphql response + let subgraph_responses: Result, _> = graphql_responses + .into_iter() + .map(|res| { + http::Response::builder() + .status(parts.status) + .version(parts.version) + .body(res) + .map(|mut http_res| { + *http_res.headers_mut() = parts.headers.clone(); + let resp = SubgraphResponse::new_from_response(http_res, context.clone()); + + tracing::debug!("we have a resp: {resp:?}"); + resp + }) + .map_err(|e| FetchError::MalformedResponse { + reason: e.to_string(), + }) + }) + .collect(); + + tracing::debug!("we have a vec of subgraph_responses: {subgraph_responses:?}"); + subgraph_responses +} + +/// Notify all listeners of a batch query of the results +pub(crate) async fn notify_batch_query( + service: String, + senders: Vec>>, + responses: Result, FetchError>, +) -> Result<(), BoxError> { + tracing::debug!( + "handling response for service '{service}' with {} listeners: {responses:#?}", + senders.len() + ); + + match responses { + // If we had an error processing the batch, then pipe that error to all of the listeners + Err(e) => { + for tx in senders { + // Try to notify all waiters. If we can't notify an individual sender, then log an error + if let Err(log_error) = tx.send(Err(Box::new(e.clone()))).map_err(|error| { + FetchError::SubrequestBatchingError { + service: service.clone(), + reason: format!("tx send failed: {error:?}"), + } + }) { + tracing::error!(service, error=%log_error, "failed to notify sender that batch processing failed"); + } + } + } + + Ok(rs) => { + // Before we process our graphql responses, ensure that we have a tx for each + // response + if senders.len() != rs.len() { + return Err(Box::new(FetchError::SubrequestBatchingError { + service, + reason: format!( + "number of txs ({}) is not equal to number of graphql responses ({})", + senders.len(), + rs.len() + ), + })); + } + + // We have checked before we started looping that we had a tx for every + // graphql_response, so zip_eq shouldn't panic. + // Use the tx to send a graphql_response message to each waiter. 
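Each parked request in the subgraph service holds the receiving half of a `tokio::sync::oneshot` channel, and the loop below fans one response out per waiter. A stripped-down sketch of that rendezvous, with `String` standing in for the real request and response types:

```rust
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    // One channel per batch entry: the batch driver keeps the sender,
    // the parked request keeps the receiver.
    let (tx, rx) = oneshot::channel::<Result<String, String>>();

    // Driver side: deliver this entry's slice of the batch response.
    // A oneshot channel buffers the value until the receiver awaits it.
    tx.send(Ok("response for entry 0".into()))
        .expect("receiver still waiting");

    // Waiter side: park until the batch completes, then resume.
    match rx.await {
        Ok(Ok(body)) => println!("got {body}"),
        Ok(Err(e)) => eprintln!("batch failed: {e}"),
        Err(_) => eprintln!("batch driver dropped the sender"),
    }
}
```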
+ for (response, sender) in rs.into_iter().zip_eq(senders) { + if let Err(log_error) = + sender + .send(Ok(response)) + .map_err(|error| FetchError::SubrequestBatchingError { + service: service.to_string(), + reason: format!("tx send failed: {error:?}"), + }) + { + tracing::error!(service, error=%log_error, "failed to notify sender that batch processing succeeded"); + } + } + } + } + + Ok(()) +} + +type BatchInfo = ( + (String, http::Request<Body>, Context, usize), + Vec<oneshot::Sender<Result<SubgraphResponse, BoxError>>>, +); + +/// Collect all batch requests and process them concurrently +#[instrument(skip_all)] +pub(crate) async fn process_batches( + client_factory: HttpClientServiceFactory, + svc_map: HashMap<String, Vec<BatchQueryInfo>>, +) -> Result<(), BoxError> { + // We need to strip out the senders so that we can work with them separately. + let mut errors = vec![]; + let (info, txs): (Vec<_>, Vec<_>) = + futures::future::join_all(svc_map.into_iter().map(|(service, requests)| async { + let (_op_name, context, request, txs) = assemble_batch(requests).await?; + + Ok(((service, request, context, txs.len()), txs)) + })) + .await + .into_iter() + .filter_map(|x: Result<BatchInfo, BoxError>| x.map_err(|e| errors.push(e)).ok()) + .unzip(); + + // If errors isn't empty, then process_batches cannot proceed. Log the errors and + // return + if !errors.is_empty() { + for error in errors { + tracing::error!("assembling batch failed: {error}"); + } + return Err(SubgraphBatchingError::ProcessingFailed( + "assembling batches failed".to_string(), + ) + .into()); + } + // Collect all of the processing logic and run them concurrently, collecting all errors + let cf = &client_factory; + // zip_eq panics if the lengths of txs and info do not match, so make sure they + // are equal before zipping + if txs.len() != info.len() { + return Err(SubgraphBatchingError::ProcessingFailed( + "length of txs and info are not equal".to_string(), + ) + .into()); + } + let batch_futures = info.into_iter().zip_eq(txs).map( + |((service, request, context, listener_count), senders)| async move { + let batch_result = process_batch( + cf.clone(), + service.clone(), + context, + request, + listener_count, + ) + .await; + + notify_batch_query(service, senders, batch_result).await + }, + ); + + futures::future::try_join_all(batch_futures).await?; + + Ok(()) +} + +async fn call_http( + request: SubgraphRequest, + body: graphql::Request, + context: Context, + client_factory: HttpClientServiceFactory, + service_name: &str, +) -> Result<SubgraphResponse, BoxError> { + // We use configuration to determine if calls may be batched. If we have Batching + // configuration, then we check (batch_include()) if the current subgraph has batching enabled + // in configuration. If it does, we then start to process a potential batch. + // + // If we are processing a batch, then we'd like to park tasks here, but we can't park them whilst + // we have the context extensions lock held. That would be very bad...
+ // We grab the (potential) BatchQuery and then operate on it later + let opt_batch_query = { + let extensions_guard = context.extensions().lock(); + + // We need to make sure to remove the BatchQuery from the context as it holds a sender to + // the owning batch + extensions_guard + .get::() + .and_then(|batching_config| batching_config.batch_include(service_name).then_some(())) + .and_then(|_| extensions_guard.get::().cloned()) + .and_then(|bq| (!bq.finished()).then_some(bq)) + }; + + // If we have a batch query, then it's time for batching + if let Some(query) = opt_batch_query { + // Let the owning batch know that this query is ready to process, getting back the channel + // from which we'll eventually receive our response. + let response_rx = query.signal_progress(client_factory, request, body).await?; + + // Park this query until we have our response and pass it back up + response_rx + .await + .map_err(|err| FetchError::SubrequestBatchingError { + service: service_name.to_string(), + reason: format!("tx receive failed: {err}"), + })? + } else { + tracing::debug!("we called http"); + let client = client_factory.create(service_name); + call_single_http(request, body, context, client, service_name).await + } +} + +/// call_single_http makes http calls with modified graphql::Request (body) +pub(crate) async fn call_single_http( + request: SubgraphRequest, + body: graphql::Request, + context: Context, + client: crate::services::http::BoxService, + service_name: &str, +) -> Result { + let SubgraphRequest { + subgraph_request, .. + } = request; + + let operation_name = subgraph_request + .body() + .operation_name + .clone() + .unwrap_or_default(); + + let (parts, _) = subgraph_request.into_parts(); + let body = serde_json::to_string(&body)?; + tracing::debug!("our JSON body: {body:?}"); + let mut request = http::Request::from_parts(parts, Body::from(body)); + + request + .headers_mut() + .insert(CONTENT_TYPE, APPLICATION_JSON_HEADER_VALUE.clone()); + request + .headers_mut() + .append(ACCEPT, ACCEPT_GRAPHQL_JSON.clone()); + + let schema_uri = request.uri(); + let (host, port, path) = get_uri_details(schema_uri); + + let subgraph_req_span = tracing::info_span!("subgraph_request", + "otel.kind" = "CLIENT", + "net.peer.name" = %host, + "net.peer.port" = %port, + "http.route" = %path, + "http.url" = %schema_uri, + "net.transport" = "ip_tcp", + "apollo.subgraph.name" = %service_name, + "graphql.operation.name" = %operation_name, + ); + + // The graphql spec is lax about what strategy to use for processing responses: https://github.com/graphql/graphql-over-http/blob/main/spec/GraphQLOverHTTP.md#processing-the-response + // + // "If the response uses a non-200 status code and the media type of the response payload is application/json + // then the client MUST NOT rely on the body to be a well-formed GraphQL response since the source of the response + // may not be the server but instead some intermediary such as API gateways, proxies, firewalls, etc." + // + // The TLDR of this is that it's really asking us to do the best we can with whatever information we have with some modifications depending on content type. + // Our goal is to give the user the most relevant information possible in the response errors + // + // Rules: + // 1. If the content type of the response is not `application/json` or `application/graphql-response+json` then we won't try to parse. + // 2. If an HTTP status is not 2xx it will always be attached as a graphql error. + // 3. 
If the response type is `application/json` and the status is not 2xx, the entire body will be output if the response is not valid graphql. + + let display_body = context.contains_key(LOGGING_DISPLAY_BODY); + + // TODO: Temporary solution to plug FileUploads plugin until 'http_client' is fixed https://github.com/apollographql/router/pull/4666 + let request = file_uploads::http_request_wrapper(request).await; + + // Perform the actual fetch. If this fails then we didn't manage to make the call at all, so we can't do anything with it. + let (parts, content_type, body) = + do_fetch(client, &context, service_name, request, display_body) + .instrument(subgraph_req_span) + .await?; + + if display_body { + if let Some(Ok(b)) = &body { + tracing::info!( + response.body = %String::from_utf8_lossy(b), apollo.subgraph.name = %service_name, "Raw response body from subgraph {service_name:?} received" + ); + } + } + + let graphql_response = + http_response_to_graphql_response(service_name, content_type, body, &parts); let resp = http::Response::from_parts(parts, graphql_response); Ok(SubgraphResponse::new_from_response(resp, context)) } +#[derive(Clone, Debug)] enum ContentType { ApplicationJson, ApplicationGraphqlResponseJson, @@ -2373,4 +2735,162 @@ mod tests { assert_eq!(resp.response.body(), &expected_resp); } + + #[test] + fn it_gets_uri_details() { + let path = "https://example.com/path".parse().unwrap(); + let (host, port, path) = super::get_uri_details(&path); + + assert_eq!(host, "example.com"); + assert_eq!(port, 443); + assert_eq!(path, "/path"); + } + + #[test] + fn it_converts_ok_http_to_graphql() { + let (parts, body) = http::Response::builder() + .status(StatusCode::OK) + .body(None) + .unwrap() + .into_parts(); + let actual = super::http_response_to_graphql_response( + "test_service", + Ok(ContentType::ApplicationGraphqlResponseJson), + body, + &parts, + ); + + let expected = graphql::Response::builder().build(); + assert_eq!(actual, expected); + } + + #[test] + fn it_converts_error_http_to_graphql() { + let (parts, body) = http::Response::builder() + .status(StatusCode::IM_A_TEAPOT) + .body(None) + .unwrap() + .into_parts(); + let actual = super::http_response_to_graphql_response( + "test_service", + Ok(ContentType::ApplicationGraphqlResponseJson), + body, + &parts, + ); + + let expected = graphql::Response::builder() + .error( + super::FetchError::SubrequestHttpError { + status_code: Some(418), + service: "test_service".into(), + reason: "418: I'm a teapot".into(), + } + .to_graphql_error(None), + ) + .build(); + assert_eq!(actual, expected); + } + + #[test] + fn it_converts_http_with_body_to_graphql() { + let mut json = serde_json::json!({ + "data": { + "some_field": "some_value" + } + }); + + let (parts, body) = http::Response::builder() + .status(StatusCode::OK) + .body(Some(Ok(Bytes::from(json.to_string())))) + .unwrap() + .into_parts(); + + let actual = super::http_response_to_graphql_response( + "test_service", + Ok(ContentType::ApplicationGraphqlResponseJson), + body, + &parts, + ); + + let expected = graphql::Response::builder() + .data(json["data"].take()) + .build(); + assert_eq!(actual, expected); + } + + #[test] + fn it_converts_http_with_graphql_errors_to_graphql() { + let error = graphql::Error::builder() + .message("error was encountered for test") + .extension_code("SOME_EXTENSION") + .build(); + let mut json = serde_json::json!({ + "data": { + "some_field": "some_value", + "error_field": null, + }, + "errors": [error], + }); + + let (parts, body) =
http::Response::builder() + .status(StatusCode::OK) + .body(Some(Ok(Bytes::from(json.to_string())))) + .unwrap() + .into_parts(); + + let actual = super::http_response_to_graphql_response( + "test_service", + Ok(ContentType::ApplicationGraphqlResponseJson), + body, + &parts, + ); + + let expected = graphql::Response::builder() + .data(json["data"].take()) + .error(error) + .build(); + assert_eq!(actual, expected); + } + + #[test] + fn it_converts_error_http_with_graphql_errors_to_graphql() { + let error = graphql::Error::builder() + .message("error was encountered for test") + .extension_code("SOME_EXTENSION") + .build(); + let mut json = serde_json::json!({ + "data": { + "some_field": "some_value", + "error_field": null, + }, + "errors": [error], + }); + + let (parts, body) = http::Response::builder() + .status(StatusCode::IM_A_TEAPOT) + .body(Some(Ok(Bytes::from(json.to_string())))) + .unwrap() + .into_parts(); + + let actual = super::http_response_to_graphql_response( + "test_service", + Ok(ContentType::ApplicationGraphqlResponseJson), + body, + &parts, + ); + + let expected = graphql::Response::builder() + .data(json["data"].take()) + .error( + super::FetchError::SubrequestHttpError { + status_code: Some(418), + service: "test_service".into(), + reason: "418: I'm a teapot".into(), + } + .to_graphql_error(None), + ) + .error(error) + .build(); + assert_eq!(actual, expected); + } } diff --git a/apollo-router/src/services/supergraph/service.rs b/apollo-router/src/services/supergraph/service.rs index 3019a1f986..4ba4c85455 100644 --- a/apollo-router/src/services/supergraph/service.rs +++ b/apollo-router/src/services/supergraph/service.rs @@ -24,6 +24,7 @@ use tracing::field; use tracing::Span; use tracing_futures::Instrument; +use crate::batching::BatchQuery; use crate::configuration::Batching; use crate::context::OPERATION_NAME; use crate::error::CacheResolverError; @@ -617,19 +618,33 @@ async fn plan_query( .insert::(doc); } - planning + let qpr = planning .call( query_planner::CachingRequest::builder() .query(query_str) .and_operation_name(operation_name) - .context(context) + .context(context.clone()) .build(), ) .instrument(tracing::info_span!( QUERY_PLANNING_SPAN_NAME, "otel.kind" = "INTERNAL" )) - .await + .await?; + + let batching = context.extensions().lock().get::().cloned(); + if let Some(batch_query) = batching { + if let Some(QueryPlannerContent::Plan { plan, .. 
}) = &qpr.content { + let query_hashes = plan.root.query_hashes()?; + batch_query + .set_query_hashes(query_hashes) + .await + .map_err(|e| CacheResolverError::BatchingError(e.to_string()))?; + tracing::debug!("batch registered: {}", batch_query); + } + } + + Ok(qpr) } fn clone_supergraph_request( diff --git a/apollo-router/src/uplink/license_enforcement.rs b/apollo-router/src/uplink/license_enforcement.rs index cc33346818..50b6c410c6 100644 --- a/apollo-router/src/uplink/license_enforcement.rs +++ b/apollo-router/src/uplink/license_enforcement.rs @@ -382,6 +382,10 @@ impl LicenseEnforcementReport { .path("$.preview_file_uploads") .name("File uploads plugin") .build(), + ConfigurationRestriction::builder() + .path("$.batching") + .name("Batching support") + .build(), ConfigurationRestriction::builder() .path("$.experimental_demand_control") .name("Demand control plugin") diff --git a/apollo-router/tests/fixtures/apollo_reports_batch.router.yaml b/apollo-router/tests/fixtures/apollo_reports_batch.router.yaml index fdbb1ed4dd..238b8a00dd 100644 --- a/apollo-router/tests/fixtures/apollo_reports_batch.router.yaml +++ b/apollo-router/tests/fixtures/apollo_reports_batch.router.yaml @@ -1,4 +1,4 @@ -experimental_batching: +batching: enabled: true mode: batch_http_link rhai: @@ -28,5 +28,3 @@ telemetry: send_variable_values: only: - "sendValue" - - diff --git a/apollo-router/tests/fixtures/batching/all_enabled.router.yaml b/apollo-router/tests/fixtures/batching/all_enabled.router.yaml new file mode 100644 index 0000000000..24c9818562 --- /dev/null +++ b/apollo-router/tests/fixtures/batching/all_enabled.router.yaml @@ -0,0 +1,11 @@ +# Simple config to enable batching for all subgraphs + +batching: + enabled: true + mode: batch_http_link + subgraph: + all: + enabled: true + +include_subgraph_errors: + all: true diff --git a/apollo-router/tests/fixtures/batching/block_request.rhai b/apollo-router/tests/fixtures/batching/block_request.rhai new file mode 100644 index 0000000000..c0ec2e6ae0 --- /dev/null +++ b/apollo-router/tests/fixtures/batching/block_request.rhai @@ -0,0 +1,10 @@ +// Simple rhai script to block a request for batching testing +fn execution_service(service) { + let request_callback = |request| { + if request.body.query.contains("failMe") { + throw "cancelled expected failure" + } + }; + + service.map_request(request_callback); +} diff --git a/apollo-router/tests/fixtures/batching/coprocessor.router.yaml b/apollo-router/tests/fixtures/batching/coprocessor.router.yaml new file mode 100644 index 0000000000..7292662239 --- /dev/null +++ b/apollo-router/tests/fixtures/batching/coprocessor.router.yaml @@ -0,0 +1,19 @@ +# Simple config to enable batching and a coprocessor for testing killed requests + +batching: + enabled: true + mode: batch_http_link + subgraph: + all: + enabled: true + +coprocessor: + url: http://127.0.0.1:REPLACEME # Will be overwritten by the test + subgraph: + all: + request: + service_name: true + body: true + +include_subgraph_errors: + all: true diff --git a/apollo-router/tests/fixtures/batching/rhai_script.router.yaml b/apollo-router/tests/fixtures/batching/rhai_script.router.yaml new file mode 100644 index 0000000000..b4b488be39 --- /dev/null +++ b/apollo-router/tests/fixtures/batching/rhai_script.router.yaml @@ -0,0 +1,15 @@ +# Simple config to enable batching and rhai scripts for testing + +batching: + enabled: true + mode: batch_http_link + subgraph: + all: + enabled: true + +rhai: + scripts: ./tests/fixtures/batching + main: block_request.rhai + 
+include_subgraph_errors: + all: true diff --git a/apollo-router/tests/fixtures/batching/schema.graphql b/apollo-router/tests/fixtures/batching/schema.graphql new file mode 100644 index 0000000000..0968c300bb --- /dev/null +++ b/apollo-router/tests/fixtures/batching/schema.graphql @@ -0,0 +1,56 @@ +schema + @link(url: "https://specs.apollo.dev/link/v1.0") + @link(url: "https://specs.apollo.dev/join/v0.3", for: EXECUTION) +{ + query: Query +} + +directive @join__enumValue(graph: join__Graph!) repeatable on ENUM_VALUE + +directive @join__field(graph: join__Graph, requires: join__FieldSet, provides: join__FieldSet, type: String, external: Boolean, override: String, usedOverridden: Boolean) repeatable on FIELD_DEFINITION | INPUT_FIELD_DEFINITION + +directive @join__graph(name: String!, url: String!) on ENUM_VALUE + +directive @join__implements(graph: join__Graph!, interface: String!) repeatable on OBJECT | INTERFACE + +directive @join__type(graph: join__Graph!, key: join__FieldSet, extension: Boolean! = false, resolvable: Boolean! = true, isInterfaceObject: Boolean! = false) repeatable on OBJECT | INTERFACE | UNION | ENUM | INPUT_OBJECT | SCALAR + +directive @join__unionMember(graph: join__Graph!, member: String!) repeatable on UNION + +directive @link(url: String, as: String, for: link__Purpose, import: [link__Import]) repeatable on SCHEMA + +scalar join__FieldSet + +enum join__Graph { + A @join__graph(name: "a", url: "http://127.0.0.1:4005/a") + B @join__graph(name: "b", url: "http://127.0.0.1:4005/b") +} + +scalar link__Import + +enum link__Purpose { + """ + `SECURITY` features provide metadata necessary to securely resolve fields. + """ + SECURITY + + """ + `EXECUTION` features provide metadata necessary for operation execution. + """ + EXECUTION +} + +type Structure + @join__type(graph: A, key: "index") + @join__type(graph: B, key: "index") +{ + index: Int, +} + +type Query + @join__type(graph: A) + @join__type(graph: B) +{ + entryA(count: Int): Structure @join__field(graph: A) + entryB(count: Int): Structure @join__field(graph: B) +} diff --git a/apollo-router/tests/fixtures/batching/short_timeouts.router.yaml b/apollo-router/tests/fixtures/batching/short_timeouts.router.yaml new file mode 100644 index 0000000000..747688fa2d --- /dev/null +++ b/apollo-router/tests/fixtures/batching/short_timeouts.router.yaml @@ -0,0 +1,14 @@ +# Batching config with short timeouts for testing + +batching: + enabled: true + mode: batch_http_link + subgraph: + all: + enabled: true +traffic_shaping: + all: + timeout: 1s + +include_subgraph_errors: + all: true diff --git a/apollo-router/tests/integration/batching.rs b/apollo-router/tests/integration/batching.rs new file mode 100644 index 0000000000..a9e8e3234d --- /dev/null +++ b/apollo-router/tests/integration/batching.rs @@ -0,0 +1,1026 @@ +use apollo_router::graphql::Request; +use insta::assert_yaml_snapshot; +use itertools::Itertools; +use tower::BoxError; +use wiremock::ResponseTemplate; + +use crate::integration::common::ValueExt as _; + +const CONFIG: &str = include_str!("../fixtures/batching/all_enabled.router.yaml"); +const SHORT_TIMEOUTS_CONFIG: &str = include_str!("../fixtures/batching/short_timeouts.router.yaml"); + +fn test_is_enabled() -> bool { + std::env::var("TEST_APOLLO_KEY").is_ok() && std::env::var("TEST_APOLLO_GRAPH_REF").is_ok() +} + +#[tokio::test(flavor = "multi_thread")] +async fn it_supports_single_subgraph_batching() -> Result<(), BoxError> { + const REQUEST_COUNT: usize = 5; + + let requests: Vec<_> = (0..REQUEST_COUNT) + 
.map(|index| { + Request::fake_builder() + .query(format!( + "query op{index}{{ entryA(count: {REQUEST_COUNT}) {{ index }} }}" + )) + .build() + }) + .collect(); + let responses = helper::run_test( + CONFIG, + &requests[..], + Some(helper::expect_batch), + None::, + ) + .await?; + + if test_is_enabled() { + // Make sure that we got back what we wanted + assert_yaml_snapshot!(responses, @r###" + --- + - data: + entryA: + index: 0 + - data: + entryA: + index: 1 + - data: + entryA: + index: 2 + - data: + entryA: + index: 3 + - data: + entryA: + index: 4 + "###); + } + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn it_supports_multi_subgraph_batching() -> Result<(), BoxError> { + const REQUEST_COUNT: usize = 3; + + let requests_a = (0..REQUEST_COUNT).map(|index| { + Request::fake_builder() + .query(format!( + "query op{index}{{ entryA(count: {REQUEST_COUNT}) {{ index }} }}" + )) + .build() + }); + let requests_b = (0..REQUEST_COUNT).map(|index| { + Request::fake_builder() + .query(format!( + "query op{index}{{ entryB(count: {REQUEST_COUNT}) {{ index }} }}" + )) + .build() + }); + + // Interleave requests so that we can verify that they get properly separated + let requests: Vec<_> = requests_a.interleave(requests_b).collect(); + let responses = helper::run_test( + CONFIG, + &requests, + Some(helper::expect_batch), + Some(helper::expect_batch), + ) + .await?; + + if test_is_enabled() { + // Make sure that we got back what we wanted + assert_yaml_snapshot!(responses, @r###" + --- + - data: + entryA: + index: 0 + - data: + entryB: + index: 0 + - data: + entryA: + index: 1 + - data: + entryB: + index: 1 + - data: + entryA: + index: 2 + - data: + entryB: + index: 2 + "###); + } + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn it_batches_with_errors_in_single_graph() -> Result<(), BoxError> { + const REQUEST_COUNT: usize = 4; + + let requests: Vec<_> = (0..REQUEST_COUNT) + .map(|index| { + Request::fake_builder() + .query(format!( + "query op{index}{{ entryA(count: {REQUEST_COUNT}) {{ index }} }}" + )) + .build() + }) + .collect(); + let responses = helper::run_test( + CONFIG, + &requests[..], + Some(helper::fail_second_batch_request), + None::, + ) + .await?; + + if test_is_enabled() { + // Make sure that we got back what we wanted + assert_yaml_snapshot!(responses, @r###" + --- + - data: + entryA: + index: 0 + - errors: + - message: expected error in A + - data: + entryA: + index: 2 + - data: + entryA: + index: 3 + "###); + } + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn it_batches_with_errors_in_multi_graph() -> Result<(), BoxError> { + const REQUEST_COUNT: usize = 3; + + let requests_a = (0..REQUEST_COUNT).map(|index| { + Request::fake_builder() + .query(format!( + "query op{index}{{ entryA(count: {REQUEST_COUNT}) {{ index }} }}" + )) + .build() + }); + let requests_b = (0..REQUEST_COUNT).map(|index| { + Request::fake_builder() + .query(format!( + "query op{index}{{ entryB(count: {REQUEST_COUNT}) {{ index }} }}" + )) + .build() + }); + + // Interleave requests so that we can verify that they get properly separated + let requests: Vec<_> = requests_a.interleave(requests_b).collect(); + let responses = helper::run_test( + CONFIG, + &requests, + Some(helper::fail_second_batch_request), + Some(helper::fail_second_batch_request), + ) + .await?; + + if test_is_enabled() { + assert_yaml_snapshot!(responses, @r###" + --- + - data: + entryA: + index: 0 + - data: + entryB: + index: 0 + - errors: + - message: expected error in A + - errors: + - 
message: expected error in B + - data: + entryA: + index: 2 + - data: + entryB: + index: 2 + "###); + } + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn it_handles_short_timeouts() -> Result<(), BoxError> { + const REQUEST_COUNT: usize = 2; + + let requests_a = (0..REQUEST_COUNT).map(|index| { + Request::fake_builder() + .query(format!( + "query op{index}{{ entryA(count: {REQUEST_COUNT}) {{ index }} }}" + )) + .build() + }); + let requests_b = (0..REQUEST_COUNT).map(|index| { + Request::fake_builder() + .query(format!( + "query op{index}{{ entryB(count: {REQUEST_COUNT}) {{ index }} }}" + )) + .build() + }); + + // Interleave requests so that we can verify that they get properly separated + // Have the B subgraph timeout + let requests: Vec<_> = requests_a.interleave(requests_b).collect(); + let responses = helper::run_test( + SHORT_TIMEOUTS_CONFIG, + &requests, + Some(helper::expect_batch), + Some(helper::never_respond), + ) + .await?; + + if test_is_enabled() { + assert_yaml_snapshot!(responses, @r###" + --- + - data: + entryA: + index: 0 + - errors: + - message: "HTTP fetch failed from 'b': request timed out" + path: [] + extensions: + code: SUBREQUEST_HTTP_ERROR + service: b + reason: request timed out + - data: + entryA: + index: 1 + - errors: + - message: "HTTP fetch failed from 'b': request timed out" + path: [] + extensions: + code: SUBREQUEST_HTTP_ERROR + service: b + reason: request timed out + "###); + } + + Ok(()) +} + +// This test makes two simultaneous requests to the router, with the first +// being never resolved. This is to make sure that the router doesn't hang while +// processing a separate batch request. +#[tokio::test(flavor = "multi_thread")] +async fn it_handles_indefinite_timeouts() -> Result<(), BoxError> { + const REQUEST_COUNT: usize = 3; + + let requests_a: Vec<_> = (0..REQUEST_COUNT) + .map(|index| { + Request::fake_builder() + .query(format!( + "query op{index}{{ entryA(count: {REQUEST_COUNT}) {{ index }} }}" + )) + .build() + }) + .collect(); + let requests_b: Vec<_> = (0..REQUEST_COUNT) + .map(|index| { + Request::fake_builder() + .query(format!( + "query op{index}{{ entryB(count: {REQUEST_COUNT}) {{ index }} }}" + )) + .build() + }) + .collect(); + + let responses_a = helper::run_test( + SHORT_TIMEOUTS_CONFIG, + &requests_a, + Some(helper::expect_batch), + None::, + ); + let responses_b = helper::run_test( + SHORT_TIMEOUTS_CONFIG, + &requests_b, + None::, + Some(helper::never_respond), + ); + + // Run both requests simultaneously + let (results_a, results_b) = futures::try_join!(responses_a, responses_b)?; + + // verify the output + let responses = [results_a, results_b].concat(); + if test_is_enabled() { + assert_yaml_snapshot!(responses, @r###" + --- + - data: + entryA: + index: 0 + - data: + entryA: + index: 1 + - data: + entryA: + index: 2 + - errors: + - message: "HTTP fetch failed from 'b': request timed out" + path: [] + extensions: + code: SUBREQUEST_HTTP_ERROR + service: b + reason: request timed out + - errors: + - message: "HTTP fetch failed from 'b': request timed out" + path: [] + extensions: + code: SUBREQUEST_HTTP_ERROR + service: b + reason: request timed out + - errors: + - message: "HTTP fetch failed from 'b': request timed out" + path: [] + extensions: + code: SUBREQUEST_HTTP_ERROR + service: b + reason: request timed out + "###); + } + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn it_handles_cancelled_by_rhai() -> Result<(), BoxError> { + const REQUEST_COUNT: usize = 2; + const RHAI_CONFIG: &str = 
include_str!("../fixtures/batching/rhai_script.router.yaml"); + + let requests_a = (0..REQUEST_COUNT).map(|index| { + Request::fake_builder() + .query(format!( + "query op{index}{{ entryA(count: {REQUEST_COUNT}) {{ index }} }}" + )) + .build() + }); + let requests_b = (0..REQUEST_COUNT).map(|index| { + Request::fake_builder() + .query(format!( + "query op{index}_failMe{{ entryB(count: {REQUEST_COUNT}) {{ index }} }}" + )) + .build() + }); + + // Interleave requests so that we can verify that they get properly separated + // Have the B subgraph get all of its requests cancelled by a rhai script + let requests: Vec<_> = requests_a.interleave(requests_b).collect(); + let responses = helper::run_test( + RHAI_CONFIG, + &requests, + Some(helper::expect_batch), + None::<helper::Handler>, + ) + .await?; + + if test_is_enabled() { + assert_yaml_snapshot!(responses, @r###" + --- + - data: + entryA: + index: 0 + - errors: + - message: "rhai execution error: 'Runtime error: cancelled expected failure (line 5, position 13)\nin closure call'" + - data: + entryA: + index: 1 + - errors: + - message: "rhai execution error: 'Runtime error: cancelled expected failure (line 5, position 13)\nin closure call'" + "###); + } + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn it_handles_single_request_cancelled_by_rhai() -> Result<(), BoxError> { + const REQUEST_COUNT: usize = 2; + const RHAI_CONFIG: &str = include_str!("../fixtures/batching/rhai_script.router.yaml"); + + let requests_a = (0..REQUEST_COUNT).map(|index| { + Request::fake_builder() + .query(format!( + "query op{index}{{ entryA(count: {REQUEST_COUNT}) {{ index }} }}" + )) + .build() + }); + let requests_b = (0..REQUEST_COUNT).map(|index| { + Request::fake_builder() + .query(format!( + "query {}{{ entryB(count: {REQUEST_COUNT}) {{ index }} }}", + (index == 1) + .then_some("failMe".to_string()) + .unwrap_or(format!("op{index}")) + )) + .build() + }); + + // Custom validation for subgraph B + fn handle_b(request: &wiremock::Request) -> ResponseTemplate { + let requests: Vec<Request> = request.body_json().unwrap(); + + // We should have gotten all of the regular elements minus the second + assert_eq!(requests.len(), REQUEST_COUNT - 1); + + // Each element should be for the specified subgraph and should have a field selection + // of index. The index should be 0..n without 1.
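// For example, the incoming `query op0 { entryB(count: 2) { index } }` reaches the
// subgraph as `query op0__b__0{entryB(count:2){index}}` after the router's rewriting.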
+ // Note: The router appends info to the query, so we append it at this check + for (request, index) in requests.into_iter().zip((0..).filter(|&i| i != 1)) { + assert_eq!( + request.query, + Some(format!( + "query op{index}__b__0{{entryB(count:{REQUEST_COUNT}){{index}}}}", + )) + ); + } + + ResponseTemplate::new(200).set_body_json( + (0..REQUEST_COUNT) + .filter(|&i| i != 1) + .map(|index| { + serde_json::json!({ + "data": { + "entryB": { + "index": index + } + } + }) + }) + .collect::>(), + ) + } + + // Interleave requests so that we can verify that they get properly separated + // Have the B subgraph get all of its requests cancelled by a rhai script + let requests: Vec<_> = requests_a.interleave(requests_b).collect(); + let responses = helper::run_test( + RHAI_CONFIG, + &requests, + Some(helper::expect_batch), + Some(handle_b), + ) + .await?; + + if test_is_enabled() { + assert_yaml_snapshot!(responses, @r###" + --- + - data: + entryA: + index: 0 + - data: + entryB: + index: 0 + - data: + entryA: + index: 1 + - errors: + - message: "rhai execution error: 'Runtime error: cancelled expected failure (line 5, position 13)\nin closure call'" + "###); + } + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn it_handles_cancelled_by_coprocessor() -> Result<(), BoxError> { + const REQUEST_COUNT: usize = 2; + const COPROCESSOR_CONFIG: &str = include_str!("../fixtures/batching/coprocessor.router.yaml"); + + let requests_a = (0..REQUEST_COUNT).map(|index| { + Request::fake_builder() + .query(format!( + "query op{index}{{ entryA(count: {REQUEST_COUNT}) {{ index }} }}" + )) + .build() + }); + let requests_b = (0..REQUEST_COUNT).map(|index| { + Request::fake_builder() + .query(format!( + "query op{index}{{ entryB(count: {REQUEST_COUNT}) {{ index }} }}" + )) + .build() + }); + + // Spin up a coprocessor for cancelling requests to A + let coprocessor = wiremock::MockServer::builder().start().await; + let subgraph_a_canceller = wiremock::Mock::given(wiremock::matchers::method("POST")) + .respond_with(|request: &wiremock::Request| { + let info: serde_json::Value = request.body_json().unwrap(); + let subgraph = info + .as_object() + .unwrap() + .get("serviceName") + .unwrap() + .as_string() + .unwrap(); + + // Pass through the request if the subgraph isn't 'A' + let response = if subgraph != "a" { + info + } else { + // Patch it otherwise to stop execution + let mut res = info; + let block = res.as_object_mut().unwrap(); + block.insert("control".to_string(), serde_json::json!({ "break": 403 })); + block.insert( + "body".to_string(), + serde_json::json!({ + "errors": [{ + "message": "Subgraph A is not allowed", + "extensions": { + "code": "ERR_NOT_ALLOWED", + }, + }], + }), + ); + + res + }; + ResponseTemplate::new(200).set_body_json(response) + }) + .named("coprocessor POST /"); + coprocessor.register(subgraph_a_canceller).await; + + // Make sure to patch the config with the coprocessor's port + let config = COPROCESSOR_CONFIG.replace("REPLACEME", &coprocessor.address().port().to_string()); + + // Interleave requests so that we can verify that they get properly separated + // Have the A subgraph get all of its requests cancelled by a coprocessor + let requests: Vec<_> = requests_a.interleave(requests_b).collect(); + let responses = helper::run_test( + config.as_str(), + &requests, + None::, + Some(helper::expect_batch), + ) + .await?; + + if test_is_enabled() { + assert_yaml_snapshot!(responses, @r###" + --- + - errors: + - message: Subgraph A is not allowed + extensions: + code: 
ERR_NOT_ALLOWED + - data: + entryB: + index: 0 + - errors: + - message: Subgraph A is not allowed + extensions: + code: ERR_NOT_ALLOWED + - data: + entryB: + index: 1 + "###); + } + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn it_handles_single_request_cancelled_by_coprocessor() -> Result<(), BoxError> { + const REQUEST_COUNT: usize = 4; + const COPROCESSOR_CONFIG: &str = include_str!("../fixtures/batching/coprocessor.router.yaml"); + + let requests_a = (0..REQUEST_COUNT).map(|index| { + Request::fake_builder() + .query(format!( + "query op{index}{{ entryA(count: {REQUEST_COUNT}) {{ index }} }}" + )) + .build() + }); + let requests_b = (0..REQUEST_COUNT).map(|index| { + Request::fake_builder() + .query(format!( + "query op{index}{{ entryB(count: {REQUEST_COUNT}) {{ index }} }}" + )) + .build() + }); + + // Spin up a coprocessor for cancelling requests to A + let coprocessor = wiremock::MockServer::builder().start().await; + let subgraph_a_canceller = wiremock::Mock::given(wiremock::matchers::method("POST")) + .respond_with(|request: &wiremock::Request| { + let info: serde_json::Value = request.body_json().unwrap(); + let subgraph = info + .as_object() + .unwrap() + .get("serviceName") + .unwrap() + .as_string() + .unwrap(); + let query = info + .as_object() + .unwrap() + .get("body") + .unwrap() + .as_object() + .unwrap() + .get("query") + .unwrap() + .as_string() + .unwrap(); + + // Cancel the request if we're in subgraph A, index 2 + let response = if subgraph == "a" && query.contains("op2") { + // Patch it to stop execution + let mut res = info; + let block = res.as_object_mut().unwrap(); + block.insert("control".to_string(), serde_json::json!({ "break": 403 })); + block.insert( + "body".to_string(), + serde_json::json!({ + "errors": [{ + "message": "Subgraph A index 2 is not allowed", + "extensions": { + "code": "ERR_NOT_ALLOWED", + }, + }], + }), + ); + + res + } else { + info + }; + ResponseTemplate::new(200).set_body_json(response) + }) + .named("coprocessor POST /"); + coprocessor.register(subgraph_a_canceller).await; + + // We aren't expecting the whole batch anymore, so we need a handler here for it + fn handle_a(request: &wiremock::Request) -> ResponseTemplate { + let requests: Vec = request.body_json().unwrap(); + + // We should have gotten all of the regular elements minus the third + assert_eq!(requests.len(), REQUEST_COUNT - 1); + + // Each element should have be for the specified subgraph and should have a field selection + // of index. The index should be 0..n without 2. 
+ // Note: The router appends info to the query, so we append it at this check + for (request, index) in requests.into_iter().zip((0..).filter(|&i| i != 2)) { + assert_eq!( + request.query, + Some(format!( + "query op{index}__a__0{{entryA(count:{REQUEST_COUNT}){{index}}}}", + )) + ); + } + + ResponseTemplate::new(200).set_body_json( + (0..REQUEST_COUNT) + .filter(|&i| i != 2) + .map(|index| { + serde_json::json!({ + "data": { + "entryA": { + "index": index + } + } + }) + }) + .collect::>(), + ) + } + + // Make sure to patch the config with the coprocessor's port + let config = COPROCESSOR_CONFIG.replace("REPLACEME", &coprocessor.address().port().to_string()); + + // Interleave requests so that we can verify that they get properly separated + // Have the A subgraph get all of its requests cancelled by a coprocessor + let requests: Vec<_> = requests_a.interleave(requests_b).collect(); + let responses = helper::run_test( + config.as_str(), + &requests, + Some(handle_a), + Some(helper::expect_batch), + ) + .await?; + + if test_is_enabled() { + assert_yaml_snapshot!(responses, @r###" + --- + - data: + entryA: + index: 0 + - data: + entryB: + index: 0 + - data: + entryA: + index: 1 + - data: + entryB: + index: 1 + - errors: + - message: Subgraph A index 2 is not allowed + extensions: + code: ERR_NOT_ALLOWED + - data: + entryB: + index: 2 + - data: + entryA: + index: 3 + - data: + entryB: + index: 3 + "###); + } + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn it_handles_single_invalid_graphql() -> Result<(), BoxError> { + const REQUEST_COUNT: usize = 5; + + let mut requests: Vec<_> = (0..REQUEST_COUNT) + .map(|index| { + Request::fake_builder() + .query(format!( + "query op{index}{{ entryA(count: {REQUEST_COUNT}) {{ index }} }}" + )) + .build() + }) + .collect(); + + // Mess up the 4th one + requests[3].query = Some("query op3".into()); + + // We aren't expecting the whole batch anymore, so we need a handler here for it + fn handle_a(request: &wiremock::Request) -> ResponseTemplate { + let requests: Vec = request.body_json().unwrap(); + + // We should have gotten all of the regular elements minus the third + assert_eq!(requests.len(), REQUEST_COUNT - 1); + + // Each element should have be for the specified subgraph and should have a field selection + // of index. The index should be 0..n without 3. 
+ // Note: The router rewrites the operation name and formatting, so we assert
+ // against the rewritten query here
+ for (request, index) in requests.into_iter().zip((0..).filter(|&i| i != 3)) {
+ assert_eq!(
+ request.query,
+ Some(format!(
+ "query op{index}__a__0{{entryA(count:{REQUEST_COUNT}){{index}}}}",
+ ))
+ );
+ }
+
+ ResponseTemplate::new(200).set_body_json(
+ (0..REQUEST_COUNT)
+ .filter(|&i| i != 3)
+ .map(|index| {
+ serde_json::json!({
+ "data": {
+ "entryA": {
+ "index": index
+ }
+ }
+ })
+ })
+ .collect::<Vec<_>>(),
+ )
+ }
+
+ let responses = helper::run_test(
+ CONFIG,
+ &requests[..],
+ Some(handle_a),
+ None::<helper::Handler>,
+ )
+ .await?;
+
+ if test_is_enabled() {
+ // Make sure that we got back what we wanted
+ assert_yaml_snapshot!(responses, @r###"
+ ---
+ - data:
+ entryA:
+ index: 0
+ - data:
+ entryA:
+ index: 1
+ - data:
+ entryA:
+ index: 2
+ - errors:
+ - message: "parsing error: syntax error: expected a Selection Set"
+ locations:
+ - line: 1
+ column: 10
+ extensions:
+ code: PARSING_ERROR
+ - data:
+ entryA:
+ index: 4
+ "###);
+ }
+
+ Ok(())
+}
+
+/// Utility methods for these tests
+mod helper {
+ use std::time::Duration;
+
+ use apollo_router::graphql::Request;
+ use apollo_router::graphql::Response;
+ use tower::BoxError;
+ use wiremock::matchers;
+ use wiremock::MockServer;
+ use wiremock::Respond;
+ use wiremock::ResponseTemplate;
+
+ use super::test_is_enabled;
+ use crate::integration::common::IntegrationTest;
+
+ /// Helper type for specifying a valid handler
+ pub type Handler = fn(&wiremock::Request) -> ResponseTemplate;
+
+ /// Helper macro for creating a wiremock mock from an optional handler
+ ///
+ /// If the handler is `None`, then the fallback is to always fail any request to the mock server
+ macro_rules! make_handler {
+ ($subgraph_path:expr, $handler:expr) => {
+ if let Some(f) = $handler {
+ wiremock::Mock::given(matchers::method("POST"))
+ .and(matchers::path($subgraph_path))
+ .respond_with(f)
+ .expect(1)
+ .named(stringify!(batching POST $subgraph_path))
+ } else {
+ wiremock::Mock::given(matchers::method("POST"))
+ .and(matchers::path($subgraph_path))
+ .respond_with(always_fail)
+ .expect(0)
+ .named(stringify!(batching POST $subgraph_path))
+ }
+ }
+ }
+
+ /// Set up the integration test stack
+ pub async fn run_test(
+ config: &str,
+ requests: &[Request],
+ handler_a: Option<Handler>,
+ handler_b: Option<Handler>,
+ ) -> Result<Vec<Response>, BoxError> {
+ // Ensure that we have the test keys before running
+ // Note: The [IntegrationTest] ensures that these test credentials get
+ // set before running the router.
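+ // Returning an empty response set keeps callers uniform; they gate their
+ // snapshot assertions behind `test_is_enabled()` as well.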
+ if !test_is_enabled() {
+ return Ok(Vec::new());
+ };
+
+ // Create a wiremock server for each handler
+ let mock_server_a = MockServer::start().await;
+ let mock_server_b = MockServer::start().await;
+ mock_server_a.register(make_handler!("/a", handler_a)).await;
+ mock_server_b.register(make_handler!("/b", handler_b)).await;
+
+ // Start up the router with the mocked subgraphs
+ let mut router = IntegrationTest::builder()
+ .config(config)
+ .supergraph("tests/fixtures/batching/schema.graphql")
+ .subgraph_override("a", format!("{}/a", mock_server_a.uri()))
+ .subgraph_override("b", format!("{}/b", mock_server_b.uri()))
+ .build()
+ .await;
+
+ router.start().await;
+ router.assert_started().await;
+
+ // Execute the request
+ let request = serde_json::to_value(requests)?;
+ let (_span, response) = router.execute_query(&request).await;
+
+ serde_json::from_slice::<Vec<Response>>(&response.bytes().await?).map_err(BoxError::from)
+ }
+
+ /// Subgraph handler for receiving a batch of requests
+ pub fn expect_batch(request: &wiremock::Request) -> ResponseTemplate {
+ let requests: Vec<Request> = request.body_json().unwrap();
+
+ // Extract info about this operation
+ let (subgraph, count): (String, usize) = {
+ let re = regex::Regex::new(r"entry([AB])\(count:([0-9]+)\)").unwrap();
+ let captures = re.captures(requests[0].query.as_ref().unwrap()).unwrap();
+
+ (captures[1].to_string(), captures[2].parse().unwrap())
+ };
+
+ // We should have gotten `count` elements
+ assert_eq!(requests.len(), count);
+
+ // Each element should be for the specified subgraph and should have a field selection
+ // of index.
+ // Note: The router rewrites the operation name and formatting, so we assert
+ // against the rewritten query here
+ for (index, request) in requests.into_iter().enumerate() {
+ assert_eq!(
+ request.query,
+ Some(format!(
+ "query op{index}__{}__0{{entry{}(count:{count}){{index}}}}",
+ subgraph.to_lowercase(),
+ subgraph
+ ))
+ );
+ }
+
+ ResponseTemplate::new(200).set_body_json(
+ (0..count)
+ .map(|index| {
+ serde_json::json!({
+ "data": {
+ format!("entry{subgraph}"): {
+ "index": index
+ }
+ }
+ })
+ })
+ .collect::<Vec<_>>(),
+ )
+ }
+
+ /// Handler that always returns an error for the second request in the batch
+ pub fn fail_second_batch_request(request: &wiremock::Request) -> ResponseTemplate {
+ let requests: Vec<Request> = request.body_json().unwrap();
+
+ // Extract info about this operation
+ let (subgraph, count): (String, usize) = {
+ let re = regex::Regex::new(r"entry([AB])\(count:([0-9]+)\)").unwrap();
+ let captures = re.captures(requests[0].query.as_ref().unwrap()).unwrap();
+
+ (captures[1].to_string(), captures[2].parse().unwrap())
+ };
+
+ // We should have gotten `count` elements
+ assert_eq!(requests.len(), count);
+
+ // Create the response with the second element as an error
+ let responses = {
+ let mut rs: Vec<_> = (0..count)
+ .map(|index| {
+ serde_json::json!({
+ "data": {
+ format!("entry{subgraph}"): {
+ "index": index
+ }
+ }
+ })
+ })
+ .collect();
+
+ rs[1] = serde_json::json!({ "errors": [{ "message": format!("expected error in {subgraph}") }] });
+ rs
+ };
+
+ // Respond with an error on the second element but valid data for the rest
+ ResponseTemplate::new(200).set_body_json(responses)
+ }
+
+ /// Subgraph handler that delays indefinitely
+ ///
+ /// Useful for testing timeouts at the batch level
+ pub fn never_respond(request: &wiremock::Request) -> ResponseTemplate {
+ let requests: Vec<Request> = request.body_json().unwrap();
+
+ // Extract info about this operation
+ let (_, count): (String, usize) = {
+ let re =
regex::Regex::new(r"entry([AB])\(count:([0-9]+)\)").unwrap(); + let captures = re.captures(requests[0].query.as_ref().unwrap()).unwrap(); + + (captures[1].to_string(), captures[2].parse().unwrap()) + }; + + // We should have gotten `count` elements + assert_eq!(requests.len(), count); + + // Respond as normal but with a long delay + ResponseTemplate::new(200).set_delay(Duration::from_secs(365 * 24 * 60 * 60)) + } + + /// Subgraph handler that always fails + /// + /// Useful for subgraphs tests that should never actually be called + fn always_fail(_request: &wiremock::Request) -> ResponseTemplate { + ResponseTemplate::new(400).set_body_json(serde_json::json!({ + "errors": [{ + "message": "called into subgraph that should not have happened", + }] + })) + } +} diff --git a/apollo-router/tests/integration/mod.rs b/apollo-router/tests/integration/mod.rs index 80ee7c18f5..97937bac53 100644 --- a/apollo-router/tests/integration/mod.rs +++ b/apollo-router/tests/integration/mod.rs @@ -1,3 +1,4 @@ +mod batching; #[path = "../common.rs"] pub(crate) mod common; pub(crate) use common::IntegrationTest; diff --git a/apollo-router/tests/integration/snapshots/integration_tests__integration__lifecycle__cli_config_experimental.snap b/apollo-router/tests/integration/snapshots/integration_tests__integration__lifecycle__cli_config_experimental.snap index e98c33cfce..928c8e8cb8 100644 --- a/apollo-router/tests/integration/snapshots/integration_tests__integration__lifecycle__cli_config_experimental.snap +++ b/apollo-router/tests/integration/snapshots/integration_tests__integration__lifecycle__cli_config_experimental.snap @@ -9,7 +9,6 @@ stderr: stdout: List of all experimental configurations with related GitHub discussions: - - experimental_batching: https://github.com/apollographql/router/discussions/3840 - experimental_response_trace_id: https://github.com/apollographql/router/discussions/2147 - experimental_retry: https://github.com/apollographql/router/discussions/2241 - experimental_when_header: https://github.com/apollographql/router/discussions/1961 diff --git a/docs/source/config.json b/docs/source/config.json index 1b247ec085..c4b9a039bd 100644 --- a/docs/source/config.json +++ b/docs/source/config.json @@ -79,7 +79,7 @@ "Query batching": [ "/executing-operations/query-batching", [ - "experimental" + "enterprise" ] ], "GraphQL Subscriptions": { diff --git a/docs/source/configuration/traffic-shaping.mdx b/docs/source/configuration/traffic-shaping.mdx index 25c2e0df98..df53d9c34c 100644 --- a/docs/source/configuration/traffic-shaping.mdx +++ b/docs/source/configuration/traffic-shaping.mdx @@ -66,7 +66,7 @@ You can change the default timeout for client requests to the router like so: ```yaml title="router.yaml" traffic_shaping: - router: + router: timeout: 50s # If client requests to the router take more than 50 seconds, cancel the request (30 seconds by default) ``` @@ -74,7 +74,7 @@ You can change the default timeout for all requests between the router and subgr ```yaml title="router.yaml" traffic_shaping: - all: + all: timeout: 50s # If subgraph requests take more than 50 seconds, cancel the request (30 seconds by default) ``` @@ -93,7 +93,7 @@ Compression is automatically supported on the client side, depending on the `Acc The Apollo Router has _experimental_ support for receiving client query batches: ```yaml title="router.yaml" -experimental_batching: +batching: enabled: true mode: batch_http_link ``` diff --git a/docs/source/executing-operations/query-batching.mdx 
b/docs/source/executing-operations/query-batching.mdx
index 2fdebf80a5..e9b6ed9537 100644
--- a/docs/source/executing-operations/query-batching.mdx
+++ b/docs/source/executing-operations/query-batching.mdx
@@ -1,9 +1,9 @@
---
title: Query batching
-description: Receive query batches with the Apollo Router 
+description: Receive query batches with the Apollo Router
---
-
+
Learn about query batching and how to configure the Apollo Router to receive query batches.
@@ -11,24 +11,36 @@ Learn about query batching and how to configure the Apollo Router to receive que
Modern applications often require several requests to render a single page. This is usually the result of a component-based architecture where individual micro-frontends (MFE) make requests separately to fetch data relevant to them. Not only does this cause a performance overhead—different components may be requesting the same data—it can also cause a consistency issue. To combat this, MFE-based UIs batch multiple client operations, issued close together, into a single HTTP request. This is supported in Apollo Client and Apollo Server.
-The Apollo Router supports client query batching. If you’re using Apollo Client, you can leverage the built-in support for batching to reduce the number of individual operations sent to the router.
+The router's batching support is provided by two sets of functionality:
+ - client batching
+ - subgraph batching
-Once configured, Apollo Client automatically combines multiple operations into a single HTTP request. The number of operations within a batch is client-configurable, including the maximum number in a batch and the maximum duration to wait for operations to accumulate before sending the batch.
+With client batching, the router accepts batched requests from a client and processes each request of a batch separately. Consequently, the router doesn't present requests to subgraphs in batch form, so subgraphs must process the requests of a batch individually.
-The Apollo Router must be configured to receive query batches, otherwise it rejects them. When processing a batch, the router deserializes and processes each operation of a batch independently, and it responds to the client only after all operations of the batch have been completed. Each operation executes concurrently with respect to other operations in the batch.
+With subgraph batching, the router analyzes input client batch requests and issues batch requests to subgraphs. Subgraph batching is an extension to client batching and requires participating subgraphs to support batching requests. See the examples below for illustrations of how this works in practice.
-## Configure query batching
+The Apollo Router supports client and subgraph query batching.
+
+If you’re using Apollo Client, you can leverage the built-in support for batching to reduce the number of individual operations sent to the router.
+
+Once configured, Apollo Client automatically combines multiple operations into a single HTTP request. The number of operations within a batch is client-configurable, including the maximum number in a batch and the maximum duration to wait for operations to accumulate before sending the batch.
+
+The Apollo Router must be configured to receive query batches, otherwise it rejects them. When processing a batch, the router deserializes and processes each operation of a batch independently. It responds to the client only after all operations of the batch have been completed.
Each operation executes concurrently with respect to other operations in the batch.
+
+## Configure client query batching
 
 Both the Apollo Router and client need to be configured to support query batching.
 
 ### Configure router
 
-By default, receiving client query batches is _not_ enabled in the Apollo Router.
+#### Client query batching
+
+By default, receiving client query batches is _not_ enabled in the Apollo Router.
 
 To enable query batching, set the following fields in your `router.yaml` configuration file:
 
 ```yaml title="router.yaml"
-experimental_batching:
+batching:
 enabled: true
 mode: batch_http_link
 ```
@@ -38,6 +50,138 @@ experimental_batching:
 | `enabled` | Flag to enable reception of client query batches | boolean | `false` |
 | `mode` | Supported client batching mode | `batch_http_link`: the client uses Apollo Link and its [`BatchHttpLink`](/react/api/link/apollo-link-batch-http) link. | No Default |
 
+#### Subgraph query batching
+
+If client query batching is enabled, and the router's subgraphs [support query batching](/apollo-server/api/apollo-server#allowbatchedhttprequests), then subgraph query batching can be enabled by setting the following fields in your `router.yaml` configuration file:
+
+```yaml title="router.all_enabled.yaml"
+batching:
+ enabled: true
+ mode: batch_http_link
+ subgraph:
+ # Enable batching on all subgraphs
+ all:
+ enabled: true
+```
+
+```yaml title="router.yaml"
+batching:
+ enabled: true
+ mode: batch_http_link
+ subgraph:
+ # Disable batching on all subgraphs
+ all:
+ enabled: false
+ # Configure (override) batching support per subgraph
+ subgraphs:
+ subgraph_1:
+ enabled: true
+ subgraph_2:
+ enabled: true
+```
+
+
+
+- The router can be configured to enable batching either for all subgraphs or individually per subgraph.
+
+- There are limitations on the router's ability to preserve batches from the client request through to the subgraph requests. In particular, certain forms of queries require data to be present before they can be processed. Consequently, the router only generates batches from queries that don't have such constraints, which may result in the router issuing multiple batches or individual requests.
+
+- If [query deduplication](../configuration/traffic-shaping/#query-deduplication) is enabled, it doesn't apply to batched queries: batching takes precedence over query deduplication. Query deduplication is still performed for non-batched queries.
+
+
+
+##### Example: simple subgraph batching
+
+This example shows how the router can batch subgraph requests in the most efficient scenario, where the queries of a batch don't have required fetch constraints.
+
+Assume the federated graph contains three subgraphs: `accounts`, `products`, and `reviews`.
+
+The input client query to the federated graph:
+
+```json title="simple-batch.json"
+[
+ {"query":"query MeQuery1 {\n me {\n id\n }\n}"},
+ {"query":"query MeQuery2 {\n me {\n name\n }\n}"},
+ {"query":"query MeQuery3 {\n me {\n id\n }\n}"},
+ {"query":"query MeQuery4 {\n me {\n name\n }\n}"},
+ {"query":"query MeQuery5 {\n me {\n id\n }\n}"},
+ {"query":"query MeQuery6 {\n me {\n name\n }\n}"},
+ {"query":"query MeQuery7 {\n me {\n id\n }\n}"},
+ {"query":"query MeQuery8 {\n me {\n name\n }\n}"},
+ {"query":"query MeQuery9 {\n me {\n id\n }\n}"},
+ {"query":"query MeQuery10 {\n me {\n name\n }\n}"},
+ {"query":"query MeQuery11 {\n me {\n id\n }\n}"},
+ {"query":"query MeQuery12 {\n me {\n name\n }\n}"},
+ {"query":"query MeQuery13 {\n me {\n id\n }\n}"},
+ {"query":"query MeQuery14 {\n me {\n name\n }\n}"},
+ {"query":"query MeQuery15 {\n me {\n id\n }\n}"}
+]
+```
+
+From the input query, the router generates a set of subgraph queries:
+```
+"query MeQuery1__accounts__0{me{id}}",
+"query MeQuery2__accounts__0{me{name}}",
+"query MeQuery3__accounts__0{me{id}}",
+"query MeQuery4__accounts__0{me{name}}",
+"query MeQuery5__accounts__0{me{id}}",
+"query MeQuery6__accounts__0{me{name}}",
+"query MeQuery7__accounts__0{me{id}}",
+"query MeQuery8__accounts__0{me{name}}",
+"query MeQuery9__accounts__0{me{id}}",
+"query MeQuery10__accounts__0{me{name}}",
+"query MeQuery11__accounts__0{me{id}}",
+"query MeQuery12__accounts__0{me{name}}",
+"query MeQuery13__accounts__0{me{id}}",
+"query MeQuery14__accounts__0{me{name}}",
+"query MeQuery15__accounts__0{me{id}}"
+```
+All of the queries can be combined into a single batch, so instead of 15 separate subgraph fetches, the router only has to make one.
+
+| Subgraph | Fetch count (without batching) | Fetch count (with batching) |
+|----------|--------------------------------|-----------------------------|
+| accounts | 15 | 1 |
+
+##### Example: complex subgraph batching
+
+This example shows how the router might batch subgraph requests for a graph where the client batch contains queries for an entity.
+
+Assume the federated graph contains three subgraphs: `accounts`, `products`, and `reviews`.
+
+The input client query to the federated graph:
+
+```json title="federated-batch.json"
+[
+ {"query":"query MeQuery1 {\n me {\n id\n }\n}"},
+ {"query":"query MeQuery2 {\n me {\n reviews {\n body\n }\n }\n}"},
+ {"query":"query MeQuery3 {\n topProducts {\n upc\n reviews {\n author {\n name\n }\n }\n }\n me {\n name\n }\n}"},
+ {"query":"query MeQuery4 {\n me {\n name\n }\n}"},
+ {"query":"query MeQuery5 {\n me {\n id\n }\n}"}
+]
+```
+
+From the input query, the router generates a set of subgraph queries:
+```
+"query MeQuery1__accounts__0{me{id}}",
+"query MeQuery2__accounts__0{me{__typename id}}",
+"query MeQuery3__products__0{topProducts{__typename upc}}",
+"query MeQuery3__accounts__3{me{name}}",
+"query MeQuery4__accounts__0{me{name}}",
+"query MeQuery5__accounts__0{me{id}}",
+"query MeQuery2__reviews__1($representations:[_Any!]!){_entities(representations:$representations){...on User{reviews{body}}}}",
+"query MeQuery3__reviews__1($representations:[_Any!]!){_entities(representations:$representations){...on Product{reviews{author{__typename id}}}}}",
+"query MeQuery3__accounts__2($representations:[_Any!]!){_entities(representations:$representations){...on User{name}}}"
+```
+The first six queries can be combined into two batches, one for `accounts` and one for `products`. Those batches must be fetched before the final three queries, which are then executed individually.
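+
+For illustration, the larger of those two batches (the five `accounts` queries) would be transmitted to the subgraph in the same format as a client batch request, along the lines of the following sketch (the exact ordering and formatting may differ):
+
+```json
+[
+ {"query":"query MeQuery1__accounts__0{me{id}}"},
+ {"query":"query MeQuery2__accounts__0{me{__typename id}}"},
+ {"query":"query MeQuery3__accounts__3{me{name}}"},
+ {"query":"query MeQuery4__accounts__0{me{name}}"},
+ {"query":"query MeQuery5__accounts__0{me{id}}"}
+]
+```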
+
+Overall, without subgraph batching, the router would make nine fetches in total across the three subgraphs, one per generated query. With subgraph batching, the first six queries collapse into two batch fetches (five queries to `accounts`, one to `products`), which together with the three remaining individual fetches reduces the total to five.
+
+| Subgraph | Fetch count (without batching) | Fetch count (with batching) |
+|----------|--------------------------------|-----------------------------|
+| accounts | 6 | 2 |
+| products | 1 | 1 |
+| reviews | 2 | 2 |
+
 ### Configure client
 
 To enable batching in an Apollo client, configure `BatchHttpLink`. For details on implementing `BatchHttpLink`, see [batching operations](/react/api/link/apollo-link-batch-http/).
@@ -69,11 +213,12 @@ Metrics in the Apollo Router for query batching:
 mode
+[subgraph]
 
-Counter for the number of received batches.
+Counter for the number of received (from client) or dispatched (to subgraph) batches.
@@ -87,6 +232,7 @@ Counter for the number of received batches.
 mode
+[subgraph]
@@ -98,6 +244,7 @@
 
+The `subgraph` attribute is optional. If the attribute isn't present, the metric identifies batches received from clients. If the attribute is present, the metric identifies batches sent to a particular subgraph.
 
 ## Query batch formats
@@ -166,7 +313,7 @@ As a result, the router returns an invalid batch error:
 
 ### Individual query error
 
-If a single query in a batch cannot be processed, this results in an individual error. 
+If a single query in a batch cannot be processed, this results in an individual error.
 
 For example, the query `MyFirstQuery` is accessing a field that doesn't exist, while the rest of the batch query is valid.
@@ -203,7 +350,7 @@ As a result, an error is returned for the individual invalid query and the other
 
 ## Known limitations
 
 ### Unsupported query modes
- 
+
 When batching is enabled, any batch operation that results in a stream of responses is unsupported, including:
 - [`@defer`](/graphos/operations/defer/)
 - [subscriptions](/graphos/operations/subscriptions/)
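
For reviewers who want to exercise the client batch format end to end, here is a minimal, illustrative sketch (not part of this patch). It assumes a locally running router on the default http://localhost:4000 with `batching.enabled: true`, and uses the `reqwest`, `tokio`, and `serde_json` crates; the `MeQuery*` operations are the ones from the documentation examples above.

```rust
// Send a two-operation client batch to the router and read the batched reply.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A client batch is just a JSON array of standard GraphQL requests
    let batch = serde_json::json!([
        { "query": "query MeQuery1 { me { id } }" },
        { "query": "query MeQuery2 { me { name } }" },
    ]);

    // POST the array; the router replies with one response per operation,
    // in the same order as the incoming batch
    let responses: Vec<serde_json::Value> = reqwest::Client::new()
        .post("http://localhost:4000/")
        .json(&batch)
        .send()
        .await?
        .json()
        .await?;

    assert_eq!(responses.len(), 2);
    for response in &responses {
        println!("{response}");
    }
    Ok(())
}
```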