Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing hasNext behaviour #1745

Merged
merged 14 commits into from
Sep 14, 2022
7 changes: 0 additions & 7 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,6 @@ Executable::builder()
By [@BrynCooke](https://github.com/BrynCooke) in https://github.com/apollographql/router/pull/1734

## 🐛 Fixes

### Set correctly hasNext for the last chunk of a deferred response ([#1687](https://github.com/apollographql/router/issues/1687))

You no longer will receive a last chunk `{"hasNext": false}` in a deferred response.

By [@bnjjj](https://github.com/bnjjj) in https://github.com/apollographql/router/pull/1736

## 🛠 Maintenance

### Add errors vec in `QueryPlannerResponse` to handle errors in `query_planning_service` ([PR #1504](https://github.com/apollographql/router/pull/1504))
Expand Down
10 changes: 4 additions & 6 deletions apollo-router/src/query_planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,6 @@ impl PlanNode {
let mut primary_receiver = primary_sender.subscribe();
let mut value = parent_value.clone();
let fut = async move {
let mut has_next = true;
let mut errors = Vec::new();

if is_depends_empty {
Expand Down Expand Up @@ -540,15 +539,13 @@ impl PlanNode {
if !is_depends_empty {
let primary_value =
primary_receiver.recv().await.unwrap_or_default();
has_next = false;
v.deep_merge(primary_value);
}

if let Err(e) = tx
.send(
Response::builder()
.data(v)
.has_next(has_next)
.errors(err)
.and_path(Some(deferred_path.clone()))
.and_subselection(subselection.or(node_subselection))
Expand All @@ -563,16 +560,16 @@ impl PlanNode {
e
);
};
tx.disconnect();
} else {
let primary_value =
primary_receiver.recv().await.unwrap_or_default();
has_next = false;
value.deep_merge(primary_value);

if let Err(e) = tx
.send(
Response::builder()
.data(value)
.has_next(has_next)
.errors(errors)
.and_path(Some(deferred_path.clone()))
.and_subselection(subselection)
Expand All @@ -587,6 +584,7 @@ impl PlanNode {
e
);
}
tx.disconnect();
};
};

Expand Down Expand Up @@ -1569,7 +1567,7 @@ mod tests {
serde_json::to_value(&response).unwrap(),
// the primary response appears there because the deferred response gets data from it
// unneeded parts are removed in response formatting
serde_json::json! {{"data":{"t":{"y":"Y","__typename":"T","id":1234,"x":"X"}}, "hasNext": false, "path":["t"]}}
serde_json::json! {{"data":{"t":{"y":"Y","__typename":"T","id":1234,"x":"X"}},"path":["t"]}}
);
}

Expand Down
77 changes: 73 additions & 4 deletions apollo-router/src/services/execution_service.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
//! Implements the Execution phase of the request lifecycle.

use std::future::ready;
use std::sync::Arc;
use std::task::Poll;

use futures::future::ready;
use futures::channel::mpsc::Receiver;
use futures::channel::mpsc::SendError;
use futures::channel::mpsc::Sender;
use futures::future::BoxFuture;
use futures::stream::once;
use futures::SinkExt;
use futures::StreamExt;
use tower::BoxError;
use tower::ServiceBuilder;
Expand All @@ -17,6 +21,7 @@ use super::layers::allow_only_http_post_mutations::AllowOnlyHttpPostMutationsLay
use super::new_service::NewService;
use super::subgraph_service::SubgraphServiceFactory;
use super::Plugins;
use crate::graphql::Response;
use crate::services::execution;
use crate::ExecutionRequest;
use crate::ExecutionResponse;
Expand Down Expand Up @@ -66,9 +71,12 @@ where
)
.await;

let rest = receiver;

let stream = once(ready(first)).chain(rest).boxed();
let stream = if req.query_plan.root.contains_defer() {
println!("contains defer, will filter stream");
filter_stream(first, receiver).boxed()
} else {
once(ready(first)).chain(receiver).boxed()
};

Ok(ExecutionResponse::new_from_response(
http::Response::new(stream as _),
Expand All @@ -80,6 +88,67 @@ where
}
}

// modifies the response stream to set `has_next` to `false` on the last response
fn filter_stream(first: Response, mut stream: Receiver<Response>) -> Receiver<Response> {
let (mut sender, receiver) = futures::channel::mpsc::channel(10);

tokio::task::spawn(async move {
consume_responses(first, &mut stream, &mut sender).await?;

while let Some(current_response) = stream.next().await {
consume_responses(current_response, &mut stream, &mut sender).await?;
}
println!("done");
Ok::<_, SendError>(())
});

receiver
}

async fn consume_responses(
Copy link
Contributor

@BrynCooke BrynCooke Sep 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there may be a race condition here.

In the case where stream.try_next() is called and returns an error as there may be more items in the stream, if the stream is then closed before the call to next in filter_stream, there will be no final empty response with has_next: false

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

try_next is like doing next but without await: if there's a message in the stream, it will be returned by try_next, if not then try_next will return an error and the next call to next would wait for it.

In the case you are describing, try_next would not return an error if there are in flight messages. In the case where try_next would return an error, then somehow between returning from consume_responses and the call to next the stream gets new messages then is closed, then next would return a message, some calls to try_next would return messages, then when there's nothing remaining try_next would return Ok(None)`.

The one possible race I worry about is if messages are received and re-sent, then we await on next, then for whatever reason the stream is disconnected (maybe all the senders are dropped). Then we would need to add a final has_next = false response. But I don't see how this could play out

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BrynCooke that last cvase should be addressed by 5210765

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

mut current_response: Response,
stream: &mut Receiver<Response>,
sender: &mut Sender<Response>,
) -> Result<(), SendError> {
loop {
match stream.try_next() {
// no messages available, but the channel is not closed
// this means more deferred responses can come
Err(_) => {
println!("[{}]consume", line!());
sender.send(current_response).await?;

break;
}

// there might be other deferred responses after this one,
// so we should call `try_next` again
Ok(Some(response)) => {
println!("[{}]consume", line!());

sender.send(current_response).await?;
current_response = response;
}
// the channel is closed
// there will be no other deferred responses after that,
// so we set `has_next` to `false`
Ok(None) => {
println!("[{}]consume", line!());

current_response.has_next = Some(false);
println!(
"FILTER setting has next to false on {}",
serde_json::to_string(&current_response).unwrap()
);

sender.send(current_response).await?;
break;
}
}
}
Ok::<_, SendError>(())
}

pub(crate) trait ExecutionServiceFactory:
NewService<ExecutionRequest, Service = Self::ExecutionService> + Clone + Send + 'static
{
Expand Down
130 changes: 68 additions & 62 deletions apollo-router/src/services/supergraph_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::sync::Arc;
use std::task::Poll;

use futures::future::ready;
use futures::future::BoxFuture;
use futures::stream::StreamExt;
use futures::TryFutureExt;
Expand Down Expand Up @@ -306,74 +307,79 @@ fn process_execution_response(
let ExecutionResponse { response, context } = execution_response;

let (parts, response_stream) = response.into_parts();
let stream = response_stream.map(move |mut response: Response| {
let has_next = response.has_next.unwrap_or(true);
tracing::debug_span!("format_response").in_scope(|| {
query.format_response(
&mut response,
operation_name.as_deref(),
variables.clone(),
schema.api_schema(),
)
});

match (response.path.as_ref(), response.data.as_ref()) {
(None, _) | (_, None) => {
if can_be_deferred {
response.has_next = Some(has_next);
}
let stream = response_stream
.map(move |mut response: Response| {
tracing::debug_span!("format_response").in_scope(|| {
query.format_response(
&mut response,
operation_name.as_deref(),
variables.clone(),
schema.api_schema(),
)
});

response
}
// if the deferred response specified a path, we must extract the
//values matched by that path and create a separate response for
//each of them.
// While { "data": { "a": { "b": 1 } } } and { "data": { "b": 1 }, "path: ["a"] }
// would merge in the same ways, some clients will generate code
// that checks the specific type of the deferred response at that
// path, instead of starting from the root object, so to support
// this, we extract the value at that path.
// In particular, that means that a deferred fragment in an object
// under an array would generate one response par array element
(Some(response_path), Some(response_data)) => {
let mut sub_responses = Vec::new();
response_data.select_values_and_paths(response_path, |path, value| {
sub_responses.push((path.clone(), value.clone()));
});

Response::builder()
.has_next(has_next)
.incremental(
sub_responses
.into_iter()
.map(move |(path, data)| {
IncrementalResponse::builder()
.and_label(response.label.clone())
.data(data)
.path(path)
.errors(response.errors.clone())
.extensions(response.extensions.clone())
.build()
})
.collect(),
)
.build()
match (response.path.as_ref(), response.data.as_ref()) {
(None, _) | (_, None) => {
if can_be_deferred && response.has_next.is_none() {
response.has_next = Some(true);
}

response
}
// if the deferred response specified a path, we must extract the
//values matched by that path and create a separate response for
//each of them.
// While { "data": { "a": { "b": 1 } } } and { "data": { "b": 1 }, "path: ["a"] }
// would merge in the same ways, some clients will generate code
// that checks the specific type of the deferred response at that
// path, instead of starting from the root object, so to support
// this, we extract the value at that path.
// In particular, that means that a deferred fragment in an object
// under an array would generate one response par array element
(Some(response_path), Some(response_data)) => {
let mut sub_responses = Vec::new();
response_data.select_values_and_paths(response_path, |path, value| {
sub_responses.push((path.clone(), value.clone()));
});

let has_next = response.has_next.unwrap_or(true);

Response::builder()
.has_next(has_next)
.incremental(
sub_responses
.into_iter()
.map(move |(path, data)| {
IncrementalResponse::builder()
.and_label(response.label.clone())
.data(data)
.path(path)
.errors(response.errors.clone())
.extensions(response.extensions.clone())
.build()
})
.collect(),
)
.build()
}
}
}
});
})
// avoid sending an empty deferred response if possible
.filter(|response| {
ready(
// this is a single response
response.data.is_some()
// the formatting step for incremental responses returned an empty array
|| !response.incremental.is_empty()
// even if the response is empty, we have to send a final response with `has_next` set to false
|| response.has_next == Some(false),
)
});

Ok(SupergraphResponse {
context,
response: http::Response::from_parts(
parts,
if can_be_deferred {
stream.left_stream()
} else {
stream.right_stream()
}
.in_current_span()
.boxed(),
),
response: http::Response::from_parts(parts, stream.in_current_span().boxed()),
})
}

Expand Down
4 changes: 4 additions & 0 deletions apollo-router/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,9 +638,13 @@ async fn defer_path_in_array() {

let first = stream.next_response().await.unwrap();
insta::assert_json_snapshot!(first);
assert_eq!(first.has_next, Some(true));

let second = stream.next_response().await.unwrap();
insta::assert_json_snapshot!(second);
assert_eq!(second.has_next, Some(false));

assert_eq!(stream.next_response().await, None);
}

#[tokio::test(flavor = "multi_thread")]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
---
source: apollo-router/tests/integration_tests.rs
assertion_line: 679
expression: second
---
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
---
source: apollo-router/tests/integration_tests.rs
assertion_line: 726
expression: second
---
{
Expand Down