Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dispatch errors from the primary response to deferred responses #2192

Merged
merged 34 commits into from
Dec 12, 2022
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
8b8c8b9
split queries between primary and deferred parts
Nov 15, 2022
994dac2
handle named fragments
Nov 15, 2022
84e9d83
cleanup
Nov 16, 2022
5f993b9
do nnot add the defer directive to the generated query
Nov 16, 2022
597fbc8
logs
Nov 16, 2022
62a3fb7
Merge branch 'dev' into geal/split-queries
Dec 5, 2022
73c24fb
update router-bridge and apollo-encoder
Dec 5, 2022
405803c
handle queryPath in deferred nodes
Dec 5, 2022
02bdb78
split arrays
Dec 6, 2022
0364c60
Merge branch 'dev' into geal/split-queries
Dec 6, 2022
0563c78
missing snapshots
Dec 6, 2022
f903ee5
missing snapshot
Dec 6, 2022
ecb2dcb
lint
Dec 6, 2022
93a7ec2
handle arrays
Dec 6, 2022
9b222bf
remove unused code
Dec 6, 2022
606dcbd
add comments
Dec 6, 2022
cdf1b03
Merge branch 'dev' into geal/split-queries
Dec 6, 2022
3a1bc19
update router-bridge to fix CI builds
Dec 6, 2022
e794955
update router-bridge
Dec 12, 2022
223bc97
Merge branch 'dev' into geal/split-queries
Dec 12, 2022
20eeefe
Merge branch 'dev' into geal/split-queries
Dec 12, 2022
55e6ad9
filter the path for valueCompletion extensions
Dec 1, 2022
45c5ce8
dispatch errors from the primary query
Dec 1, 2022
ca55832
add a test for expected behaviour
Dec 1, 2022
68b9b18
we need to check that a path is handled by a query
Dec 2, 2022
d268507
lint
Dec 6, 2022
0fe7cbf
rename test
Dec 6, 2022
7c606d7
check that a query contains an error path
Dec 6, 2022
d564e36
changelog & lint
Dec 6, 2022
b2fb376
remove debug print
Dec 12, 2022
1744aa6
Merge branch 'dev' into geal/dispatch-errors-from-primary
Dec 12, 2022
38954e1
fix broken english
Dec 12, 2022
dce1978
Apply suggestions from code review
Dec 12, 2022
60a3b2e
This is a combination of 2 commits.
Dec 12, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 27 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ By [@Geal](https://github.com/geal) in https://github.com/apollographql/router/p
When we drop Telemetry we spawn a thread to perform the global opentelemetry trace provider shutdown. The documentation of this function indicates that "This will invoke the shutdown method on all span processors. span processors should export remaining spans before return". We should give that process some time to complete (5 seconds currently) before returning from the `drop`. This will provide more opportunity for spans to be exported.

By [@garypen](https://github.com/garypen) in https://github.com/apollographql/router/pull/2191
### Dispatch errors from the primary response to deferred responses ([Issue #1818](https://github.com/apollographql/router/issues/1818), [Issue #2185](https://github.com/apollographql/router/issues/2185))

When errors are generated during the primary execution, some of them can be affected to
Geal marked this conversation as resolved.
Show resolved Hide resolved
deferred responses.

By [@Geal](https://github.com/geal) in https://github.com/apollographql/router/pull/2192

## 🛠 Maintenance

Expand Down
3 changes: 2 additions & 1 deletion apollo-router/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ reqwest = { version = "0.11.13", default-features = false, features = [
"json",
"stream",
] }
router-bridge = "0.1.11"
router-bridge = "0.1.12"
rust-embed="6.4.2"
schemars = { version = "0.8.11", features = ["url"] }
shellexpand = "2.1.2"
Expand Down Expand Up @@ -190,6 +190,7 @@ urlencoding = "2.1.2"
uuid = { version = "1.2.2", features = ["serde", "v4"] }
yaml-rust = "0.4.5"
askama = "0.11.1"
apollo-encoder = "0.4.0"

[target.'cfg(macos)'.dependencies]
uname = "0.1.1"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
---
source: apollo-router/src/axum_factory/tests.rs
expression: "json!([{\n \"data\" :\n {\n \"topProducts\" :\n [{ \"upc\" : \"1\", \"name\" : \"Table\", \"reviews\" : null },\n { \"upc\" : \"2\", \"name\" : \"Couch\", \"reviews\" : null }]\n }, \"errors\" :\n [{\n \"message\" :\n \"couldn't find mock for query {\\\"query\\\":\\\"query TopProducts__reviews__1($representations:[_Any!]!){_entities(representations:$representations){...on Product{reviews{__typename id product{__typename upc}}}}}\\\",\\\"operationName\\\":\\\"TopProducts__reviews__1\\\",\\\"variables\\\":{\\\"representations\\\":[{\\\"__typename\\\":\\\"Product\\\",\\\"upc\\\":\\\"1\\\"},{\\\"__typename\\\":\\\"Product\\\",\\\"upc\\\":\\\"2\\\"}]}}\"\n },\n {\n \"message\" :\n \"Subgraph response from 'reviews' was missing key `_entities`\",\n \"path\" : [\"topProducts\", \"@\"]\n }], \"hasNext\" : true,\n }, { \"hasNext\" : false }])"
---
[
{
"data": {
"topProducts": [
{
"upc": "1",
"name": "Table",
"reviews": null
},
{
"upc": "2",
"name": "Couch",
"reviews": null
}
]
},
"errors": [
{
"message": "couldn't find mock for query {\"query\":\"query TopProducts__reviews__1($representations:[_Any!]!){_entities(representations:$representations){...on Product{reviews{__typename id product{__typename upc}}}}}\",\"operationName\":\"TopProducts__reviews__1\",\"variables\":{\"representations\":[{\"__typename\":\"Product\",\"upc\":\"1\"},{\"__typename\":\"Product\",\"upc\":\"2\"}]}}"
},
{
"message": "Subgraph response from 'reviews' was missing key `_entities`",
"path": [
"topProducts",
"@"
]
}
],
"hasNext": true
},
{
"hasNext": false
}
]
26 changes: 1 addition & 25 deletions apollo-router/src/axum_factory/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1923,31 +1923,7 @@ async fn test_defer_is_not_buffered() {

let (parts, counts): (Vec<_>, Vec<_>) = parts.map(|part| (part, counter.get())).unzip().await;
let parts = serde_json::Value::Array(parts);
assert_eq!(
parts,
json!([
{
"data": {
"topProducts": [
{"upc": "1", "name": "Table", "reviews": null},
{"upc": "2", "name": "Couch", "reviews": null}
]
},
"errors": [
{
"message": "couldn't find mock for query {\"query\":\"query TopProducts__reviews__1($representations:[_Any!]!){_entities(representations:$representations){...on Product{reviews{__typename id product{__typename upc}}}}}\",\"operationName\":\"TopProducts__reviews__1\",\"variables\":{\"representations\":[{\"__typename\":\"Product\",\"upc\":\"1\"},{\"__typename\":\"Product\",\"upc\":\"2\"}]}}"
},
{
"message": "Subgraph response from 'reviews' was missing key `_entities`",
"path": [ "topProducts", "@" ]
}],
"hasNext": true,
},
{"hasNext": false}
]),
"{}",
serde_json::to_string(&parts).unwrap()
);
insta::assert_json_snapshot!(parts);

// Non-regression test for https://github.com/apollographql/router/issues/1572
//
Expand Down
47 changes: 47 additions & 0 deletions apollo-router/src/json_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ impl ValueExt for Value {
.get_mut(k.as_str())
.expect("the value at that key was just inserted");
}
PathElement::Fragment(_) => {}
}
}

Expand Down Expand Up @@ -319,6 +320,7 @@ impl ValueExt for Value {
})
}
},
PathElement::Fragment(_) => {}
}
}

