diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index 71a69e9234..97addb1bea 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -125,6 +125,12 @@ By [@Geal](https://github.com/geal) in https://github.com/apollographql/router/p When we drop Telemetry we spawn a thread to perform the global opentelemetry trace provider shutdown. The documentation of this function indicates that "This will invoke the shutdown method on all span processors. span processors should export remaining spans before return". We should give that process some time to complete (5 seconds currently) before returning from the `drop`. This will provide more opportunity for spans to be exported. By [@garypen](https://github.com/garypen) in https://github.com/apollographql/router/pull/2191 +### Dispatch errors from the primary response to deferred responses ([Issue #1818](https://github.com/apollographql/router/issues/1818), [Issue #2185](https://github.com/apollographql/router/issues/2185)) + +When errors are generated during the primary execution, some of them can be assigned to +deferred responses. + +By [@Geal](https://github.com/geal) in https://github.com/apollographql/router/pull/2192 ### Reconstruct deferred queries with knowledge about fragments ([Issue #2105](https://github.com/apollographql/router/issues/2105)) diff --git a/apollo-router/src/query_planner/execution.rs b/apollo-router/src/query_planner/execution.rs index c090f765ff..14f05fd764 100644 --- a/apollo-router/src/query_planner/execution.rs +++ b/apollo-router/src/query_planner/execution.rs @@ -221,7 +221,8 @@ impl PlanNode { HashMap::new(); let mut futures = Vec::new(); - let (primary_sender, _) = tokio::sync::broadcast::channel::(1); + let (primary_sender, _) = + tokio::sync::broadcast::channel::<(Value, Vec)>(1); for deferred_node in deferred { let fut = deferred_node @@ -266,10 +267,10 @@ impl PlanNode { errors.extend(err.into_iter()); subselection = primary_subselection.clone(); - let _ = primary_sender.send(value.clone()); + let _ = primary_sender.send((value.clone(), errors.clone())); } else { subselection = primary_subselection.clone(); - let _ = primary_sender.send(value.clone()); + let _ = primary_sender.send((value.clone(), errors.clone())); } } .instrument(tracing::info_span!( @@ -357,7 +358,7 @@ impl DeferredNode { parameters: &'a ExecutionParameters<'a, SF>, parent_value: &Value, sender: futures::channel::mpsc::Sender, - primary_sender: &Sender, + primary_sender: &Sender<(Value, Vec)>, deferred_fetches: &mut HashMap)>>, ) -> impl Future where @@ -406,8 +407,10 @@ impl DeferredNode { let mut errors = Vec::new(); if is_depends_empty { - let primary_value = primary_receiver.recv().await.unwrap_or_default(); + let (primary_value, primary_errors) = + primary_receiver.recv().await.unwrap_or_default(); value.deep_merge(primary_value); + errors.extend(primary_errors.into_iter()) } else { while let Some((v, _remaining)) = stream.next().await { // a Err(RecvError) means either that the fetch was not performed and the @@ -449,8 +452,10 @@ impl DeferredNode { .await; if !is_depends_empty { - let primary_value = primary_receiver.recv().await.unwrap_or_default(); + let (primary_value, primary_errors) = + primary_receiver.recv().await.unwrap_or_default(); v.deep_merge(primary_value); + errors.extend(primary_errors.into_iter()) } if let Err(e) = tx @@ -473,8 +478,10 @@ impl DeferredNode { }; tx.disconnect(); } else { - let primary_value = primary_receiver.recv().await.unwrap_or_default(); + let (primary_value, primary_errors) = + primary_receiver.recv().await.unwrap_or_default(); value.deep_merge(primary_value); + errors.extend(primary_errors.into_iter()); if let Err(e) = tx .send( diff --git a/apollo-router/src/services/execution_service.rs b/apollo-router/src/services/execution_service.rs index b9c24e110a..05088b5487 100644 --- a/apollo-router/src/services/execution_service.rs +++ b/apollo-router/src/services/execution_service.rs @@ -24,6 +24,7 @@ use super::subgraph_service::SubgraphServiceFactory; use super::Plugins; use crate::graphql::IncrementalResponse; use crate::graphql::Response; +use crate::json_ext::Object; use crate::json_ext::Path; use crate::json_ext::PathElement; use crate::json_ext::ValueExt; @@ -128,6 +129,10 @@ where response.has_next = Some(has_next); } + response.errors.retain(|error| match &error.path { + None => true, + Some(error_path) => query.contains_error_path(operation_name.as_deref(), response.subselection.as_deref(), response.path.as_ref(), error_path), + }); ready(Some(response)) } // if the deferred response specified a path, we must extract the @@ -157,6 +162,9 @@ where } }); + let query = query.clone(); + let operation_name = operation_name.clone(); + let incremental = sub_responses .into_iter() .filter_map(move |(path, data)| { @@ -166,17 +174,59 @@ where .iter() .filter(|error| match &error.path { None => false, - Some(err_path) => err_path.starts_with(&path), + Some(error_path) =>query.contains_error_path(operation_name.as_deref(), response.subselection.as_deref(), response.path.as_ref(), error_path) && error_path.starts_with(&path), + }) .cloned() .collect::>(); + let extensions: Object = response + .extensions + .iter() + .map(|(key, value)| { + if key.as_str() == "valueCompletion" { + let value = match value.as_array() { + None => Value::Null, + Some(v) => Value::Array( + v.iter() + .filter(|ext| { + match ext + .as_object() + .as_ref() + .and_then(|ext| { + ext.get("path") + }) + .and_then(|v| { + let p:Option = serde_json_bytes::from_value(v.clone()).ok(); + p + }) { + None => false, + Some(ext_path) => { + ext_path + .starts_with( + &path, + ) + } + } + }) + .cloned() + .collect(), + ), + }; + + (key.clone(), value) + } else { + (key.clone(), value.clone()) + } + }) + .collect(); + // an empty response should not be sent // still, if there's an error or extension to show, we should // send it if !data.is_null() || !errors.is_empty() - || !response.extensions.is_empty() + || !extensions.is_empty() { Some( IncrementalResponse::builder() @@ -184,7 +234,7 @@ where .data(data) .path(path) .errors(errors) - .extensions(response.extensions.clone()) + .extensions(extensions) .build(), ) } else { @@ -199,6 +249,7 @@ where .incremental(incremental) .build(), )) + } } }) diff --git a/apollo-router/src/services/snapshots/apollo_router__services__supergraph_service__tests__deferred_fragment_bounds_nullability-2.snap b/apollo-router/src/services/snapshots/apollo_router__services__supergraph_service__tests__deferred_fragment_bounds_nullability-2.snap index acd2170b0b..524ed427a0 100644 --- a/apollo-router/src/services/snapshots/apollo_router__services__supergraph_service__tests__deferred_fragment_bounds_nullability-2.snap +++ b/apollo-router/src/services/snapshots/apollo_router__services__supergraph_service__tests__deferred_fragment_bounds_nullability-2.snap @@ -23,24 +23,6 @@ expression: stream.next_response().await.unwrap() "suborga", 0 ] - }, - { - "message": "Cannot return null for non-nullable field Organization.nonNullId", - "path": [ - "currentUser", - "activeOrganization", - "suborga", - 1 - ] - }, - { - "message": "Cannot return null for non-nullable field Organization.nonNullId", - "path": [ - "currentUser", - "activeOrganization", - "suborga", - 2 - ] } ] } @@ -55,15 +37,6 @@ expression: stream.next_response().await.unwrap() ], "extensions": { "valueCompletion": [ - { - "message": "Cannot return null for non-nullable field Organization.nonNullId", - "path": [ - "currentUser", - "activeOrganization", - "suborga", - 0 - ] - }, { "message": "Cannot return null for non-nullable field Organization.nonNullId", "path": [ @@ -72,15 +45,6 @@ expression: stream.next_response().await.unwrap() "suborga", 1 ] - }, - { - "message": "Cannot return null for non-nullable field Organization.nonNullId", - "path": [ - "currentUser", - "activeOrganization", - "suborga", - 2 - ] } ] } @@ -95,24 +59,6 @@ expression: stream.next_response().await.unwrap() ], "extensions": { "valueCompletion": [ - { - "message": "Cannot return null for non-nullable field Organization.nonNullId", - "path": [ - "currentUser", - "activeOrganization", - "suborga", - 0 - ] - }, - { - "message": "Cannot return null for non-nullable field Organization.nonNullId", - "path": [ - "currentUser", - "activeOrganization", - "suborga", - 1 - ] - }, { "message": "Cannot return null for non-nullable field Organization.nonNullId", "path": [ diff --git a/apollo-router/src/services/snapshots/apollo_router__services__supergraph_service__tests__errors_from_primary_on_deferred_responses-2.snap b/apollo-router/src/services/snapshots/apollo_router__services__supergraph_service__tests__errors_from_primary_on_deferred_responses-2.snap new file mode 100644 index 0000000000..54d405aa47 --- /dev/null +++ b/apollo-router/src/services/snapshots/apollo_router__services__supergraph_service__tests__errors_from_primary_on_deferred_responses-2.snap @@ -0,0 +1,32 @@ +--- +source: apollo-router/src/services/supergraph_service.rs +expression: stream.next_response().await.unwrap() +--- +{ + "hasNext": false, + "incremental": [ + { + "data": { + "errorField": null + }, + "path": [ + "computer" + ], + "errors": [ + { + "message": "Error field", + "locations": [ + { + "line": 1, + "column": 93 + } + ], + "path": [ + "computer", + "errorField" + ] + } + ] + } + ] +} diff --git a/apollo-router/src/services/snapshots/apollo_router__services__supergraph_service__tests__errors_from_primary_on_deferred_responses.snap b/apollo-router/src/services/snapshots/apollo_router__services__supergraph_service__tests__errors_from_primary_on_deferred_responses.snap new file mode 100644 index 0000000000..c444dc838c --- /dev/null +++ b/apollo-router/src/services/snapshots/apollo_router__services__supergraph_service__tests__errors_from_primary_on_deferred_responses.snap @@ -0,0 +1,12 @@ +--- +source: apollo-router/src/services/supergraph_service.rs +expression: stream.next_response().await.unwrap() +--- +{ + "data": { + "computer": { + "id": "Computer1" + } + }, + "hasNext": true +} diff --git a/apollo-router/src/services/supergraph_service.rs b/apollo-router/src/services/supergraph_service.rs index 07846fd21a..d73512d654 100644 --- a/apollo-router/src/services/supergraph_service.rs +++ b/apollo-router/src/services/supergraph_service.rs @@ -615,6 +615,112 @@ mod tests { insta::assert_json_snapshot!(stream.next_response().await.unwrap()); } + #[tokio::test] + async fn errors_from_primary_on_deferred_responses() { + let schema = r#" + schema + @link(url: "https://specs.apollo.dev/link/v1.0") + @link(url: "https://specs.apollo.dev/join/v0.2", for: EXECUTION) + { + query: Query + } + + directive @join__field(graph: join__Graph!, requires: join__FieldSet, provides: join__FieldSet, type: String, external: Boolean, override: String, usedOverridden: Boolean) repeatable on FIELD_DEFINITION | INPUT_FIELD_DEFINITION + directive @join__graph(name: String!, url: String!) on ENUM_VALUE + directive @join__implements(graph: join__Graph!, interface: String!) repeatable on OBJECT | INTERFACE + directive @join__type(graph: join__Graph!, key: join__FieldSet, extension: Boolean! = false, resolvable: Boolean! = true) repeatable on OBJECT | INTERFACE | UNION | ENUM | INPUT_OBJECT | SCALAR + directive @link(url: String, as: String, for: link__Purpose, import: [link__Import]) repeatable on SCHEMA + + scalar link__Import + enum link__Purpose { + SECURITY + EXECUTION + } + + type Computer + @join__type(graph: COMPUTERS) + { + id: ID! + errorField: String + nonNullErrorField: String! + } + + scalar join__FieldSet + + enum join__Graph { + COMPUTERS @join__graph(name: "computers", url: "http://localhost:4001/") + } + + + type Query + @join__type(graph: COMPUTERS) + { + computer(id: ID!): Computer + }"#; + + let subgraphs = MockedSubgraphs([ + ("computers", MockSubgraph::builder().with_json( + serde_json::json!{{"query":"{currentUser{__typename id}}"}}, + serde_json::json!{{"data": {"currentUser": { "__typename": "User", "id": "0" }}}} + ) + .with_json( + serde_json::json!{{ + "query":"{computer(id:\"Computer1\"){id errorField}}", + }}, + serde_json::json!{{ + "data": { + "computer": { + "id": "Computer1" + } + }, + "errors": [ + { + "message": "Error field", + "locations": [ + { + "line": 1, + "column": 93 + } + ], + "path": ["computer","errorField"], + } + ] + }} + ).build()), + ].into_iter().collect()); + + let service = TestHarness::builder() + .configuration_json(serde_json::json!({"include_subgraph_errors": { "all": true } })) + .unwrap() + .schema(schema) + .extra_plugin(subgraphs) + .build() + .await + .unwrap(); + + let request = supergraph::Request::fake_builder() + .header("Accept", "multipart/mixed; deferSpec=20220824") + .query( + r#"query { + computer(id: "Computer1") { + id + ...ComputerErrorField @defer + } + } + fragment ComputerErrorField on Computer { + errorField + }"#, + ) + .build() + .unwrap(); + + let mut stream = service.oneshot(request).await.unwrap(); + + insta::assert_json_snapshot!(stream.next_response().await.unwrap()); + + insta::assert_json_snapshot!(stream.next_response().await.unwrap()); + } + #[tokio::test] async fn deferred_fragment_bounds_nullability() { let subgraphs = MockedSubgraphs([ diff --git a/apollo-router/src/spec/query.rs b/apollo-router/src/spec/query.rs index 73e08e9e6e..f1004a981d 100644 --- a/apollo-router/src/spec/query.rs +++ b/apollo-router/src/spec/query.rs @@ -956,6 +956,39 @@ impl Query { None => self.operations.get(0), } } + + pub(crate) fn contains_error_path( + &self, + operation_name: Option<&str>, + subselection: Option<&str>, + response_path: Option<&Path>, + path: &Path, + ) -> bool { + println!( + "Query::contains_error_path: path = {path}, query: {}", + self.string, + ); + let operation = if let Some(subselection) = subselection { + // Get subselection from hashmap + match self.subselections.get(&SubSelection { + path: response_path.cloned().unwrap_or_default(), + subselection: subselection.to_string(), + }) { + Some(subselection_query) => &subselection_query.operations[0], + None => return false, + } + } else { + match self.operation(operation_name) { + None => return false, + Some(op) => op, + } + }; + + operation + .selection_set + .iter() + .any(|selection| selection.contains_error_path(&path.0, &self.fragments)) + } } /// Intermediate structure for arguments passed through the entire formatting diff --git a/apollo-router/src/spec/selection.rs b/apollo-router/src/spec/selection.rs index 11e67e36db..07282d853d 100644 --- a/apollo-router/src/spec/selection.rs +++ b/apollo-router/src/spec/selection.rs @@ -4,7 +4,9 @@ use serde::Deserialize; use serde::Serialize; use serde_json_bytes::ByteString; +use super::Fragments; use crate::json_ext::Object; +use crate::json_ext::PathElement; use crate::spec::TYPENAME; use crate::FieldType; use crate::Schema; @@ -314,6 +316,81 @@ impl Selection { Ok(selection) } + + pub(crate) fn contains_error_path(&self, path: &[PathElement], fragments: &Fragments) -> bool { + match (path.get(0), self) { + (None, _) => true, + ( + Some(PathElement::Key(key)), + Selection::Field { + name, + alias, + selection_set, + .. + }, + ) => { + if alias.as_ref().unwrap_or(name).as_str() == key.as_str() { + match selection_set { + // if we don't select after that field, the path should stop there + None => path.len() == 1, + Some(set) => set + .iter() + .any(|selection| selection.contains_error_path(&path[1..], fragments)), + } + } else { + false + } + } + ( + Some(PathElement::Fragment(fragment)), + Selection::InlineFragment { + type_condition, + selection_set, + .. + }, + ) => { + if fragment.as_str().strip_prefix("... on ") == Some(type_condition.as_str()) { + selection_set + .iter() + .any(|selection| selection.contains_error_path(&path[1..], fragments)) + } else { + false + } + } + (Some(PathElement::Fragment(fragment)), Self::FragmentSpread { name, .. }) => { + if let Some(f) = fragments.get(name) { + if fragment.as_str().strip_prefix("... on ") == Some(f.type_condition.as_str()) + { + f.selection_set + .iter() + .any(|selection| selection.contains_error_path(&path[1..], fragments)) + } else { + false + } + } else { + false + } + } + (_, Self::FragmentSpread { name, .. }) => { + if let Some(f) = fragments.get(name) { + f.selection_set + .iter() + .any(|selection| selection.contains_error_path(path, fragments)) + } else { + false + } + } + (Some(PathElement::Index(_)), _) | (Some(PathElement::Flatten), _) => { + self.contains_error_path(&path[1..], fragments) + } + (Some(PathElement::Key(_)), Selection::InlineFragment { selection_set, .. }) => { + selection_set + .iter() + .any(|selection| selection.contains_error_path(&path[1..], fragments)) + } + (Some(PathElement::Fragment(_)), Selection::Field { .. }) => false, + } + } } pub(crate) fn parse_skip(directive: &ast::Directive) -> Option { diff --git a/licenses.html b/licenses.html index d748902a41..a9b939e12d 100644 --- a/licenses.html +++ b/licenses.html @@ -44,7 +44,7 @@

Third Party Licenses

Overview of licenses:

    -
  • MIT License (75)
  • +
  • MIT License (77)
  • Apache License 2.0 (53)
  • ISC License (12)
  • BSD 3-Clause "New" or "Revised" License (6)
  • @@ -10011,7 +10011,7 @@

    Used by:

    -
    Copyright (c) <year> <owner> 
    +                
    Copyright (c) <year> <owner> All rights reserved.
     
     Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
     
    @@ -10160,7 +10160,7 @@ 

    Used by:

    -
    Copyright (c) <year> <owner>. 
    +                
    Copyright (c) <year> <owner>. All rights reserved.
     
     Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
     
    @@ -12119,6 +12119,34 @@ 

    Used by:

    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE.
    + +
  • +

    MIT License

    +

    Used by:

    + +
    MIT License
    +
    +Copyright (c) 2019 Daniel Augusto Rizzi Salvadori
    +
    +Permission is hereby granted, free of charge, to any person obtaining a copy
    +of this software and associated documentation files (the "Software"), to deal
    +in the Software without restriction, including without limitation the rights
    +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    +copies of the Software, and to permit persons to whom the Software is
    +furnished to do so, subject to the following conditions:
    +
    +The above copyright notice and this permission notice shall be included in all
    +copies or substantial portions of the Software.
    +
    +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
    +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
     SOFTWARE.
  • @@ -12216,6 +12244,36 @@

    Used by:

    The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +
  • + +
  • +

    MIT License

    +

    Used by:

    + +
    The MIT License (MIT)
    +
    +Copyright (c) 2014 Benjamin Sago
    +Copyright (c) 2021-2022 The Nushell Project Developers
    +
    +Permission is hereby granted, free of charge, to any person obtaining a copy
    +of this software and associated documentation files (the "Software"), to deal
    +in the Software without restriction, including without limitation the rights
    +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    +copies of the Software, and to permit persons to whom the Software is
    +furnished to do so, subject to the following conditions:
    +
    +The above copyright notice and this permission notice shall be included in all
    +copies or substantial portions of the Software.
    +
     THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE