diff --git a/.changesets/feat_igni_stats_report_key_hash.md b/.changesets/feat_igni_stats_report_key_hash.md
new file mode 100644
index 0000000000..1971b41d63
--- /dev/null
+++ b/.changesets/feat_igni_stats_report_key_hash.md
@@ -0,0 +1,19 @@
+### Expose the stats_report_key hash to plugins. ([Issue #2728](https://github.com/apollographql/router/issues/2728))
+
+This changeset exposes a new key in the context, `apollo_operation_id`, which identifies the operation you can look up in Studio:
+
+```
+https://studio.apollographql.com/graph/<your_graph_id>/variant/<your_variant>/operations?query=<apollo_operation_id>
+```
+
+This new context key is exposed at various stages of the operation pipeline:
+
+- Execution service request
+- Subgraph service request
+
+- Subgraph service response
+- Execution service response
+- Supergraph service response
+- Router service response
+
+By [@o0Ignition0o](https://github.com/o0Ignition0o) in https://github.com/apollographql/router/pull/3586
diff --git a/Cargo.lock b/Cargo.lock
index 4f7c2ddde6..59cdde5e83 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -372,6 +372,7 @@ dependencies = [
  "serde_json_bytes",
  "serde_urlencoded",
  "serde_yaml",
+ "sha1 0.10.5",
  "sha2",
  "shellexpand",
  "similar-asserts",
diff --git a/apollo-router/Cargo.toml b/apollo-router/Cargo.toml
index 12521d7453..4437a7e7c3 100644
--- a/apollo-router/Cargo.toml
+++ b/apollo-router/Cargo.toml
@@ -231,6 +231,7 @@ aws-sigv4 = "0.55.3"
 aws-credential-types = "0.55.3"
 aws-config = "0.55.3"
 aws-types = "0.55.3"
+sha1 = "0.10.5"
 
 [target.'cfg(macos)'.dependencies]
 uname = "0.1.1"
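The new `sha1` dependency above exists solely to derive the `apollo_operation_id` from the usage-reporting stats report key. As a quick orientation before the router changes below, here is a minimal, self-contained sketch of that hashing scheme (assuming a scratch crate with `sha1 = "0.10"` and `hex` as dependencies; the input string and expected digest are copied from the `apollo_operation_id_hash` unit test added later in this diff):

```rust
use sha1::{Digest, Sha1};

// Lowercase hex SHA-1 of the Apollo usage-reporting stats report key,
// mirroring the `stats_report_key_hash` helper introduced in this PR.
fn stats_report_key_hash(stats_report_key: &str) -> String {
    let mut hasher = Sha1::new();
    hasher.update(stats_report_key.as_bytes());
    hex::encode(hasher.finalize())
}

fn main() {
    // Input and digest taken verbatim from the unit test added in this diff.
    assert_eq!(
        "d1554552698157b05c2a462827fb4367a4548ee5",
        stats_report_key_hash("# IgnitionMeQuery\nquery IgnitionMeQuery{me{id}}")
    );
    println!("digest matches the expected apollo_operation_id");
}
```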
diff --git a/apollo-router/src/query_planner/caching_query_planner.rs b/apollo-router/src/query_planner/caching_query_planner.rs
index 717dbf74f5..04275c77b7 100644
--- a/apollo-router/src/query_planner/caching_query_planner.rs
+++ b/apollo-router/src/query_planner/caching_query_planner.rs
@@ -191,171 +191,209 @@ where
     }
 
     fn call(&mut self, request: query_planner::CachingRequest) -> Self::Future {
-        let mut qp = self.clone();
-        let schema_id = self.schema.schema_id.clone();
+        let qp = self.clone();
         Box::pin(async move {
-            let caching_key = CachingQueryKey {
-                schema_id,
-                query: request.query.clone(),
-                operation: request.operation_name.to_owned(),
-            };
             let context = request.context.clone();
-            let entry = qp.cache.get(&caching_key).await;
-            if entry.is_first() {
-                let query_planner::CachingRequest {
-                    mut query,
-                    operation_name,
-                    context,
-                } = request;
-
-                let compiler = match context.private_entries.lock().get::<Compiler>() {
-                    None => {
-                        return Err(CacheResolverError::RetrievalError(Arc::new(
-                            QueryPlannerError::SpecError(SpecError::ParsingError(
-                                "missing compiler".to_string(),
-                            )),
-                        )))
-                    }
-                    Some(c) => c.0.clone(),
-                };
-
-                let compiler_guard = compiler.lock().await;
-                let file_id = compiler_guard
-                    .db
-                    .source_file(QUERY_EXECUTABLE.into())
-                    .ok_or(QueryPlannerError::SpecError(SpecError::ParsingError(
-                        "missing input file for query".to_string(),
-                    )))
-                    .map_err(|e| CacheResolverError::RetrievalError(Arc::new(e)))?;
-
-                if let Ok(modified_query) = add_defer_labels(file_id, &compiler_guard) {
-                    query = modified_query;
-                }
-                drop(compiler_guard);
-
-                let request = QueryPlannerRequest::builder()
-                    .query(query)
-                    .and_operation_name(operation_name)
-                    .context(context)
-                    .build();
-
-                // some clients might timeout and cancel the request before query planning is finished,
-                // so we execute it in a task that can continue even after the request was canceled and
-                // the join handle was dropped. That way, the next similar query will use the cache instead
-                // of restarting the query planner until another timeout
-                tokio::task::spawn(
-                    async move {
-                        // we need to isolate the compiler guard here, otherwise rustc might believe we still hold it
-                        // when inserting the error in the entry
-                        let err_res = {
-                            let compiler_guard = compiler.lock().await;
-                            Query::check_errors(&compiler_guard, file_id)
-                        };
-
-                        if let Err(error) = err_res {
-                            request
-                                .context
-                                .private_entries
-                                .lock()
-                                .insert(UsageReporting {
-                                    stats_report_key: error.get_error_key().to_string(),
-                                    referenced_fields_by_type: HashMap::new(),
-                                });
-                            let e = Arc::new(QueryPlannerError::SpecError(error));
-                            entry.insert(Err(e.clone())).await;
-                            return Err(CacheResolverError::RetrievalError(e));
-                        }
-
-                        let res = qp.delegate.ready().await?.call(request).await;
-
-                        match res {
-                            Ok(QueryPlannerResponse {
-                                content,
-                                context,
-                                errors,
-                            }) => {
-                                if let Some(content) = &content {
-                                    entry.insert(Ok(content.clone())).await;
-                                }
-
-                                if let Some(QueryPlannerContent::Plan { plan, .. }) = &content {
-                                    context
-                                        .private_entries
-                                        .lock()
-                                        .insert(plan.usage_reporting.clone());
-                                }
-                                Ok(QueryPlannerResponse {
-                                    content,
-                                    context,
-                                    errors,
-                                })
-                            }
-                            Err(error) => {
-                                let e = Arc::new(error);
-                                entry.insert(Err(e.clone())).await;
-                                Err(CacheResolverError::RetrievalError(e))
-                            }
-                        }
-                    }
-                    .in_current_span(),
-                )
-                .await
-                .map_err(|e| {
-                    CacheResolverError::RetrievalError(Arc::new(QueryPlannerError::JoinError(
-                        e.to_string(),
-                    )))
-                })?
-            } else {
-                let res = entry
-                    .get()
-                    .await
-                    .map_err(|_| QueryPlannerError::UnhandledPlannerResult)?;
-
-                match res {
-                    Ok(content) => {
-                        if let QueryPlannerContent::Plan { plan, .. } = &content {
-                            context
-                                .private_entries
-                                .lock()
-                                .insert(plan.usage_reporting.clone());
-                        }
-
-                        Ok(QueryPlannerResponse::builder()
-                            .content(content)
-                            .context(context)
-                            .build())
-                    }
-                    Err(error) => {
-                        match error.deref() {
-                            QueryPlannerError::PlanningErrors(pe) => {
-                                request
-                                    .context
-                                    .private_entries
-                                    .lock()
-                                    .insert(pe.usage_reporting.clone());
-                            }
-                            QueryPlannerError::SpecError(e) => {
-                                request
-                                    .context
-                                    .private_entries
-                                    .lock()
-                                    .insert(UsageReporting {
-                                        stats_report_key: e.get_error_key().to_string(),
-                                        referenced_fields_by_type: HashMap::new(),
-                                    });
-                            }
-                            _ => {}
-                        }
-
-                        Err(CacheResolverError::RetrievalError(error))
-                    }
-                }
-            }
+            qp.plan(request).await.map(|response| {
+                if let Some(usage_reporting) =
+                    context.private_entries.lock().get::<UsageReporting>()
+                {
+                    let _ = response.context.insert(
+                        "apollo_operation_id",
+                        stats_report_key_hash(usage_reporting.stats_report_key.as_str()),
+                    );
+                }
+                response
+            })
         })
     }
 }
 
+impl<T> CachingQueryPlanner<T>
+where
+    T: tower::Service<
+            QueryPlannerRequest,
+            Response = QueryPlannerResponse,
+            Error = QueryPlannerError,
+        > + Clone
+        + Send
+        + 'static,
+    <T as tower::Service<QueryPlannerRequest>>::Future: Send,
+{
+    async fn plan(
+        mut self,
+        request: query_planner::CachingRequest,
+    ) -> Result<<T as tower::Service<QueryPlannerRequest>>::Response, CacheResolverError> {
+        let schema_id = self.schema.schema_id.clone();
+
+        let caching_key = CachingQueryKey {
+            schema_id,
+            query: request.query.clone(),
+            operation: request.operation_name.to_owned(),
+        };
+
+        let context = request.context.clone();
+        let entry = self.cache.get(&caching_key).await;
+        if entry.is_first() {
+            let query_planner::CachingRequest {
+                mut query,
+                operation_name,
+                context,
+            } = request;
+
+            let compiler = match context.private_entries.lock().get::<Compiler>() {
+                None => {
+                    return Err(CacheResolverError::RetrievalError(Arc::new(
+                        QueryPlannerError::SpecError(SpecError::ParsingError(
+                            "missing compiler".to_string(),
+                        )),
+                    )))
+                }
+                Some(c) => c.0.clone(),
+            };
+
+            let compiler_guard = compiler.lock().await;
+            let file_id = compiler_guard
+                .db
+                .source_file(QUERY_EXECUTABLE.into())
+                .ok_or(QueryPlannerError::SpecError(SpecError::ParsingError(
+                    "missing input file for query".to_string(),
+                )))
+                .map_err(|e| CacheResolverError::RetrievalError(Arc::new(e)))?;
+
+            if let Ok(modified_query) = add_defer_labels(file_id, &compiler_guard) {
+                query = modified_query;
+            }
+            drop(compiler_guard);
+
+            let request = QueryPlannerRequest::builder()
+                .query(query)
+                .and_operation_name(operation_name)
+                .context(context)
+                .build();
+
+            // some clients might timeout and cancel the request before query planning is finished,
+            // so we execute it in a task that can continue even after the request was canceled and
+            // the join handle was dropped. That way, the next similar query will use the cache instead
+            // of restarting the query planner until another timeout
+            tokio::task::spawn(
+                async move {
+                    // we need to isolate the compiler guard here, otherwise rustc might believe we still hold it
+                    // when inserting the error in the entry
+                    let err_res = {
+                        let compiler_guard = compiler.lock().await;
+                        Query::check_errors(&compiler_guard, file_id)
+                    };
+
+                    if let Err(error) = err_res {
+                        request
+                            .context
+                            .private_entries
+                            .lock()
+                            .insert(UsageReporting {
+                                stats_report_key: error.get_error_key().to_string(),
+                                referenced_fields_by_type: HashMap::new(),
+                            });
+                        let e = Arc::new(QueryPlannerError::SpecError(error));
+                        entry.insert(Err(e.clone())).await;
+                        return Err(CacheResolverError::RetrievalError(e));
+                    }
+
+                    let res = self.delegate.ready().await?.call(request).await;
+
+                    match res {
+                        Ok(QueryPlannerResponse {
+                            content,
+                            context,
+                            errors,
+                        }) => {
+                            if let Some(content) = &content {
+                                entry.insert(Ok(content.clone())).await;
+                            }
+
+                            if let Some(QueryPlannerContent::Plan { plan, .. }) = &content {
+                                context
+                                    .private_entries
+                                    .lock()
+                                    .insert(plan.usage_reporting.clone());
+                            }
+                            Ok(QueryPlannerResponse {
+                                content,
+                                context,
+                                errors,
+                            })
+                        }
+                        Err(error) => {
+                            let e = Arc::new(error);
+                            entry.insert(Err(e.clone())).await;
+                            Err(CacheResolverError::RetrievalError(e))
+                        }
+                    }
+                }
+                .in_current_span(),
+            )
+            .await
+            .map_err(|e| {
+                CacheResolverError::RetrievalError(Arc::new(QueryPlannerError::JoinError(
+                    e.to_string(),
+                )))
+            })?
+        } else {
+            let res = entry
+                .get()
+                .await
+                .map_err(|_| QueryPlannerError::UnhandledPlannerResult)?;
+
+            match res {
+                Ok(content) => {
+                    if let QueryPlannerContent::Plan { plan, .. } = &content {
+                        context
+                            .private_entries
+                            .lock()
+                            .insert(plan.usage_reporting.clone());
+                    }
+
+                    Ok(QueryPlannerResponse::builder()
+                        .content(content)
+                        .context(context)
+                        .build())
+                }
+                Err(error) => {
+                    match error.deref() {
+                        QueryPlannerError::PlanningErrors(pe) => {
+                            request
+                                .context
+                                .private_entries
+                                .lock()
+                                .insert(pe.usage_reporting.clone());
+                        }
+                        QueryPlannerError::SpecError(e) => {
+                            request
+                                .context
+                                .private_entries
+                                .lock()
+                                .insert(UsageReporting {
+                                    stats_report_key: e.get_error_key().to_string(),
+                                    referenced_fields_by_type: HashMap::new(),
+                                });
+                        }
+                        _ => {}
+                    }
+
+                    Err(CacheResolverError::RetrievalError(error))
+                }
+            }
+        }
+    }
+}
+
+fn stats_report_key_hash(stats_report_key: &str) -> String {
+    let mut hasher = sha1::Sha1::new();
+    hasher.update(stats_report_key.as_bytes());
+    let result = hasher.finalize();
+    hex::encode(result)
+}
+
 #[derive(Debug, Clone, Hash, PartialEq, Eq)]
 pub(crate) struct CachingQueryKey {
     pub(crate) schema_id: Option<String>,
@@ -565,4 +603,12 @@ mod tests {
             .contains_key::<UsageReporting>());
         }
     }
+
+    #[test]
+    fn apollo_operation_id_hash() {
+        assert_eq!(
+            "d1554552698157b05c2a462827fb4367a4548ee5",
+            stats_report_key_hash("# IgnitionMeQuery\nquery IgnitionMeQuery{me{id}}")
+        );
+    }
 }
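The `tokio::task::spawn` comment in the hunk above captures the design choice worth calling out: planning runs in a spawned task so that a client cancelling its request (and dropping the `JoinHandle`) does not abort planning, and the cache entry still gets populated for the next similar query. A minimal sketch of that pattern outside the router (assuming only the `tokio` crate with the `rt-multi-thread`, `macros`, and `time` features; every name here is illustrative, not a router API):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let cached = Arc::new(AtomicBool::new(false));
    let cached_in_task = cached.clone();

    // Spawn the expensive work (standing in for query planning) so it is
    // driven by the runtime itself rather than by the caller's future.
    let handle = tokio::task::spawn(async move {
        tokio::time::sleep(Duration::from_millis(50)).await; // "planning"
        cached_in_task.store(true, Ordering::SeqCst); // "cache entry inserted"
    });

    // Simulate a client that gives up early: when the timeout wins the race,
    // the JoinHandle is dropped, but the spawned task is NOT cancelled.
    let raced = tokio::time::timeout(Duration::from_millis(10), handle).await;
    assert!(raced.is_err(), "the caller timed out first");

    // The detached task still finishes, so the "cache" ends up populated and
    // the next similar query would hit it instead of re-planning.
    tokio::time::sleep(Duration::from_millis(100)).await;
    assert!(cached.load(Ordering::SeqCst));
}
```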
diff --git a/apollo-router/src/services/supergraph_service.rs b/apollo-router/src/services/supergraph_service.rs
index ad29f1e2ae..fb9845ae2c 100644
--- a/apollo-router/src/services/supergraph_service.rs
+++ b/apollo-router/src/services/supergraph_service.rs
@@ -366,12 +366,12 @@ async fn subscription_task(
     let limit_is_set = subscription_config.max_opened_subscriptions.is_some();
     let mut subscription_handle = subscription_handle.clone();
-    let operation_signature =
-        if let Some(usage_reporting) = context.private_entries.lock().get::<UsageReporting>() {
-            usage_reporting.stats_report_key.clone()
-        } else {
-            String::new()
-        };
+    let operation_signature = context
+        .private_entries
+        .lock()
+        .get::<UsageReporting>()
+        .map(|usage_reporting| usage_reporting.stats_report_key.clone())
+        .unwrap_or_default();
 
     let operation_name = context
         .get::<_, String>(OPERATION_NAME)
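On the consumer side, reading the new key uses the same `Context::get` calls as the integration tests that follow. A sketch of a hypothetical plugin-side helper (it assumes the `apollo_router` crate in scope; `log_operation_id` is illustrative and not part of the router API — note that the key is only present once query planning has produced a usage report, so parse, unknown-operation-name, and validation failures yield `Ok(None)`):

```rust
use apollo_router::Context;

// Hypothetical helper: print the Studio operation id attached to a request.
fn log_operation_id(context: &Context) {
    match context.get::<_, String>("apollo_operation_id".to_string()) {
        Ok(Some(id)) => println!("apollo_operation_id: {id}"),
        // No id: e.g. the query never made it through planning.
        Ok(None) => println!("no apollo_operation_id for this request"),
        Err(e) => eprintln!("could not read apollo_operation_id: {e}"),
    }
}
```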
diff --git a/apollo-router/tests/integration_tests.rs b/apollo-router/tests/integration_tests.rs
index 45f4abaff8..aa7db73a6f 100644
--- a/apollo-router/tests/integration_tests.rs
+++ b/apollo-router/tests/integration_tests.rs
@@ -1096,6 +1096,135 @@ async fn include_if_works() {
     insta::assert_json_snapshot!(stream.next().await.unwrap().unwrap());
 }
 
+#[tokio::test(flavor = "multi_thread")]
+async fn query_operation_id() {
+    let config = serde_json::json!({
+        "supergraph": {
+            "introspection": true
+        },
+    });
+
+    let expected_apollo_operation_id = "d1554552698157b05c2a462827fb4367a4548ee5";
+
+    let request: router::Request = supergraph::Request::fake_builder()
+        .query(
+            r#"query IgnitionMeQuery {
+                me {
+                    id
+                }
+            }"#,
+        )
+        .method(Method::POST)
+        .build()
+        .expect("expecting valid request")
+        .try_into()
+        .unwrap();
+
+    let (router, _) = setup_router_and_registry(config).await;
+
+    let response = http_query_with_router(router.clone(), request).await;
+    assert_eq!(
+        expected_apollo_operation_id,
+        response
+            .context
+            .get::<_, String>("apollo_operation_id".to_string())
+            .unwrap()
+            .unwrap()
+            .as_str()
+    );
+
+    // let's do it again to make sure a cached query plan still yields a stats report key hash
+    let request: router::Request = supergraph::Request::fake_builder()
+        .query(
+            r#"query IgnitionMeQuery {
+                me {
+                    id
+                }
+            }"#,
+        )
+        .method(Method::POST)
+        .build()
+        .expect("expecting valid request")
+        .try_into()
+        .unwrap();
+
+    let response = http_query_with_router(router.clone(), request).await;
+    assert_eq!(
+        expected_apollo_operation_id,
+        response
+            .context
+            .get::<_, String>("apollo_operation_id".to_string())
+            .unwrap()
+            .unwrap()
+            .as_str()
+    );
+
+    // let's test failures now
+    let parse_failure: router::Request = supergraph::Request::fake_builder()
+        .query(r#"that's not even a query!"#)
+        .method(Method::POST)
+        .build()
+        .expect("expecting valid request")
+        .try_into()
+        .unwrap();
+
+    let response = http_query_with_router(router.clone(), parse_failure).await;
+    assert!(
+        // "## GraphQLParseFailure\n"
+        response
+            .context
+            .get::<_, String>("apollo_operation_id".to_string())
+            .unwrap()
+            .is_none()
+    );
+
+    let unknown_operation_name: router::Request = supergraph::Request::fake_builder()
+        .query(
+            r#"query Me {
+                me {
+                    id
+                }
+            }"#,
+        )
+        .operation_name("NotMe")
+        .method(Method::POST)
+        .build()
+        .expect("expecting valid request")
+        .try_into()
+        .unwrap();
+
+    let response = http_query_with_router(router.clone(), unknown_operation_name).await;
+    // "## GraphQLUnknownOperationName\n"
+    assert!(response
+        .context
+        .get::<_, String>("apollo_operation_id".to_string())
+        .unwrap()
+        .is_none());
+
+    let validation_error: router::Request = supergraph::Request::fake_builder()
+        .query(
+            r#"query Me {
+                me {
+                    thisfielddoesntexist
+                }
+            }"#,
+        )
+        .operation_name("NotMe")
+        .method(Method::POST)
+        .build()
+        .expect("expecting valid request")
+        .try_into()
+        .unwrap();
+
+    let response = http_query_with_router(router, validation_error).await;
+    // "## GraphQLValidationFailure\n"
+    assert!(response
+        .context
+        .get::<_, String>("apollo_operation_id".to_string())
+        .unwrap()
+        .is_none());
+}
+
 async fn query_node(request: &supergraph::Request) -> Result {
     reqwest::Client::new()
         .post("https://federation-demo-gateway.fly.dev/")