Expand Down Expand Up @@ -402,8 +404,17 @@ where
iterate_path(parent, &path[1..], value, f);
parent.pop();
}
} else if let Value::Array(array) = data {
for (i, value) in array.iter().enumerate() {
parent.push(PathElement::Index(i));
iterate_path(parent, path, value, f);
parent.pop();
}
}
}
Some(PathElement::Fragment(_)) => {
iterate_path(parent, &path[1..], data, f);
}
}
}

Expand All @@ -422,6 +433,10 @@ pub enum PathElement {
/// An index path element.
Index(usize),

/// A fragment application
#[serde(deserialize_with = "deserialize_fragment")]
Fragment(String),

/// A key path element.
Key(String),
}
Expand Down Expand Up @@ -464,6 +479,37 @@ where
serializer.serialize_str("@")
}

fn deserialize_fragment<'de, D>(deserializer: D) -> Result<String, D::Error>
where
D: serde::Deserializer<'de>,
{
deserializer.deserialize_str(FragmentVisitor)
}

struct FragmentVisitor;

impl<'de> serde::de::Visitor<'de> for FragmentVisitor {
type Value = String;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
write!(formatter, "a string that begins with '... '")
}

fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
if s.starts_with("... ") {
Ok(s.to_string())
} else {
Err(serde::de::Error::invalid_value(
serde::de::Unexpected::Str(s),
&self,
))
}
}
}

