diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index 5de296ebed..269e075ac2 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -233,11 +233,14 @@ By [@BrynCooke](https://github.com/BrynCooke) in https://github.com/apollographq ## 🐛 Fixes -### Set correctly hasNext for the last chunk of a deferred response ([#1687](https://github.com/apollographql/router/issues/1687)) +### Set correctly hasNext for the last chunk of a deferred response ([#1687](https://github.com/apollographql/router/issues/1687) [#1745](https://github.com/apollographql/router/issues/1745)) -You no longer will receive a last chunk `{"hasNext": false}` in a deferred response. +There will no longer be an empty last response `{"hasNext": false}`, the `hasNext` field will be set on the +last deferred response. There can still be one edge case where that empty message can appear, if some +deferred queries were cancelled too quickly. -By [@bnjjj](https://github.com/bnjjj) in https://github.com/apollographql/router/pull/1736 +By [@bnjjj](https://github.com/bnjjj) in https://github.com/apollographql/router/pull/1687 +By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/1745 ## 🛠 Maintenance diff --git a/apollo-router/src/query_planner/mod.rs b/apollo-router/src/query_planner/mod.rs index f1ef7e3c87..054b68a051 100644 --- a/apollo-router/src/query_planner/mod.rs +++ b/apollo-router/src/query_planner/mod.rs @@ -495,7 +495,6 @@ impl PlanNode { let mut primary_receiver = primary_sender.subscribe(); let mut value = parent_value.clone(); let fut = async move { - let mut has_next = true; let mut errors = Vec::new(); if is_depends_empty { @@ -540,7 +539,6 @@ impl PlanNode { if !is_depends_empty { let primary_value = primary_receiver.recv().await.unwrap_or_default(); - has_next = false; v.deep_merge(primary_value); } @@ -548,7 +546,6 @@ impl PlanNode { .send( Response::builder() .data(v) - .has_next(has_next) .errors(err) .and_path(Some(deferred_path.clone())) .and_subselection(subselection.or(node_subselection)) @@ -563,16 +560,16 @@ impl PlanNode { e ); }; + tx.disconnect(); } else { let primary_value = primary_receiver.recv().await.unwrap_or_default(); - has_next = false; value.deep_merge(primary_value); + if let Err(e) = tx .send( Response::builder() .data(value) - .has_next(has_next) .errors(errors) .and_path(Some(deferred_path.clone())) .and_subselection(subselection) @@ -587,6 +584,7 @@ impl PlanNode { e ); } + tx.disconnect(); }; }; @@ -1569,7 +1567,7 @@ mod tests { serde_json::to_value(&response).unwrap(), // the primary response appears there because the deferred response gets data from it // unneeded parts are removed in response formatting - serde_json::json! {{"data":{"t":{"y":"Y","__typename":"T","id":1234,"x":"X"}}, "hasNext": false, "path":["t"]}} + serde_json::json! {{"data":{"t":{"y":"Y","__typename":"T","id":1234,"x":"X"}},"path":["t"]}} ); } diff --git a/apollo-router/src/services/execution_service.rs b/apollo-router/src/services/execution_service.rs index 69c283ca3b..89a3f61ec5 100644 --- a/apollo-router/src/services/execution_service.rs +++ b/apollo-router/src/services/execution_service.rs @@ -1,11 +1,15 @@ //! Implements the Execution phase of the request lifecycle. +use std::future::ready; use std::sync::Arc; use std::task::Poll; -use futures::future::ready; +use futures::channel::mpsc::Receiver; +use futures::channel::mpsc::SendError; +use futures::channel::mpsc::Sender; use futures::future::BoxFuture; use futures::stream::once; +use futures::SinkExt; use futures::StreamExt; use tower::BoxError; use tower::ServiceBuilder; @@ -17,6 +21,7 @@ use super::layers::allow_only_http_post_mutations::AllowOnlyHttpPostMutationsLay use super::new_service::NewService; use super::subgraph_service::SubgraphServiceFactory; use super::Plugins; +use crate::graphql::Response; use crate::services::execution; use crate::ExecutionRequest; use crate::ExecutionResponse; @@ -66,9 +71,11 @@ where ) .await; - let rest = receiver; - - let stream = once(ready(first)).chain(rest).boxed(); + let stream = if req.query_plan.root.contains_defer() { + filter_stream(first, receiver).boxed() + } else { + once(ready(first)).chain(receiver).boxed() + }; Ok(ExecutionResponse::new_from_response( http::Response::new(stream as _), @@ -80,6 +87,66 @@ where } } +// modifies the response stream to set `has_next` to `false` on the last response +fn filter_stream(first: Response, mut stream: Receiver) -> Receiver { + let (mut sender, receiver) = futures::channel::mpsc::channel(10); + + tokio::task::spawn(async move { + let mut seen_last_message = consume_responses(first, &mut stream, &mut sender).await?; + + while let Some(current_response) = stream.next().await { + seen_last_message = + consume_responses(current_response, &mut stream, &mut sender).await?; + } + + // the response stream disconnected early so we could not add `has_next = false` to the + // last message, so we add an empty one + if !seen_last_message { + sender + .send(Response::builder().has_next(false).build()) + .await?; + } + Ok::<_, SendError>(()) + }); + + receiver +} + +// returns Ok(true) when we saw the last message +async fn consume_responses( + mut current_response: Response, + stream: &mut Receiver, + sender: &mut Sender, +) -> Result { + loop { + match stream.try_next() { + // no messages available, but the channel is not closed + // this means more deferred responses can come + Err(_) => { + sender.send(current_response).await?; + + return Ok(false); + } + + // there might be other deferred responses after this one, + // so we should call `try_next` again + Ok(Some(response)) => { + sender.send(current_response).await?; + current_response = response; + } + // the channel is closed + // there will be no other deferred responses after that, + // so we set `has_next` to `false` + Ok(None) => { + current_response.has_next = Some(false); + + sender.send(current_response).await?; + return Ok(true); + } + } + } +} + pub(crate) trait ExecutionServiceFactory: NewService + Clone + Send + 'static { diff --git a/apollo-router/src/services/supergraph_service.rs b/apollo-router/src/services/supergraph_service.rs index 402bafb6a8..f6e310df3c 100644 --- a/apollo-router/src/services/supergraph_service.rs +++ b/apollo-router/src/services/supergraph_service.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use std::task::Poll; +use futures::future::ready; use futures::future::BoxFuture; use futures::stream::StreamExt; use futures::TryFutureExt; @@ -306,74 +307,79 @@ fn process_execution_response( let ExecutionResponse { response, context } = execution_response; let (parts, response_stream) = response.into_parts(); - let stream = response_stream.map(move |mut response: Response| { - let has_next = response.has_next.unwrap_or(true); - tracing::debug_span!("format_response").in_scope(|| { - query.format_response( - &mut response, - operation_name.as_deref(), - variables.clone(), - schema.api_schema(), - ) - }); - match (response.path.as_ref(), response.data.as_ref()) { - (None, _) | (_, None) => { - if can_be_deferred { - response.has_next = Some(has_next); - } + let stream = response_stream + .map(move |mut response: Response| { + tracing::debug_span!("format_response").in_scope(|| { + query.format_response( + &mut response, + operation_name.as_deref(), + variables.clone(), + schema.api_schema(), + ) + }); - response - } - // if the deferred response specified a path, we must extract the - //values matched by that path and create a separate response for - //each of them. - // While { "data": { "a": { "b": 1 } } } and { "data": { "b": 1 }, "path: ["a"] } - // would merge in the same ways, some clients will generate code - // that checks the specific type of the deferred response at that - // path, instead of starting from the root object, so to support - // this, we extract the value at that path. - // In particular, that means that a deferred fragment in an object - // under an array would generate one response par array element - (Some(response_path), Some(response_data)) => { - let mut sub_responses = Vec::new(); - response_data.select_values_and_paths(response_path, |path, value| { - sub_responses.push((path.clone(), value.clone())); - }); - - Response::builder() - .has_next(has_next) - .incremental( - sub_responses - .into_iter() - .map(move |(path, data)| { - IncrementalResponse::builder() - .and_label(response.label.clone()) - .data(data) - .path(path) - .errors(response.errors.clone()) - .extensions(response.extensions.clone()) - .build() - }) - .collect(), - ) - .build() + match (response.path.as_ref(), response.data.as_ref()) { + (None, _) | (_, None) => { + if can_be_deferred && response.has_next.is_none() { + response.has_next = Some(true); + } + + response + } + // if the deferred response specified a path, we must extract the + //values matched by that path and create a separate response for + //each of them. + // While { "data": { "a": { "b": 1 } } } and { "data": { "b": 1 }, "path: ["a"] } + // would merge in the same ways, some clients will generate code + // that checks the specific type of the deferred response at that + // path, instead of starting from the root object, so to support + // this, we extract the value at that path. + // In particular, that means that a deferred fragment in an object + // under an array would generate one response par array element + (Some(response_path), Some(response_data)) => { + let mut sub_responses = Vec::new(); + response_data.select_values_and_paths(response_path, |path, value| { + sub_responses.push((path.clone(), value.clone())); + }); + + let has_next = response.has_next.unwrap_or(true); + + Response::builder() + .has_next(has_next) + .incremental( + sub_responses + .into_iter() + .map(move |(path, data)| { + IncrementalResponse::builder() + .and_label(response.label.clone()) + .data(data) + .path(path) + .errors(response.errors.clone()) + .extensions(response.extensions.clone()) + .build() + }) + .collect(), + ) + .build() + } } - } - }); + }) + // avoid sending an empty deferred response if possible + .filter(|response| { + ready( + // this is a single response + response.data.is_some() + // the formatting step for incremental responses returned an empty array + || !response.incremental.is_empty() + // even if the response is empty, we have to send a final response with `has_next` set to false + || response.has_next == Some(false), + ) + }); Ok(SupergraphResponse { context, - response: http::Response::from_parts( - parts, - if can_be_deferred { - stream.left_stream() - } else { - stream.right_stream() - } - .in_current_span() - .boxed(), - ), + response: http::Response::from_parts(parts, stream.in_current_span().boxed()), }) } diff --git a/apollo-router/tests/integration_tests.rs b/apollo-router/tests/integration_tests.rs index eda1f6381d..9e1db81fac 100644 --- a/apollo-router/tests/integration_tests.rs +++ b/apollo-router/tests/integration_tests.rs @@ -638,9 +638,13 @@ async fn defer_path_in_array() { let first = stream.next_response().await.unwrap(); insta::assert_json_snapshot!(first); + assert_eq!(first.has_next, Some(true)); let second = stream.next_response().await.unwrap(); insta::assert_json_snapshot!(second); + assert_eq!(second.has_next, Some(false)); + + assert_eq!(stream.next_response().await, None); } #[tokio::test(flavor = "multi_thread")] diff --git a/apollo-router/tests/snapshots/integration_tests__defer_path-2.snap b/apollo-router/tests/snapshots/integration_tests__defer_path-2.snap index e0ee201e4b..755c011e91 100644 --- a/apollo-router/tests/snapshots/integration_tests__defer_path-2.snap +++ b/apollo-router/tests/snapshots/integration_tests__defer_path-2.snap @@ -1,5 +1,6 @@ --- source: apollo-router/tests/integration_tests.rs +assertion_line: 679 expression: second --- { diff --git a/apollo-router/tests/snapshots/integration_tests__defer_path_in_array-2.snap b/apollo-router/tests/snapshots/integration_tests__defer_path_in_array-2.snap index b5272d0265..d7c5615564 100644 --- a/apollo-router/tests/snapshots/integration_tests__defer_path_in_array-2.snap +++ b/apollo-router/tests/snapshots/integration_tests__defer_path_in_array-2.snap @@ -1,5 +1,6 @@ --- source: apollo-router/tests/integration_tests.rs +assertion_line: 726 expression: second --- {