Skip to content

Commit

Permalink
wip #1687
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Coenen <[email protected]>
  • Loading branch information
bnjjj committed Sep 7, 2022
1 parent f0323e1 commit acb97e7
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 13 deletions.
13 changes: 8 additions & 5 deletions apollo-router/src/query_planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,15 +513,17 @@ impl PlanNode {
.await;

if !is_depends_empty {
let primary_value =
let mut primary_value =
primary_receiver.recv().await.unwrap_or_default();
primary_value
.deep_merge(serde_json_bytes::json!({"hasNext": false}));
v.deep_merge(primary_value);
}

if let Err(e) = tx
.send(
Response::builder()
.data(v)
.data(dbg!(v))
.errors(err)
.and_path(Some(deferred_path.clone()))
.and_subselection(subselection.or(node_subselection))
Expand All @@ -537,14 +539,15 @@ impl PlanNode {
);
};
} else {
let primary_value =
let mut primary_value =
primary_receiver.recv().await.unwrap_or_default();
primary_value
.deep_merge(serde_json_bytes::json!({"hasNext": false}));
value.deep_merge(primary_value);

if let Err(e) = tx
.send(
Response::builder()
.data(value)
.data(dbg!(value))
.errors(errors)
.and_path(Some(deferred_path.clone()))
.and_subselection(subselection)
Expand Down
26 changes: 18 additions & 8 deletions apollo-router/src/services/supergraph_service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! Implements the router phase of the request lifecycle.

use std::cell::RefCell;
use std::sync::Arc;
use std::sync::Mutex;
use std::task::Poll;

use futures::future::ready;
Expand Down Expand Up @@ -284,8 +286,13 @@ fn process_execution_response(
let ExecutionResponse { response, context } = execution_response;

let (parts, response_stream) = response.into_parts();

let stream = response_stream.map(move |mut response: Response| {
let has_next = response
.data
.as_ref()
.and_then(|data| data.get("hasNext"))
.and_then(|has_next| has_next.as_bool())
.unwrap_or(true);
tracing::debug_span!("format_response").in_scope(|| {
query.format_response(
&mut response,
Expand All @@ -298,10 +305,10 @@ fn process_execution_response(
match (response.path.as_ref(), response.data.as_ref()) {
(None, _) | (_, None) => {
if can_be_deferred {
response.has_next = Some(true);
response.has_next = Some(has_next);
}

response
dbg!(response)
}
// if the deferred response specified a path, we must extract the
//values matched by that path and create a separate response for
Expand All @@ -315,12 +322,17 @@ fn process_execution_response(
// under an array would generate one response par array element
(Some(response_path), Some(response_data)) => {
let mut sub_responses = Vec::new();
response_data.select_values_and_paths(response_path, |path, value| {
dbg!(response_data).select_values_and_paths(response_path, |path, value| {
sub_responses.push((path.clone(), value.clone()));
});
// let has_next = response_data
// .get("hasNext")
// .and_then(|has_next| has_next.as_bool())
// .unwrap_or(true);

Response::builder()
.has_next(true)
// TODO find a way to detect last element in a stream
.has_next(has_next)
.incremental(
sub_responses
.into_iter()
Expand All @@ -345,9 +357,7 @@ fn process_execution_response(
response: http::Response::from_parts(
parts,
if can_be_deferred {
stream
.chain(once(ready(Response::builder().has_next(false).build())))
.left_stream()
stream.left_stream()
} else {
stream.right_stream()
}
Expand Down

0 comments on commit acb97e7

Please sign in to comment.