Skip to content

Commit

Permalink
Execute the entire request pipeline if the client closed the connecti…
Browse files Browse the repository at this point in the history
…on (#4770)

Fix #4569
Fix #4576
Fix #4589
Fix #4590
Fix #4611

When the client closes the connection prematurely, it drops the entire
request handling task, which means that it won't go through the entire
response pipeline, where we record the operation and handle telemetry.
Some users also have additional steps with rhai or coprocessors where
they add metadata, and those steps should run even on canceled requests.
This moves the request handling to a separate task to make sure it runs,
but it also skips subgraph requests if we detected that the client
closed the connection, to prevent unneeded traffic.
  • Loading branch information
Geal authored Mar 26, 2024
1 parent f4a53e7 commit 85296cc
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 18 deletions.
12 changes: 12 additions & 0 deletions .changesets/fix_geal_request_completion.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
### Execute the entire request pipeline if the client closed the connection ([Issue #4569](https://github.com/apollographql/router/issues/4569)), [Issue #4576](https://github.com/apollographql/router/issues/4576)), ([Issue #4589](https://github.com/apollographql/router/issues/4589)), ([Issue #4590](https://github.com/apollographql/router/issues/4590)), ([Issue #4611](https://github.com/apollographql/router/issues/4611))

The router is now making sure that the entire request handling pipeline is executed when the client closes the connection early, to let telemetry and any rhai scrit or coprocessor perform their tasks before canceling. Before that, when a client canceled a request, the entire execution was dropped and parts of the router, like telemetry, could not run properly. It now executes up to the first response event (in the case of subscription or `@defer` usage), adds a 499 status code to the response and skips the remaining subgraph requests.

This change will report more requests to Studio and the configured telemetry, which will appear like a sudden increase in errors, because those failing requests were not reported before. To keep the previous behavior of immediately dropping execution for canceled requests, it is possible with the following option:

```yaml
supergraph:
early_cancel: true
```
By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/4770
110 changes: 104 additions & 6 deletions apollo-router/src/axum_factory/axum_http_server_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ use tower::ServiceBuilder;
use tower::ServiceExt;
use tower_http::decompression::DecompressionBody;
use tower_http::trace::TraceLayer;
use tracing::instrument::WithSubscriber;
use tracing::Instrument;

use super::listeners::ensure_endpoints_consistency;
use super::listeners::ensure_listenaddrs_consistency;
Expand Down Expand Up @@ -64,6 +66,7 @@ use crate::services::router;
use crate::uplink::license_enforcement::LicenseState;
use crate::uplink::license_enforcement::APOLLO_ROUTER_LICENSE_EXPIRED;
use crate::uplink::license_enforcement::LICENSE_EXPIRED_SHORT_MESSAGE;
use crate::Context;

static ACTIVE_SESSION_COUNT: AtomicU64 = AtomicU64::new(0);

Expand Down Expand Up @@ -549,16 +552,17 @@ pub(super) fn main_router<RF>(
where
RF: RouterFactory,
{
let early_cancel = configuration.supergraph.early_cancel;
let mut router = Router::new().route(
&configuration.supergraph.sanitized_path(),
get({
move |Extension(service): Extension<RF>, request: Request<DecompressionBody<Body>>| {
handle_graphql(service.create().boxed(), request)
handle_graphql(service.create().boxed(), early_cancel, request)
}
})
.post({
move |Extension(service): Extension<RF>, request: Request<DecompressionBody<Body>>| {
handle_graphql(service.create().boxed(), request)
handle_graphql(service.create().boxed(), early_cancel, request)
}
}),
);
Expand All @@ -569,13 +573,13 @@ where
get({
move |Extension(service): Extension<RF>,
request: Request<DecompressionBody<Body>>| {
handle_graphql(service.create().boxed(), request)
handle_graphql(service.create().boxed(), early_cancel, request)
}
})
.post({
move |Extension(service): Extension<RF>,
request: Request<DecompressionBody<Body>>| {
handle_graphql(service.create().boxed(), request)
handle_graphql(service.create().boxed(), early_cancel, request)
}
}),
);
Expand All @@ -586,6 +590,7 @@ where

async fn handle_graphql(
service: router::BoxService,
early_cancel: bool,
http_request: Request<DecompressionBody<Body>>,
) -> impl IntoResponse {
let _guard = SessionCountGuard::start();
Expand All @@ -602,7 +607,33 @@ async fn handle_graphql(
.get(ACCEPT_ENCODING)
.cloned();

let res = service.oneshot(request).await;
let res = if early_cancel {
service.oneshot(request).await
} else {
// to make sure we can record request handling when the client closes the connection prematurely,
// we execute the request in a separate task that will run until we get the first response, which
// means it went through the entire pipeline at least once (not looking at deferred responses or
// subscription events). This is a bit wasteful, so to avoid unneeded subgraph calls, we insert in
// the context a flag to indicate that the request is canceled and subgraph calls should not be made
let mut cancel_handler = CancelHandler::new(&context);
let task = service
.oneshot(request)
.with_current_subscriber()
.in_current_span();
let res = match tokio::task::spawn(task).await {
Ok(res) => res,
Err(_e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
"router service call failed",
)
.into_response();
}
};
cancel_handler.on_response();
res
};

let dur = context.busy_time();
let processing_seconds = dur.as_secs_f64();

Expand Down Expand Up @@ -654,12 +685,48 @@ async fn handle_graphql(
}
}

struct CancelHandler<'a> {
context: &'a Context,
got_first_response: bool,
span: tracing::Span,
}

impl<'a> CancelHandler<'a> {
fn new(context: &'a Context) -> Self {
CancelHandler {
context,
got_first_response: false,
span: tracing::Span::current(),
}
}

fn on_response(&mut self) {
self.got_first_response = true;
}
}

impl<'a> Drop for CancelHandler<'a> {
fn drop(&mut self) {
if !self.got_first_response {
self.span
.in_scope(|| tracing::error!("broken pipe: the client closed the connection"));
self.context.extensions().lock().insert(CanceledRequest);
}
}
}

pub(crate) struct CanceledRequest;

#[cfg(test)]
mod tests {
use std::str::FromStr;

use super::*;
use http::header::ACCEPT;
use http::header::CONTENT_TYPE;
use tower::Service;

use super::*;
use crate::assert_snapshot_subscriber;
#[test]
fn test_span_mode_default() {
let config =
Expand Down Expand Up @@ -687,4 +754,35 @@ mod tests {
let mode = span_mode(&config);
assert_eq!(mode, SpanMode::Deprecated);
}

#[tokio::test]
async fn request_cancel() {
let mut http_router = crate::TestHarness::builder()
.schema(include_str!("../testdata/supergraph.graphql"))
.build_http_service()
.await
.unwrap();

async {
let _res = tokio::time::timeout(
std::time::Duration::from_micros(100),
http_router.call(
http::Request::builder()
.method("POST")
.uri("/")
.header(ACCEPT, "application/json")
.header(CONTENT_TYPE, "application/json")
.body(hyper::Body::from(r#"{"query":"query { me { name }}"}"#))
.unwrap(),
),
)
.await;

tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
}
.with_subscriber(assert_snapshot_subscriber!(
tracing_core::LevelFilter::ERROR
))
.await
}
}
1 change: 1 addition & 0 deletions apollo-router/src/axum_factory/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::sync::OnceLock;
use axum::Router;
pub(crate) use axum_http_server_factory::span_mode;
pub(crate) use axum_http_server_factory::AxumHttpServerFactory;
pub(crate) use axum_http_server_factory::CanceledRequest;
pub(crate) use listeners::ListenAddrAndRouter;

static ENDPOINT_CALLBACK: OnceLock<Arc<dyn Fn(Router) -> Router + Send + Sync>> = OnceLock::new();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
source: apollo-router/src/axum_factory/axum_http_server_factory.rs
expression: yaml
---
- fields: {}
level: ERROR
message: "broken pipe: the client closed the connection"
10 changes: 10 additions & 0 deletions apollo-router/src/configuration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,12 @@ pub(crate) struct Supergraph {

/// Query planning options
pub(crate) query_planning: QueryPlanning,

/// abort request handling when the client drops the connection.
/// Default: false.
/// When set to true, some parts of the request pipeline like telemetry will not work properly,
/// but request handling will stop immediately when the client connection is closed.
pub(crate) early_cancel: bool,
}

fn default_defer_support() -> bool {
Expand All @@ -631,6 +637,7 @@ impl Supergraph {
defer_support: Option<bool>,
query_planning: Option<QueryPlanning>,
reuse_query_fragments: Option<bool>,
early_cancel: Option<bool>,
) -> Self {
Self {
listen: listen.unwrap_or_else(default_graphql_listen),
Expand All @@ -639,6 +646,7 @@ impl Supergraph {
defer_support: defer_support.unwrap_or_else(default_defer_support),
query_planning: query_planning.unwrap_or_default(),
reuse_query_fragments,
early_cancel: early_cancel.unwrap_or_default(),
}
}
}
Expand All @@ -654,6 +662,7 @@ impl Supergraph {
defer_support: Option<bool>,
query_planning: Option<QueryPlanning>,
reuse_query_fragments: Option<bool>,
early_cancel: Option<bool>,
) -> Self {
Self {
listen: listen.unwrap_or_else(test_listen),
Expand All @@ -662,6 +671,7 @@ impl Supergraph {
defer_support: defer_support.unwrap_or_else(default_defer_support),
query_planning: query_planning.unwrap_or_default(),
reuse_query_fragments,
early_cancel: early_cancel.unwrap_or_default(),
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2630,7 +2630,8 @@ expression: "&schema"
"warmed_up_queries": null,
"experimental_plans_limit": null,
"experimental_paths_limit": null
}
},
"early_cancel": false
},
"type": "object",
"properties": {
Expand All @@ -2639,6 +2640,11 @@ expression: "&schema"
"default": true,
"type": "boolean"
},
"early_cancel": {
"description": "abort request handling when the client drops the connection. Default: false. When set to true, some parts of the request pipeline like telemetry will not work properly, but request handling will stop immediately when the client connection is closed.",
"default": false,
"type": "boolean"
},
"experimental_reuse_query_fragments": {
"description": "Enable reuse of query fragments Default: depends on the federation version",
"default": null,
Expand Down
38 changes: 27 additions & 11 deletions apollo-router/src/query_planner/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ use super::subscription::SubscriptionHandle;
use super::DeferredNode;
use super::PlanNode;
use super::QueryPlan;
use crate::axum_factory::CanceledRequest;
use crate::error::Error;
use crate::graphql::Request;
use crate::graphql::Response;
use crate::json_ext::Object;
use crate::json_ext::Path;
use crate::json_ext::Value;
use crate::json_ext::ValueExt;
Expand Down Expand Up @@ -218,17 +220,31 @@ impl PlanNode {
PlanNode::Fetch(fetch_node) => {
let fetch_time_offset =
parameters.context.created_at.elapsed().as_nanos() as i64;
let (v, e) = fetch_node
.fetch_node(parameters, parent_value, current_dir)
.instrument(tracing::info_span!(
FETCH_SPAN_NAME,
"otel.kind" = "INTERNAL",
"apollo.subgraph.name" = fetch_node.service_name.as_str(),
"apollo_private.sent_time_offset" = fetch_time_offset
))
.await;
value = v;
errors = e;

// The client closed the connection, we are still executing the request pipeline,
// but we won't send unused trafic to subgraph
if parameters
.context
.extensions()
.lock()
.get::<CanceledRequest>()
.is_some()
{
value = Value::Object(Object::default());
errors = Vec::new();
} else {
let (v, e) = fetch_node
.fetch_node(parameters, parent_value, current_dir)
.instrument(tracing::info_span!(
FETCH_SPAN_NAME,
"otel.kind" = "INTERNAL",
"apollo.subgraph.name" = fetch_node.service_name.as_str(),
"apollo_private.sent_time_offset" = fetch_time_offset
))
.await;
value = v;
errors = e;
}
}
PlanNode::Defer {
primary:
Expand Down
11 changes: 11 additions & 0 deletions apollo-router/src/services/router/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use tower_service::Service;
use tracing::Instrument;

use super::ClientRequestAccepts;
use crate::axum_factory::CanceledRequest;
use crate::cache::DeduplicatingCache;
use crate::configuration::Batching;
use crate::configuration::BatchingMode;
Expand Down Expand Up @@ -265,6 +266,16 @@ impl RouterService {
let (mut parts, mut body) = response.into_parts();
process_vary_header(&mut parts.headers);

if context
.extensions()
.lock()
.get::<CanceledRequest>()
.is_some()
{
parts.status = StatusCode::from_u16(499)
.expect("499 is not a standard status code but common enough");
}

match body.next().await {
None => {
tracing::error!("router service is not available to process request",);
Expand Down
5 changes: 5 additions & 0 deletions docs/source/errors.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ A request's HTTP `Accept` header didn't contain any of the router's supported mi

Request traffic exceeded configured rate limits. See [client side traffic shaping](./configuration/traffic-shaping/#client-side-traffic-shaping).

</Property>
<Property name="499" short="Request canceled by client">

The request was canceled because the client closed the connection, possibly due to a client side timeout.

</Property>
<Property name="500" short="Internal server error">

Expand Down

0 comments on commit 85296cc

Please sign in to comment.