Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose the the Studio Operation ID to plugins as apollo_operation_id #3586

Merged
merged 11 commits into from
Aug 21, 2023
19 changes: 19 additions & 0 deletions .changesets/feat_igni_stats_report_key_hash.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
### Expose the stats_reports_key hash to plugins. ([Issue #2728](https://github.com/apollographql/router/issues/2728))

This changeset exposes a new key in the context, `apollo_operation_id`, which identifies operation you can find in studio:

```
https://studio.apollographql.com/graph/<your_graph_variant>/variant/<your_graph_variant>/operations?query=<apollo_operation_id>
```

This new context key is exposed at various stages of the operation pipeline:

- Execution service request
- Subgraph service request

- Subgraph service response
- Execution service response
- Supergraph service response
- Router service response

By [@o0Ignition0o](https://github.com/o0Ignition0o) in https://github.com/apollographql/router/pull/3586
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ dependencies = [
"serde_json_bytes",
"serde_urlencoded",
"serde_yaml",
"sha1 0.10.5",
"sha2",
"shellexpand",
"similar-asserts",
Expand Down
1 change: 1 addition & 0 deletions apollo-router/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ aws-sigv4 = "0.55.3"
aws-credential-types = "0.55.3"
aws-config = "0.55.3"
aws-types = "0.55.3"
sha1 = "0.10.5"

[target.'cfg(macos)'.dependencies]
uname = "0.1.1"
Expand Down
326 changes: 186 additions & 140 deletions apollo-router/src/query_planner/caching_query_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,171 +191,209 @@ where
}

fn call(&mut self, request: query_planner::CachingRequest) -> Self::Future {
let mut qp = self.clone();
let schema_id = self.schema.schema_id.clone();
let qp = self.clone();
Box::pin(async move {
let caching_key = CachingQueryKey {
schema_id,
query: request.query.clone(),
operation: request.operation_name.to_owned(),
};

let context = request.context.clone();
let entry = qp.cache.get(&caching_key).await;
if entry.is_first() {
let query_planner::CachingRequest {
mut query,
operation_name,
context,
} = request;

let compiler = match context.private_entries.lock().get::<Compiler>() {
None => {
return Err(CacheResolverError::RetrievalError(Arc::new(
QueryPlannerError::SpecError(SpecError::ParsingError(
"missing compiler".to_string(),
)),
)))
}
Some(c) => c.0.clone(),
};
qp.plan(request).await.map(|response| {
if let Some(usage_reporting) =
context.private_entries.lock().get::<UsageReporting>()
{
let _ = response.context.insert(
"apollo_operation_id",
stats_report_key_hash(usage_reporting.stats_report_key.as_str()),
);
}
response
})
})
}
}

let compiler_guard = compiler.lock().await;
let file_id = compiler_guard
.db
.source_file(QUERY_EXECUTABLE.into())
.ok_or(QueryPlannerError::SpecError(SpecError::ParsingError(
"missing input file for query".to_string(),
)))
.map_err(|e| CacheResolverError::RetrievalError(Arc::new(e)))?;
impl<T> CachingQueryPlanner<T>
where
T: tower::Service<
QueryPlannerRequest,
Response = QueryPlannerResponse,
Error = QueryPlannerError,
> + Clone
+ Send
+ 'static,
<T as tower::Service<QueryPlannerRequest>>::Future: Send,
{
async fn plan(
mut self,
request: query_planner::CachingRequest,
) -> Result<<T as tower::Service<QueryPlannerRequest>>::Response, CacheResolverError> {
let schema_id = self.schema.schema_id.clone();

if let Ok(modified_query) = add_defer_labels(file_id, &compiler_guard) {
query = modified_query;
let caching_key = CachingQueryKey {
schema_id,
query: request.query.clone(),
operation: request.operation_name.to_owned(),
};

let context = request.context.clone();
let entry = self.cache.get(&caching_key).await;
if entry.is_first() {
let query_planner::CachingRequest {
mut query,
operation_name,
context,
} = request;

let compiler = match context.private_entries.lock().get::<Compiler>() {
None => {
return Err(CacheResolverError::RetrievalError(Arc::new(
QueryPlannerError::SpecError(SpecError::ParsingError(
"missing compiler".to_string(),
)),
)))
}
drop(compiler_guard);

let request = QueryPlannerRequest::builder()
.query(query)
.and_operation_name(operation_name)
.context(context)
.build();

// some clients might timeout and cancel the request before query planning is finished,
// so we execute it in a task that can continue even after the request was canceled and
// the join handle was dropped. That way, the next similar query will use the cache instead
// of restarting the query planner until another timeout
tokio::task::spawn(
async move {
// we need to isolate the compiler guard here, otherwise rustc might believe we still hold it
// when inserting the error in the entry
let err_res = {
let compiler_guard = compiler.lock().await;
Query::check_errors(&compiler_guard, file_id)
};

if let Err(error) = err_res {
request
.context
.private_entries
.lock()
.insert(UsageReporting {
stats_report_key: error.get_error_key().to_string(),
referenced_fields_by_type: HashMap::new(),
});
let e = Arc::new(QueryPlannerError::SpecError(error));
entry.insert(Err(e.clone())).await;
return Err(CacheResolverError::RetrievalError(e));
}
Some(c) => c.0.clone(),
};

let compiler_guard = compiler.lock().await;
let file_id = compiler_guard
.db
.source_file(QUERY_EXECUTABLE.into())
.ok_or(QueryPlannerError::SpecError(SpecError::ParsingError(
"missing input file for query".to_string(),
)))
.map_err(|e| CacheResolverError::RetrievalError(Arc::new(e)))?;

if let Ok(modified_query) = add_defer_labels(file_id, &compiler_guard) {
query = modified_query;
}
drop(compiler_guard);

let request = QueryPlannerRequest::builder()
.query(query)
.and_operation_name(operation_name)
.context(context)
.build();

// some clients might timeout and cancel the request before query planning is finished,
// so we execute it in a task that can continue even after the request was canceled and
// the join handle was dropped. That way, the next similar query will use the cache instead
// of restarting the query planner until another timeout
tokio::task::spawn(
async move {
// we need to isolate the compiler guard here, otherwise rustc might believe we still hold it
// when inserting the error in the entry
let err_res = {
let compiler_guard = compiler.lock().await;
Query::check_errors(&compiler_guard, file_id)
};

if let Err(error) = err_res {
request
.context
.private_entries
.lock()
.insert(UsageReporting {
stats_report_key: error.get_error_key().to_string(),
referenced_fields_by_type: HashMap::new(),
});
let e = Arc::new(QueryPlannerError::SpecError(error));
entry.insert(Err(e.clone())).await;
return Err(CacheResolverError::RetrievalError(e));
}

let res = qp.delegate.ready().await?.call(request).await;
let res = self.delegate.ready().await?.call(request).await;

match res {
match res {
Ok(QueryPlannerResponse {
content,
context,
errors,
}) => {
if let Some(content) = &content {
entry.insert(Ok(content.clone())).await;
}

if let Some(QueryPlannerContent::Plan { plan, .. }) = &content {
context
.private_entries
.lock()
.insert(plan.usage_reporting.clone());
}
Ok(QueryPlannerResponse {
content,
context,
errors,
}) => {
if let Some(content) = &content {
entry.insert(Ok(content.clone())).await;
}

if let Some(QueryPlannerContent::Plan { plan, .. }) = &content {
context
.private_entries
.lock()
.insert(plan.usage_reporting.clone());
}
Ok(QueryPlannerResponse {
content,
context,
errors,
})
}
Err(error) => {
let e = Arc::new(error);
entry.insert(Err(e.clone())).await;
Err(CacheResolverError::RetrievalError(e))
}
})
}
Err(error) => {
let e = Arc::new(error);
entry.insert(Err(e.clone())).await;
Err(CacheResolverError::RetrievalError(e))
}
}
.in_current_span(),
)
}
.in_current_span(),
)
.await
.map_err(|e| {
CacheResolverError::RetrievalError(Arc::new(QueryPlannerError::JoinError(
e.to_string(),
)))
})?
} else {
let res = entry
.get()
.await
.map_err(|e| {
CacheResolverError::RetrievalError(Arc::new(QueryPlannerError::JoinError(
e.to_string(),
)))
})?
} else {
let res = entry
.get()
.await
.map_err(|_| QueryPlannerError::UnhandledPlannerResult)?;
.map_err(|_| QueryPlannerError::UnhandledPlannerResult)?;

match res {
Ok(content) => {
if let QueryPlannerContent::Plan { plan, .. } = &content {
context
.private_entries
.lock()
.insert(plan.usage_reporting.clone());
}

match res {
Ok(content) => {
if let QueryPlannerContent::Plan { plan, .. } = &content {
context
Ok(QueryPlannerResponse::builder()
.content(content)
.context(context)
.build())
}
Err(error) => {
match error.deref() {
QueryPlannerError::PlanningErrors(pe) => {
request
.context
.private_entries
.lock()
.insert(plan.usage_reporting.clone());
.insert(pe.usage_reporting.clone());
}

Ok(QueryPlannerResponse::builder()
.content(content)
.context(context)
.build())
}
Err(error) => {
match error.deref() {
QueryPlannerError::PlanningErrors(pe) => {
request
.context
.private_entries
.lock()
.insert(pe.usage_reporting.clone());
}
QueryPlannerError::SpecError(e) => {
request
.context
.private_entries
.lock()
.insert(UsageReporting {
stats_report_key: e.get_error_key().to_string(),
referenced_fields_by_type: HashMap::new(),
});
}
_ => {}
QueryPlannerError::SpecError(e) => {
request
.context
.private_entries
.lock()
.insert(UsageReporting {
stats_report_key: e.get_error_key().to_string(),
referenced_fields_by_type: HashMap::new(),
});
}

Err(CacheResolverError::RetrievalError(error))
_ => {}
}

Err(CacheResolverError::RetrievalError(error))
}
}
})
}
}
}

fn stats_report_key_hash(stats_report_key: &str) -> String {
let mut hasher = sha1::Sha1::new();
hasher.update(stats_report_key.as_bytes());
let result = hasher.finalize();
hex::encode(result)
}

#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub(crate) struct CachingQueryKey {
pub(crate) schema_id: Option<String>,
Expand Down Expand Up @@ -565,4 +603,12 @@ mod tests {
.contains_key::<UsageReporting>());
}
}

#[test]
fn apollo_operation_id_hash() {
assert_eq!(
"d1554552698157b05c2a462827fb4367a4548ee5",
stats_report_key_hash("# IgnitionMeQuery\nquery IgnitionMeQuery{me{id}}")
);
}
}
Loading