From 6353f2aa7a43d58fd1ee95b773d8ce18840e4781 Mon Sep 17 00:00:00 2001 From: Nicholas Cioli Date: Tue, 26 Mar 2024 18:20:32 -0400 Subject: [PATCH 1/6] Move batch query cancellation up This commit moves handling of cancelled batch queries into the router service so that any custom rust plugin, rhai script, coprocessor, etc. will have cancelled requests handled without needing to handle batches specially. --- apollo-router/src/batching.rs | 2 + .../src/plugins/coprocessor/execution.rs | 12 -- apollo-router/src/plugins/coprocessor/mod.rs | 23 --- .../src/plugins/coprocessor/supergraph.rs | 11 -- apollo-router/src/plugins/rhai/mod.rs | 12 +- apollo-router/src/services/router/service.rs | 37 ++++- .../fixtures/batching/coprocessor.router.yaml | 1 + apollo-router/tests/integration/batching.rs | 143 ++++++++++++++++++ 8 files changed, 183 insertions(+), 58 deletions(-) diff --git a/apollo-router/src/batching.rs b/apollo-router/src/batching.rs index b0532cbbeb..9cfdbfa830 100644 --- a/apollo-router/src/batching.rs +++ b/apollo-router/src/batching.rs @@ -133,6 +133,8 @@ impl BatchQuery { if self.remaining == 0 { self.sender = None; } + } else { + tracing::warn!("attempted to cancel completed batch query"); } } } diff --git a/apollo-router/src/plugins/coprocessor/execution.rs b/apollo-router/src/plugins/coprocessor/execution.rs index 6f97a1b2b6..7eb503659f 100644 --- a/apollo-router/src/plugins/coprocessor/execution.rs +++ b/apollo-router/src/plugins/coprocessor/execution.rs @@ -12,7 +12,6 @@ use tower_service::Service; use super::externalize_header_map; use super::*; -use crate::batching::BatchQuery; use crate::graphql; use crate::layers::async_checkpoint::OneShotAsyncCheckpointLayer; use crate::layers::ServiceBuilderExt; @@ -291,17 +290,6 @@ where execution_response }; - // Handle cancelled batch queries - // FIXME: This should be way higher up the call chain so that custom plugins / rhai / etc. can - // automatically work with batched queries and cancellations. - let batch_query_opt = res.context.extensions().lock().remove::(); - if let Some(mut batch_query) = batch_query_opt { - // TODO: How do we reliably get the reason for the coprocessor cancellation here? - batch_query - .signal_cancelled("coprocessor cancelled request at execution layer".to_string()) - .await; - } - return Ok(ControlFlow::Break(res)); } diff --git a/apollo-router/src/plugins/coprocessor/mod.rs b/apollo-router/src/plugins/coprocessor/mod.rs index 0a3164dc79..ee84460a41 100644 --- a/apollo-router/src/plugins/coprocessor/mod.rs +++ b/apollo-router/src/plugins/coprocessor/mod.rs @@ -30,7 +30,6 @@ use tower::Service; use tower::ServiceBuilder; use tower::ServiceExt; -use crate::batching::BatchQuery; use crate::error::Error; use crate::layers::async_checkpoint::OneShotAsyncCheckpointLayer; use crate::layers::ServiceBuilderExt; @@ -686,17 +685,6 @@ where } } - // Handle cancelled batch queries - // FIXME: This should be way higher up the call chain so that custom plugins / rhai / etc. can - // automatically work with batched queries and cancellations. - let batch_query_opt = res.context.extensions().lock().remove::(); - if let Some(mut batch_query) = batch_query_opt { - // TODO: How do we reliably get the reason for the coprocessor cancellation here? 
- batch_query - .signal_cancelled("coprocessor cancelled request at router layer".to_string()) - .await; - } - return Ok(ControlFlow::Break(res)); } @@ -1027,17 +1015,6 @@ where subgraph_response }; - // Handle cancelled batch queries - // FIXME: This should be way higher up the call chain so that custom plugins / rhai / etc. can - // automatically work with batched queries and cancellations. - let batch_query_opt = res.context.extensions().lock().remove::(); - if let Some(mut batch_query) = batch_query_opt { - // TODO: How do we reliably get the reason for the coprocessor cancellation here? - batch_query - .signal_cancelled("coprocessor cancelled request at subgraph layer".to_string()) - .await; - } - return Ok(ControlFlow::Break(res)); } diff --git a/apollo-router/src/plugins/coprocessor/supergraph.rs b/apollo-router/src/plugins/coprocessor/supergraph.rs index 172062726d..32033c1c3a 100644 --- a/apollo-router/src/plugins/coprocessor/supergraph.rs +++ b/apollo-router/src/plugins/coprocessor/supergraph.rs @@ -280,17 +280,6 @@ where supergraph_response }; - // Handle cancelled batch queries - // FIXME: This should be way higher up the call chain so that custom plugins / rhai / etc. can - // automatically work with batched queries and cancellations. - let batch_query_opt = res.context.extensions().lock().remove::(); - if let Some(mut batch_query) = batch_query_opt { - // TODO: How do we reliably get the reason for the coprocessor cancellation here? - batch_query - .signal_cancelled("coprocessor cancelled request at supergraph layer".to_string()) - .await; - } - return Ok(ControlFlow::Break(res)); } diff --git a/apollo-router/src/plugins/rhai/mod.rs b/apollo-router/src/plugins/rhai/mod.rs index 234f5b8853..c0a8646ebe 100644 --- a/apollo-router/src/plugins/rhai/mod.rs +++ b/apollo-router/src/plugins/rhai/mod.rs @@ -339,17 +339,7 @@ macro_rules! gen_map_request { let mut guard = shared_request.lock().unwrap(); let request_opt = guard.take(); - // FIXME: Catch this error higher up the chain - let context = request_opt.unwrap().context; - if let Some(mut batch_query) = - context.extensions().lock().remove::() - { - let send_fut = - batch_query.signal_cancelled("cancelled by rhai".to_string()); - futures::executor::block_on(send_fut); - } - - return $base::request_failure(context, error_details); + return $base::request_failure(request_opt.unwrap().context, error_details); } let mut guard = shared_request.lock().unwrap(); let request_opt = guard.take(); diff --git a/apollo-router/src/services/router/service.rs b/apollo-router/src/services/router/service.rs index 74169add5c..0a8eb88372 100644 --- a/apollo-router/src/services/router/service.rs +++ b/apollo-router/src/services/router/service.rs @@ -37,6 +37,7 @@ use tracing::Instrument; use super::ClientRequestAccepts; use crate::axum_factory::CanceledRequest; use crate::batching::Batch; +use crate::batching::BatchQuery; use crate::cache::DeduplicatingCache; use crate::configuration::Batching; use crate::configuration::BatchingMode; @@ -426,12 +427,46 @@ impl RouterService { } }; + // We need to handle cases where a failure is part of a batch and thus must be cancelled. + // Requests can be cancelled at any point of the router pipeline, but all failures bubble back + // up through here, so we can catch them without having to specially handle batch queries in + // other portions of the codebase. 
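+        // For context: every request that arrived as part of a batch carries a `BatchQuery`
+        // handle in its context extensions, and the closure below uses that handle to tell the
+        // shared batch handler when an item can no longer contribute its subgraph fetches.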
let futures = supergraph_requests .into_iter() - .map(|supergraph_request| self.process_supergraph_request(supergraph_request)); + .map(|supergraph_request| async { + // TODO: Cloning here seems to keep around the batch queries that should have been removed by the subgraph + // handler for the happy path, but `process_supergraph_request` takes ownership of the original request (and + // thus context) so we can't grab it from there. + // This will cause warnings about trying to cancel already completed batch queries in the logs for all + // batch queries. + let context = supergraph_request.context.clone(); + let result = self.process_supergraph_request(supergraph_request).await; + + // Regardless of the result, we need to make sure that we cancel any potential batch queries. This is because + // custom rust plugins, rhai scripts, and coprocessors can cancel requests at any time and return a GraphQL + // error wrapped in an `Ok` or in a `BoxError` wrapped in an `Err`. + let batch_query_opt = context.extensions().lock().remove::(); + if let Some(mut batch_query) = batch_query_opt { + // Grab the reason from the result, keeping in mind that even an `Ok` here could mean that + // the query was cancelled. + let reason = match &result { + Ok(res) => "todo!".into(), + Err(e) => e.to_string(), + }; + + tracing::info!( + "cancelling dangling batch query in supergraph response with: {reason}" + ); + batch_query.signal_cancelled(reason).await; + } + + result + }); // Use join_all to preserve ordering of concurrent operations // (Short circuit processing and propagate any errors in the batch) + // Note: We use `join_all` here since it awaits all futures before returning, thus allowing us to + // handle cancellation logic without fear of the other futures getting killed. 
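+        // (A short-circuiting combinator such as `try_join_all` would drop the remaining
+        // futures on the first `Err`, before their cancellation signalling had run; `join_all`
+        // polls every element to completion first.)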
let mut results: Vec = join_all(futures) .await .into_iter() diff --git a/apollo-router/tests/fixtures/batching/coprocessor.router.yaml b/apollo-router/tests/fixtures/batching/coprocessor.router.yaml index 8f7d30653a..7292662239 100644 --- a/apollo-router/tests/fixtures/batching/coprocessor.router.yaml +++ b/apollo-router/tests/fixtures/batching/coprocessor.router.yaml @@ -13,6 +13,7 @@ coprocessor: all: request: service_name: true + body: true include_subgraph_errors: all: true diff --git a/apollo-router/tests/integration/batching.rs b/apollo-router/tests/integration/batching.rs index beae021d97..688236bc51 100644 --- a/apollo-router/tests/integration/batching.rs +++ b/apollo-router/tests/integration/batching.rs @@ -569,6 +569,149 @@ async fn it_handles_cancelled_by_coprocessor() -> Result<(), BoxError> { Ok(()) } +#[tokio::test(flavor = "multi_thread")] +async fn it_handles_single_request_cancelled_by_coprocessor() -> Result<(), BoxError> { + const REQUEST_COUNT: usize = 4; + const COPROCESSOR_CONFIG: &str = include_str!("../fixtures/batching/coprocessor.router.yaml"); + + let requests_a = (0..REQUEST_COUNT).map(|index| { + Request::fake_builder() + .query(format!( + "query op{index}{{ entryA(count: {REQUEST_COUNT}) {{ index }} }}" + )) + .build() + }); + let requests_b = (0..REQUEST_COUNT).map(|index| { + Request::fake_builder() + .query(format!( + "query op{index}{{ entryB(count: {REQUEST_COUNT}) {{ index }} }}" + )) + .build() + }); + + // Spin up a coprocessor for cancelling requests to A + let coprocessor = wiremock::MockServer::builder().start().await; + let subgraph_a_canceller = wiremock::Mock::given(wiremock::matchers::method("POST")) + .respond_with(|request: &wiremock::Request| { + let info: serde_json::Value = request.body_json().unwrap(); + let subgraph = info + .as_object() + .unwrap() + .get("serviceName") + .unwrap() + .as_string() + .unwrap(); + let query = info.as_object().unwrap().get("body").unwrap().as_object().unwrap().get("query").unwrap().as_string().unwrap(); + + // Cancel the request if we're in subgraph A, index 2 + let response = if subgraph == "a" && query.contains("op2") { + // Patch it to stop execution + let mut res = info; + let block = res.as_object_mut().unwrap(); + block.insert("control".to_string(), serde_json::json!({ "break": 403 })); + block.insert( + "body".to_string(), + serde_json::json!({ + "errors": [{ + "message": "Subgraph A index 2 is not allowed", + "extensions": { + "code": "ERR_NOT_ALLOWED", + }, + }], + }), + ); + + res + } else { + info + }; + ResponseTemplate::new(200).set_body_json(response) + }) + .named("coprocessor POST /"); + coprocessor.register(subgraph_a_canceller).await; + + // We aren't expecting the whole batch anymore, so we need a handler here for it + fn handle_a(request: &wiremock::Request) -> ResponseTemplate { + let requests: Vec = request.body_json().unwrap(); + + // We should have gotten all of the regular elements minus the third + assert_eq!(requests.len(), REQUEST_COUNT - 1); + + // Each element should have be for the specified subgraph and should have a field selection + // of index. The index should be 0..n without 2. 
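+        // (0..n here means positions in the original client batch, so the op names asserted
+        // below should be op0, op1 and op3.)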
+ // Note: The router appends info to the query, so we append it at this check + for (request, index) in requests.into_iter().zip((0..).filter(|&i| i != 2)) { + assert_eq!( + request.query, + Some(format!( + "query op{index}__a__0{{entryA(count:{REQUEST_COUNT}){{index}}}}", + )) + ); + } + + ResponseTemplate::new(200).set_body_json( + (0..REQUEST_COUNT) + .filter(|&i| i != 2) + .map(|index| { + serde_json::json!({ + "data": { + "entryA": { + "index": index + } + } + }) + }) + .collect::>(), + ) + } + + // Make sure to patch the config with the coprocessor's port + let config = COPROCESSOR_CONFIG.replace("REPLACEME", &coprocessor.address().port().to_string()); + + // Interleave requests so that we can verify that they get properly separated + // Have the A subgraph get all of its requests cancelled by a coprocessor + let requests: Vec<_> = requests_a.interleave(requests_b).collect(); + let responses = helper::run_test( + config.as_str(), + &requests, + Some(handle_a), + Some(helper::expect_batch), + ) + .await?; + + // TODO: Fill this in once we know how this response should look + assert_yaml_snapshot!(responses, @r###" + --- + - data: + entryA: + index: 0 + - data: + entryB: + index: 0 + - data: + entryA: + index: 1 + - data: + entryB: + index: 1 + - errors: + - message: Subgraph A index 2 is not allowed + extensions: + code: ERR_NOT_ALLOWED + - data: + entryB: + index: 2 + - data: + entryA: + index: 3 + - data: + entryB: + index: 3 + "###); + + Ok(()) +} + /// Utility methods for these tests mod helper { use std::time::Duration; From 200831eebb16505fe8183bee8a4aa2650dbeed07 Mon Sep 17 00:00:00 2001 From: Nicholas Cioli Date: Tue, 26 Mar 2024 18:27:24 -0400 Subject: [PATCH 2/6] fix clippy errors --- apollo-router/src/plugins/rhai/mod.rs | 1 - apollo-router/src/services/router/service.rs | 2 +- apollo-router/tests/integration/batching.rs | 12 +++++++++++- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/apollo-router/src/plugins/rhai/mod.rs b/apollo-router/src/plugins/rhai/mod.rs index c0a8646ebe..069cc39980 100644 --- a/apollo-router/src/plugins/rhai/mod.rs +++ b/apollo-router/src/plugins/rhai/mod.rs @@ -40,7 +40,6 @@ use tower::ServiceExt; use self::engine::RhaiService; use self::engine::SharedMut; -use crate::batching::BatchQuery; use crate::error::Error; use crate::layers::ServiceBuilderExt; use crate::plugin::Plugin; diff --git a/apollo-router/src/services/router/service.rs b/apollo-router/src/services/router/service.rs index 0a8eb88372..513edd62a8 100644 --- a/apollo-router/src/services/router/service.rs +++ b/apollo-router/src/services/router/service.rs @@ -450,7 +450,7 @@ impl RouterService { // Grab the reason from the result, keeping in mind that even an `Ok` here could mean that // the query was cancelled. 
let reason = match &result { - Ok(res) => "todo!".into(), + Ok(_res) => "todo!".into(), Err(e) => e.to_string(), }; diff --git a/apollo-router/tests/integration/batching.rs b/apollo-router/tests/integration/batching.rs index 688236bc51..9fba4ce363 100644 --- a/apollo-router/tests/integration/batching.rs +++ b/apollo-router/tests/integration/batching.rs @@ -601,7 +601,17 @@ async fn it_handles_single_request_cancelled_by_coprocessor() -> Result<(), BoxE .unwrap() .as_string() .unwrap(); - let query = info.as_object().unwrap().get("body").unwrap().as_object().unwrap().get("query").unwrap().as_string().unwrap(); + let query = info + .as_object() + .unwrap() + .get("body") + .unwrap() + .as_object() + .unwrap() + .get("query") + .unwrap() + .as_string() + .unwrap(); // Cancel the request if we're in subgraph A, index 2 let response = if subgraph == "a" && query.contains("op2") { From d3a47da334947c6d627e147447648ca18e996ebd Mon Sep 17 00:00:00 2001 From: Gary Pennington Date: Wed, 27 Mar 2024 15:23:43 +0000 Subject: [PATCH 3/6] [batching] tweak single cancellation point changes (#4868) These tweaks are enough (I think) for us to make progress. Let's review together and then decide how to proceed. --- apollo-router/src/batching.rs | 37 ++++++++++++-------- apollo-router/src/services/router/service.rs | 25 +++++-------- 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/apollo-router/src/batching.rs b/apollo-router/src/batching.rs index 9cfdbfa830..0a9cfe25bf 100644 --- a/apollo-router/src/batching.rs +++ b/apollo-router/src/batching.rs @@ -68,13 +68,13 @@ impl BatchQuery { // TODO: How should we handle the sender dying? self.sender .as_ref() - .unwrap() + .expect("set query hashes has a sender") .send(BatchHandlerMessage::Begin { index: self.index, query_hashes, }) .await - .unwrap(); + .expect("set query hashes could send"); } /// Signal to the batch handler that this specific batch query has made some progress. @@ -94,7 +94,7 @@ impl BatchQuery { // TODO: How should we handle the sender dying? 
self.sender .as_ref() - .unwrap() + .expect("signal progress has a sender") .send(BatchHandlerMessage::Progress { index: self.index, client_factory, @@ -104,7 +104,7 @@ impl BatchQuery { span_context: Span::current().context(), }) .await - .unwrap(); + .expect("signal progress could send"); self.remaining -= 1; if self.remaining == 0 { @@ -121,13 +121,13 @@ impl BatchQuery { if self.sender.is_some() { self.sender .as_ref() - .unwrap() + .expect("signal cancelled has a sender") .send(BatchHandlerMessage::Cancel { index: self.index, reason, }) .await - .unwrap(); + .expect("signal cancelled could send"); self.remaining -= 1; if self.remaining == 0 { @@ -253,10 +253,12 @@ impl Batch { { sender .send(Err(Box::new(FetchError::SubrequestBatchingError { - service: request.subgraph_name.unwrap(), + service: request + .subgraph_name + .expect("request has a subgraph_name"), reason: format!("request cancelled: {reason}"), }))) - .unwrap(); + .expect("batcher could send request cancelled to waiter"); } // Clear out everything that has committed, now that they are cancelled, and @@ -338,7 +340,12 @@ impl Batch { } in all_in_one { let value = svc_map - .entry(sg_request.subgraph_name.clone().unwrap()) + .entry( + sg_request + .subgraph_name + .clone() + .expect("request has a subgraph_name"), + ) .or_default(); value.push(BatchQueryInfo { request: sg_request, @@ -348,11 +355,13 @@ impl Batch { } // tracing::debug!("svc_map: {svc_map:?}"); - process_batches(master_client_factory.unwrap(), svc_map) - .await - .expect("XXX NEEDS TO WORK FOR NOW"); - } - .instrument(tracing::info_span!("batch_request", size))); + // If we don't have a master_client_factory, we can't do anything. + if let Some(client_factory) = master_client_factory { + process_batches(client_factory, svc_map) + .await + .expect("XXX NEEDS TO WORK FOR NOW"); + } + }); Self { senders: Mutex::new(senders), diff --git a/apollo-router/src/services/router/service.rs b/apollo-router/src/services/router/service.rs index 513edd62a8..ac12b8bb24 100644 --- a/apollo-router/src/services/router/service.rs +++ b/apollo-router/src/services/router/service.rs @@ -434,11 +434,8 @@ impl RouterService { let futures = supergraph_requests .into_iter() .map(|supergraph_request| async { - // TODO: Cloning here seems to keep around the batch queries that should have been removed by the subgraph - // handler for the happy path, but `process_supergraph_request` takes ownership of the original request (and - // thus context) so we can't grab it from there. - // This will cause warnings about trying to cancel already completed batch queries in the logs for all - // batch queries. + // TODO: We clone the context here, because if the request results in an Err, the + // response context will no longer exist. let context = supergraph_request.context.clone(); let result = self.process_supergraph_request(supergraph_request).await; @@ -447,17 +444,13 @@ impl RouterService { // error wrapped in an `Ok` or in a `BoxError` wrapped in an `Err`. let batch_query_opt = context.extensions().lock().remove::(); if let Some(mut batch_query) = batch_query_opt { - // Grab the reason from the result, keeping in mind that even an `Ok` here could mean that - // the query was cancelled. 
- let reason = match &result { - Ok(_res) => "todo!".into(), - Err(e) => e.to_string(), - }; - - tracing::info!( - "cancelling dangling batch query in supergraph response with: {reason}" - ); - batch_query.signal_cancelled(reason).await; + // Only proceed with signalling cancelled if the batch_query is not finished + if !batch_query.finished() { + tracing::info!("cancelling batch query in supergraph response"); + batch_query + .signal_cancelled("request terminated by user".to_string()) + .await; + } } result From 6c057ab4f18bf1357d7a500bacbac68f1e0cf6d8 Mon Sep 17 00:00:00 2001 From: Nicholas Cioli Date: Wed, 27 Mar 2024 12:48:58 -0400 Subject: [PATCH 4/6] add test for invalid query --- apollo-router/tests/integration/batching.rs | 84 +++++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/apollo-router/tests/integration/batching.rs b/apollo-router/tests/integration/batching.rs index 9fba4ce363..d6c6129846 100644 --- a/apollo-router/tests/integration/batching.rs +++ b/apollo-router/tests/integration/batching.rs @@ -722,6 +722,90 @@ async fn it_handles_single_request_cancelled_by_coprocessor() -> Result<(), BoxE Ok(()) } +#[tokio::test(flavor = "multi_thread")] +async fn it_handles_single_invalid_graphql() -> Result<(), BoxError> { + const REQUEST_COUNT: usize = 5; + + let mut requests: Vec<_> = (0..REQUEST_COUNT) + .map(|index| { + Request::fake_builder() + .query(format!( + "query op{index}{{ entryA(count: {REQUEST_COUNT}) {{ index }} }}" + )) + .build() + }) + .collect(); + + // Mess up the 4th one + requests[3].query = Some("query op3".into()); + + // We aren't expecting the whole batch anymore, so we need a handler here for it + fn handle_a(request: &wiremock::Request) -> ResponseTemplate { + let requests: Vec = request.body_json().unwrap(); + + // We should have gotten all of the regular elements minus the third + assert_eq!(requests.len(), REQUEST_COUNT - 1); + + // Each element should have be for the specified subgraph and should have a field selection + // of index. The index should be 0..n without 3. 
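+        // (These indexes are positions in the original client batch, so with index 3 dropped
+        // the asserted op names should be op0, op1, op2 and op4.)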
+ // Note: The router appends info to the query, so we append it at this check + for (request, index) in requests.into_iter().zip((0..).filter(|&i| i != 3)) { + assert_eq!( + request.query, + Some(format!( + "query op{index}__a__0{{entryA(count:{REQUEST_COUNT}){{index}}}}", + )) + ); + } + + ResponseTemplate::new(200).set_body_json( + (0..REQUEST_COUNT) + .filter(|&i| i != 3) + .map(|index| { + serde_json::json!({ + "data": { + "entryA": { + "index": index + } + } + }) + }) + .collect::>(), + ) + } + + let responses = helper::run_test( + CONFIG, + &requests[..], + Some(handle_a), + None::, + ) + .await?; + + // Make sure that we got back what we wanted + assert_yaml_snapshot!(responses, @r###" + --- + - data: + entryA: + index: 0 + - data: + entryA: + index: 1 + - data: + entryA: + index: 2 + - errors: + - message: "parsing error: Error: syntax error: expected a Selection Set\n ╭─[query.graphql:1:10]\n │\n 1 │ query op3\n │ │ \n │ ╰─ expected a Selection Set\n───╯\n" + extensions: + code: PARSING_ERROR + - data: + entryA: + index: 4 + "###); + + Ok(()) +} + /// Utility methods for these tests mod helper { use std::time::Duration; From c8d3d7a4e18c4866d688727cb36d00dc8571ebe2 Mon Sep 17 00:00:00 2001 From: Nicholas Cioli Date: Wed, 27 Mar 2024 13:11:41 -0400 Subject: [PATCH 5/6] fix clippy errors --- apollo-router/src/batching.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/apollo-router/src/batching.rs b/apollo-router/src/batching.rs index 0a9cfe25bf..8b236c85ea 100644 --- a/apollo-router/src/batching.rs +++ b/apollo-router/src/batching.rs @@ -17,7 +17,6 @@ use tokio::sync::mpsc; use tokio::sync::oneshot; use tokio::task::JoinHandle; use tower::BoxError; -use tracing::Instrument; use tracing::Span; use tracing_opentelemetry::OpenTelemetrySpanExt; From ae3ed098f7fad607525c865027cf33eaba4463b8 Mon Sep 17 00:00:00 2001 From: Nicholas Cioli Date: Wed, 27 Mar 2024 13:21:23 -0400 Subject: [PATCH 6/6] readd missing instrumentation --- apollo-router/src/batching.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apollo-router/src/batching.rs b/apollo-router/src/batching.rs index 8b236c85ea..39c8a5b330 100644 --- a/apollo-router/src/batching.rs +++ b/apollo-router/src/batching.rs @@ -17,6 +17,7 @@ use tokio::sync::mpsc; use tokio::sync::oneshot; use tokio::task::JoinHandle; use tower::BoxError; +use tracing::Instrument; use tracing::Span; use tracing_opentelemetry::OpenTelemetrySpanExt; @@ -360,7 +361,7 @@ impl Batch { .await .expect("XXX NEEDS TO WORK FOR NOW"); } - }); + }.instrument(tracing::info_span!("batch_request", size))); Self { senders: Mutex::new(senders),