/// A path into the result document.
///
/// This can be composed of strings and numbers
Expand Down Expand Up @@ -587,6 +633,7 @@ impl fmt::Display for Path {
PathElement::Index(index) => write!(f, "{}", index)?,
PathElement::Key(key) => write!(f, "{}", key)?,
PathElement::Flatten => write!(f, "@")?,
PathElement::Fragment(fragment) => write!(f, "{fragment}")?,
}
}
Ok(())
Expand Down
24 changes: 16 additions & 8 deletions apollo-router/src/query_planner/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ impl PlanNode {
HashMap::new();
let mut futures = Vec::new();

let (primary_sender, _) = tokio::sync::broadcast::channel::<Value>(1);
let (primary_sender, _) =
tokio::sync::broadcast::channel::<(Value, Vec<Error>)>(1);

for deferred_node in deferred {
let fut = deferred_node
Expand Down Expand Up @@ -266,11 +267,12 @@ impl PlanNode {
errors.extend(err.into_iter());
subselection = primary_subselection.clone();

let _ = primary_sender.send(value.clone());
let _ = primary_sender.send((value.clone(), errors.clone()));
} else {
subselection = primary_subselection.clone();
let _ = primary_sender.send(value.clone());
let _ = primary_sender.send((value.clone(), errors.clone()));
}

}
.instrument(tracing::info_span!(
DEFER_SPAN_NAME,
Expand Down Expand Up @@ -357,7 +359,7 @@ impl DeferredNode {
parameters: &'a ExecutionParameters<'a, SF>,
parent_value: &Value,
sender: futures::channel::mpsc::Sender<Response>,
primary_sender: &Sender<Value>,
primary_sender: &Sender<(Value, Vec<Error>)>,
deferred_fetches: &mut HashMap<String, Sender<(Value, Vec<Error>)>>,
) -> impl Future<Output = ()>
where
Expand Down Expand Up @@ -389,7 +391,7 @@ impl DeferredNode {
let mut stream: stream::FuturesUnordered<_> = deferred_receivers.into_iter().collect();
//FIXME/ is there a solution without cloning the entire node? Maybe it could be moved instead?
let deferred_inner = self.node.clone();
let deferred_path = self.path.clone();
let deferred_path = self.query_path.clone();
let subselection = self.subselection();
let label = self.label.clone();
let mut tx = sender;
Expand All @@ -406,8 +408,10 @@ impl DeferredNode {
let mut errors = Vec::new();

if is_depends_empty {
let primary_value = primary_receiver.recv().await.unwrap_or_default();
let (primary_value, primary_errors) =
primary_receiver.recv().await.unwrap_or_default();
value.deep_merge(primary_value);
errors.extend(primary_errors.into_iter())
} else {
while let Some((v, _remaining)) = stream.next().await {
// a Err(RecvError) means either that the fetch was not performed and the
Expand Down Expand Up @@ -449,8 +453,10 @@ impl DeferredNode {
.await;

if !is_depends_empty {
let primary_value = primary_receiver.recv().await.unwrap_or_default();
let (primary_value, primary_errors) =
primary_receiver.recv().await.unwrap_or_default();
v.deep_merge(primary_value);
errors.extend(primary_errors.into_iter())
}

if let Err(e) = tx
Expand All @@ -473,8 +479,10 @@ impl DeferredNode {
};
tx.disconnect();
} else {
let primary_value = primary_receiver.recv().await.unwrap_or_default();
let (primary_value, primary_errors) =
primary_receiver.recv().await.unwrap_or_default();
value.deep_merge(primary_value);
errors.extend(primary_errors.into_iter());

if let Err(e) = tx
.send(
Expand Down
Loading