Skip to content

Commit

Permalink
Metrics and root span creation now happen first
Browse files Browse the repository at this point in the history
Our metrics are currently mutated at the supergraph service. This is a throwback from when the router service did not exist.
Move operation count metrics to the first thing that happens in the pipeline.
Note that this won't catch disconnects because they are reliant on the status code of the response, which will stop processing before the metric is incremented.

Fixes #3915
  • Loading branch information
bryn committed Sep 27, 2023
1 parent bca9d86 commit 055faac
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 25 deletions.
12 changes: 12 additions & 0 deletions .changesets/fix_bryn_move_telemetry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
### Ensure that telemetry happens first ([Issue #3915](https://github.com/apollographql/router/issues/3915))

Telemetry related logic is now moved to the first thing in the router pipeline.

Previously the metric `apollo.router.operations` may have missed some requests if they were failed at the router stage.
In addition, some logic happened before root spans were created, which would have caused missing traces.

`apollo.router.operations` and root spans are now the first thing that happens in the router pipeline for graphql requests.



By [@BrynCooke](https://github.com/BrynCooke) in https://github.com/apollographql/router/pull/3919
16 changes: 14 additions & 2 deletions apollo-router/src/axum_factory/axum_http_server_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,10 @@ where
(license, Instant::now(), Arc::new(AtomicU64::new(0))),
license_handler,
))
.layer(TraceLayer::new_for_http().make_span_with(PropagatingMakeSpan { license }))
.layer(Extension(service_factory))
.layer(cors);
.layer(cors)
.layer(TraceLayer::new_for_http().make_span_with(PropagatingMakeSpan { license }))
.layer(middleware::from_fn(metrics_handler));

let route = endpoints_on_main_listener
.into_iter()
Expand All @@ -414,6 +415,17 @@ where
Ok(ListenAddrAndRouter(listener, route))
}

async fn metrics_handler<B>(request: Request<B>, next: Next<B>) -> Response {
let resp = next.run(request).await;
u64_counter!(
"apollo.router.operations",
"The number of graphql operations performed by the Router",
1,
"http.response.status_code" = resp.status().as_u16() as i64
);
resp
}

async fn license_handler<B>(
State((license, start, delta)): State<(LicenseState, Instant, Arc<AtomicU64>)>,
request: Request<B>,
Expand Down
12 changes: 0 additions & 12 deletions apollo-router/src/plugins/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -833,12 +833,6 @@ impl Telemetry {
if !parts.status.is_success() {
metric_attrs.push(KeyValue::new("error", parts.status.to_string()));
}
u64_counter!(
"apollo.router.operations",
"The number of graphql operations performed by the Router",
1,
"http.response.status_code" = parts.status.as_u16() as i64
);
let response = http::Response::from_parts(
parts,
once(ready(first_response.unwrap_or_default()))
Expand All @@ -850,12 +844,6 @@ impl Telemetry {
}
Err(err) => {
metric_attrs.push(KeyValue::new("status", "500"));
u64_counter!(
"apollo.router.operations",
"The number of graphql operations performed by the Router",
1,
"http.response.status_code" = 500
);
Err(err)
}
};
Expand Down
17 changes: 11 additions & 6 deletions apollo-router/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,29 +334,34 @@ impl IntegrationTest {
pub fn execute_default_query(
&self,
) -> impl std::future::Future<Output = (String, reqwest::Response)> {
self.execute_query_internal(None)
self.execute_query_internal(&json!({"query":"query {topProducts{name}}","variables":{}}))
}

#[allow(dead_code)]
pub fn execute_query(
&self,
query: &Value,
) -> impl std::future::Future<Output = (String, reqwest::Response)> {
self.execute_query_internal(Some(query))
self.execute_query_internal(query)
}

#[allow(dead_code)]
pub fn execute_bad_query(
&self,
) -> impl std::future::Future<Output = (String, reqwest::Response)> {
self.execute_query_internal(&json!({"garbage":{}}))
}

fn execute_query_internal(
&self,
query: Option<&Value>,
query: &Value,
) -> impl std::future::Future<Output = (String, reqwest::Response)> {
assert!(
self.router.is_some(),
"router was not started, call `router.start().await; router.assert_started().await`"
);
let default_query = &json!({"query":"query {topProducts{name}}","variables":{}});
let query = query.unwrap_or(default_query).clone();
let dispatch = self.subscriber.clone();

let query = query.clone();
async move {
let span = info_span!("client_request");
let span_id = span.context().span().span_context().trace_id().to_string();
Expand Down
20 changes: 15 additions & 5 deletions apollo-router/tests/metrics_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const PROMETHEUS_CONFIG: &str = include_str!("fixtures/prometheus.router.yaml");
const SUBGRAPH_AUTH_CONFIG: &str = include_str!("fixtures/subgraph_auth.router.yaml");

#[tokio::test(flavor = "multi_thread")]
async fn test_metrics_reloading() -> Result<(), BoxError> {
async fn test_metrics_reloading() {
let mut router = IntegrationTest::builder()
.config(PROMETHEUS_CONFIG)
.build()
Expand Down Expand Up @@ -68,12 +68,10 @@ async fn test_metrics_reloading() -> Result<(), BoxError> {
router.assert_metrics_contains(r#"apollo_router_uplink_fetch_duration_seconds_count{kind="unchanged",query="License",url="https://uplink.api.apollographql.com/",otel_scope_name="apollo/router"}"#, Some(Duration::from_secs(120))).await;
router.assert_metrics_contains(r#"apollo_router_uplink_fetch_count_total{query="License",status="success",otel_scope_name="apollo/router"}"#, Some(Duration::from_secs(1))).await;
}

Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn test_subgraph_auth_metrics() -> Result<(), BoxError> {
async fn test_subgraph_auth_metrics() {
let mut router = IntegrationTest::builder()
.config(SUBGRAPH_AUTH_CONFIG)
.build()
Expand Down Expand Up @@ -108,6 +106,18 @@ async fn test_subgraph_auth_metrics() -> Result<(), BoxError> {
);

router.assert_metrics_contains(r#"apollo_router_operations_authentication_aws_sigv4_total{authentication_aws_sigv4_failed="false",subgraph_service_name="products",otel_scope_name="apollo/router"} 2"#, None).await;
}

#[tokio::test(flavor = "multi_thread")]
async fn test_metrics_bad_query() {
let mut router = IntegrationTest::builder()
.config(SUBGRAPH_AUTH_CONFIG)
.build()
.await;

Ok(())
router.start().await;
router.assert_started().await;
// This query won't make it to the supergraph service
router.execute_bad_query().await;
router.assert_metrics_contains(r#"apollo_router_operations_total{http_response_status_code="400",otel_scope_name="apollo/router"} 1"#, None).await;
}

0 comments on commit 055faac

Please sign in to comment.