Expose the Studio Operation ID to plugins. (#3586)
Fix #2728

This changeset exposes a new key in the context, `apollo_operation_id`,
which identifies the operation so you can find it in Studio:

```
https://studio.apollographql.com/graph/<your_graph_id>/variant/<your_graph_variant>/operations?query=<apollo_operation_id>
```

This new context key is exposed at various stages of the operation
pipeline, so plugins can pick it up from any of the following hooks (see the sketch after this list):

- Execution service request
- Subgraph service request
- Subgraph service response
- Execution service response
- Supergraph service response
- Router service response
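
For plugin authors, reading the key looks roughly like the sketch below. This is illustrative, not part of this changeset: the `ExposeOperationId` type, its `example.expose_operation_id` registration name, and the log line are made-up, and the plugin API shown is the one the router exposed around the time of this change:

```
// Hypothetical plugin sketch: reads the operation id from the request
// context in the subgraph service hook.
use apollo_router::plugin::{Plugin, PluginInit};
use apollo_router::register_plugin;
use apollo_router::services::subgraph;
use schemars::JsonSchema;
use serde::Deserialize;
use tower::{BoxError, ServiceBuilder, ServiceExt};

#[derive(Debug, Default, Deserialize, JsonSchema)]
struct Conf {} // no configuration needed for this sketch

struct ExposeOperationId; // illustrative name

#[async_trait::async_trait]
impl Plugin for ExposeOperationId {
    type Config = Conf;

    async fn new(_init: PluginInit<Self::Config>) -> Result<Self, BoxError> {
        Ok(Self)
    }

    fn subgraph_service(&self, _name: &str, service: subgraph::BoxService) -> subgraph::BoxService {
        ServiceBuilder::new()
            .map_request(|request: subgraph::Request| {
                // The key is set once query planning has produced usage
                // reporting, so it is present by the time subgraph calls go out.
                if let Ok(Some(id)) = request.context.get::<_, String>("apollo_operation_id") {
                    tracing::info!(%id, "operation can be looked up in Studio");
                }
                request
            })
            .service(service)
            .boxed()
    }
}

register_plugin!("example", "expose_operation_id", ExposeOperationId);
```

Because the context travels with the operation through the whole pipeline, the same `context.get("apollo_operation_id")` call works in any of the other hooks listed above.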
o0Ignition0o authored Aug 21, 2023
2 parents 41cb44e + f790fc0 commit 6f22c88
Showing 6 changed files with 342 additions and 146 deletions.
19 changes: 19 additions & 0 deletions .changesets/feat_igni_stats_report_key_hash.md
@@ -0,0 +1,19 @@
### Expose the stats_report_key hash to plugins. ([Issue #2728](https://github.com/apollographql/router/issues/2728))

This changeset exposes a new key in the context, `apollo_operation_id`, which identifies the operation so you can find it in Studio:

```
https://studio.apollographql.com/graph/<your_graph_id>/variant/<your_graph_variant>/operations?query=<apollo_operation_id>
```

This new context key is exposed at various stages of the operation pipeline:

- Execution service request
- Subgraph service request
- Subgraph service response
- Execution service response
- Supergraph service response
- Router service response

By [@o0Ignition0o](https://github.com/o0Ignition0o) in https://github.com/apollographql/router/pull/3586
1 change: 1 addition & 0 deletions Cargo.lock
@@ -373,6 +373,7 @@ dependencies = [
"serde_json_bytes",
"serde_urlencoded",
"serde_yaml",
"sha1 0.10.5",
"sha2",
"shellexpand",
"similar-asserts",
1 change: 1 addition & 0 deletions apollo-router/Cargo.toml
@@ -232,6 +232,7 @@ aws-sigv4 = "0.55.3"
aws-credential-types = "0.55.3"
aws-config = "0.55.3"
aws-types = "0.55.3"
sha1 = "0.10.5"

[target.'cfg(macos)'.dependencies]
uname = "0.1.1"
326 changes: 186 additions & 140 deletions apollo-router/src/query_planner/caching_query_planner.rs
@@ -191,171 +191,209 @@ where
    }

    fn call(&mut self, request: query_planner::CachingRequest) -> Self::Future {
        let qp = self.clone();
        Box::pin(async move {
            let context = request.context.clone();
            qp.plan(request).await.map(|response| {
                if let Some(usage_reporting) =
                    context.private_entries.lock().get::<UsageReporting>()
                {
                    let _ = response.context.insert(
                        "apollo_operation_id",
                        stats_report_key_hash(usage_reporting.stats_report_key.as_str()),
                    );
                }
                response
            })
        })
    }
}

impl<T> CachingQueryPlanner<T>
where
    T: tower::Service<
            QueryPlannerRequest,
            Response = QueryPlannerResponse,
            Error = QueryPlannerError,
        > + Clone
        + Send
        + 'static,
    <T as tower::Service<QueryPlannerRequest>>::Future: Send,
{
    async fn plan(
        mut self,
        request: query_planner::CachingRequest,
    ) -> Result<<T as tower::Service<QueryPlannerRequest>>::Response, CacheResolverError> {
        let schema_id = self.schema.schema_id.clone();

        let caching_key = CachingQueryKey {
            schema_id,
            query: request.query.clone(),
            operation: request.operation_name.to_owned(),
        };

        let context = request.context.clone();
        let entry = self.cache.get(&caching_key).await;
        if entry.is_first() {
            let query_planner::CachingRequest {
                mut query,
                operation_name,
                context,
            } = request;

            let compiler = match context.private_entries.lock().get::<Compiler>() {
                None => {
                    return Err(CacheResolverError::RetrievalError(Arc::new(
                        QueryPlannerError::SpecError(SpecError::ParsingError(
                            "missing compiler".to_string(),
                        )),
                    )))
                }
                Some(c) => c.0.clone(),
            };

            let compiler_guard = compiler.lock().await;
            let file_id = compiler_guard
                .db
                .source_file(QUERY_EXECUTABLE.into())
                .ok_or(QueryPlannerError::SpecError(SpecError::ParsingError(
                    "missing input file for query".to_string(),
                )))
                .map_err(|e| CacheResolverError::RetrievalError(Arc::new(e)))?;

            if let Ok(modified_query) = add_defer_labels(file_id, &compiler_guard) {
                query = modified_query;
            }
            drop(compiler_guard);

            let request = QueryPlannerRequest::builder()
                .query(query)
                .and_operation_name(operation_name)
                .context(context)
                .build();

            // some clients might timeout and cancel the request before query planning is finished,
            // so we execute it in a task that can continue even after the request was canceled and
            // the join handle was dropped. That way, the next similar query will use the cache instead
            // of restarting the query planner until another timeout
            tokio::task::spawn(
                async move {
                    // we need to isolate the compiler guard here, otherwise rustc might believe we still hold it
                    // when inserting the error in the entry
                    let err_res = {
                        let compiler_guard = compiler.lock().await;
                        Query::check_errors(&compiler_guard, file_id)
                    };

                    if let Err(error) = err_res {
                        request
                            .context
                            .private_entries
                            .lock()
                            .insert(UsageReporting {
                                stats_report_key: error.get_error_key().to_string(),
                                referenced_fields_by_type: HashMap::new(),
                            });
                        let e = Arc::new(QueryPlannerError::SpecError(error));
                        entry.insert(Err(e.clone())).await;
                        return Err(CacheResolverError::RetrievalError(e));
                    }

                    let res = self.delegate.ready().await?.call(request).await;

                    match res {
                        Ok(QueryPlannerResponse {
                            content,
                            context,
                            errors,
                        }) => {
                            if let Some(content) = &content {
                                entry.insert(Ok(content.clone())).await;
                            }

                            if let Some(QueryPlannerContent::Plan { plan, .. }) = &content {
                                context
                                    .private_entries
                                    .lock()
                                    .insert(plan.usage_reporting.clone());
                            }
                            Ok(QueryPlannerResponse {
                                content,
                                context,
                                errors,
                            })
                        }
                        Err(error) => {
                            let e = Arc::new(error);
                            entry.insert(Err(e.clone())).await;
                            Err(CacheResolverError::RetrievalError(e))
                        }
                    }
                }
                .in_current_span(),
            )
            .await
            .map_err(|e| {
                CacheResolverError::RetrievalError(Arc::new(QueryPlannerError::JoinError(
                    e.to_string(),
                )))
            })?
        } else {
            let res = entry
                .get()
                .await
                .map_err(|_| QueryPlannerError::UnhandledPlannerResult)?;

            match res {
                Ok(content) => {
                    if let QueryPlannerContent::Plan { plan, .. } = &content {
                        context
                            .private_entries
                            .lock()
                            .insert(plan.usage_reporting.clone());
                    }

                    Ok(QueryPlannerResponse::builder()
                        .content(content)
                        .context(context)
                        .build())
                }
                Err(error) => {
                    match error.deref() {
                        QueryPlannerError::PlanningErrors(pe) => {
                            request
                                .context
                                .private_entries
                                .lock()
                                .insert(pe.usage_reporting.clone());
                        }
                        QueryPlannerError::SpecError(e) => {
                            request
                                .context
                                .private_entries
                                .lock()
                                .insert(UsageReporting {
                                    stats_report_key: e.get_error_key().to_string(),
                                    referenced_fields_by_type: HashMap::new(),
                                });
                        }
                        _ => {}
                    }

                    Err(CacheResolverError::RetrievalError(error))
                }
            }
        }
    }
}

fn stats_report_key_hash(stats_report_key: &str) -> String {
    let mut hasher = sha1::Sha1::new();
    hasher.update(stats_report_key.as_bytes());
    let result = hasher.finalize();
    hex::encode(result)
}

#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub(crate) struct CachingQueryKey {
pub(crate) schema_id: Option<String>,
@@ -565,4 +603,12 @@ mod tests {
            .contains_key::<UsageReporting>());
    }

    #[test]
    fn apollo_operation_id_hash() {
        assert_eq!(
            "d1554552698157b05c2a462827fb4367a4548ee5",
            stats_report_key_hash("# IgnitionMeQuery\nquery IgnitionMeQuery{me{id}}")
        );
    }
}
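
For reference, the new context value is simply the hex-encoded SHA-1 digest of the usage-reporting `stats_report_key`, as pinned down by the `apollo_operation_id_hash` test above. A standalone sketch that reproduces the same value, assuming the `sha1` and `hex` crates from the router's own dependency tree:

```
use sha1::{Digest, Sha1};

fn main() {
    // Same input as the `apollo_operation_id_hash` test above.
    let stats_report_key = "# IgnitionMeQuery\nquery IgnitionMeQuery{me{id}}";
    let mut hasher = Sha1::new();
    hasher.update(stats_report_key.as_bytes());
    let apollo_operation_id = hex::encode(hasher.finalize());
    assert_eq!(
        apollo_operation_id,
        "d1554552698157b05c2a462827fb4367a4548ee5"
    );
    // The id slots directly into the Studio operations URL.
    println!("operations?query={apollo_operation_id}");
}
```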