Add batching cancellations to coprocessor #4846

Merged
26 commits
0f10eb4
docs: Add note that `apollo_router_span` is replaced `apollo_router_p…
abernix Mar 15, 2024
a1ef575
Created scaffolding for new demand control plugin with a simple confi…
tninesling Mar 18, 2024
1b007a5
Fix failing CI due to clippy check (#4822)
BrynCooke Mar 19, 2024
cb57bc1
prep release: v1.43.0-rc.0
abernix Mar 19, 2024
3b371a6
Register demand control plugin, and validate its config format via li…
tninesling Mar 19, 2024
be4911f
Serialize algorithm names in snake_case
tninesling Mar 19, 2024
234bff2
Rename default algorithm to basic, and remove default so demand contr…
tninesling Mar 19, 2024
d1df418
Merge remote-tracking branch 'origin/dev' into tninesling/ROUTER-181
tninesling Mar 19, 2024
412ccf6
Temporarily allow dead code in demand control scaffolding
tninesling Mar 19, 2024
82946f0
Add demand control as optional apollo plugin in router factory
tninesling Mar 19, 2024
6c4087a
Ensure demand control config has proper doc comments
tninesling Mar 19, 2024
2665ece
Create scaffolding for new demand control plugin (#4820)
tninesling Mar 19, 2024
2c506fe
hash all entities to the same slot to support MGET in clusters (#4790)
Geal Mar 20, 2024
3400cf3
feat(subscription): add configurable heartbeat for websocket protocol…
IvanGoncharov Mar 20, 2024
7d939e3
prep release: v1.43.0-rc.1
abernix Mar 20, 2024
ebf30e1
Migrate existing integration tests to a single binary (#4828)
BrynCooke Mar 21, 2024
ded84be
prep release: v1.43.0
abernix Mar 22, 2024
4b84858
release: v1.43.0
abernix Mar 22, 2024
947936a
Reconcile `dev` after merge to `main` for v1.43.0 (#4835)
abernix Mar 22, 2024
38ca966
Enable displaying trace and span id on logs (#4823)
BrynCooke Mar 25, 2024
72cfc47
docs: Update RELEASE_CHECKLIST.md to reflect state of operations (#4841)
abernix Mar 25, 2024
89c034b
add batching cancellations to coprocessors
nicholascioli Mar 25, 2024
f4a53e7
ci: fix CMake install by explicitly using `cmake.install` on windows …
goto-bus-stop Mar 26, 2024
85296cc
Execute the entire request pipeline if the client closed the connecti…
Geal Mar 26, 2024
c1987b6
make clippy happy
garypen Mar 26, 2024
1d2a329
Merge branch 'dev' into nc/subgraph-batching/coprocessor
garypen Mar 26, 2024
12 changes: 12 additions & 0 deletions .changesets/fix_geal_request_completion.md
@@ -0,0 +1,12 @@
### Execute the entire request pipeline if the client closed the connection ([Issue #4569](https://github.com/apollographql/router/issues/4569), [Issue #4576](https://github.com/apollographql/router/issues/4576), [Issue #4589](https://github.com/apollographql/router/issues/4589), [Issue #4590](https://github.com/apollographql/router/issues/4590), [Issue #4611](https://github.com/apollographql/router/issues/4611))

The router now ensures that the entire request handling pipeline is executed when the client closes the connection early, so that telemetry and any Rhai script or coprocessor can perform their tasks before the request is canceled. Previously, when a client canceled a request, the entire execution was dropped and parts of the router, like telemetry, could not run properly. The router now executes up to the first response event (in the case of subscription or `@defer` usage), adds a 499 status code to the response, and skips the remaining subgraph requests.

This change will report more requests to Studio and the configured telemetry, which will look like a sudden increase in errors because those failing requests were not reported before. To keep the previous behavior of immediately dropping execution for canceled requests, set the following option:

```yaml
supergraph:
early_cancel: true
```

By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/4770
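
The changeset above says the pipeline keeps running after the client goes away. A minimal sketch of that idea follows, using a plain `std::thread` and a channel instead of the `tokio::task::spawn` call the router uses, so it is an analogy rather than the router's implementation. The helper name `run_detached` and its `patience` parameter are hypothetical.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Runs `work` on its own thread and waits up to `patience` for the result.
/// Returns the result if it arrived in time, plus a handle to the worker.
/// (Hypothetical helper; the router spawns a tokio task instead.)
pub fn run_detached<T, F>(work: F, patience: Duration) -> (Option<T>, thread::JoinHandle<()>)
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        // `work()` always runs to completion. Only the `send` can fail,
        // which happens when the caller has already stopped waiting.
        let _ = tx.send(work());
    });
    // Giving up on the result does not stop the worker.
    (rx.recv_timeout(patience).ok(), handle)
}
```

The point of the design is that abandoning the wait (the client disconnecting) and abandoning the work are decoupled: side effects inside `work`, such as recording telemetry, still happen even when nobody reads the result.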
2 changes: 1 addition & 1 deletion .circleci/config.yml
Expand Up @@ -258,7 +258,7 @@ commands:
- run:
name: Install CMake
command: |
choco install cmake -y
choco install cmake.install -y
echo 'export PATH="/c/Program Files/CMake/bin:$PATH"' >> "$BASH_ENV"
exit $LASTEXITCODE
- when:
Expand Down
434 changes: 287 additions & 147 deletions RELEASE_CHECKLIST.md

Large diffs are not rendered by default.

110 changes: 104 additions & 6 deletions apollo-router/src/axum_factory/axum_http_server_factory.rs
Expand Up @@ -37,6 +37,8 @@ use tower::ServiceBuilder;
use tower::ServiceExt;
use tower_http::decompression::DecompressionBody;
use tower_http::trace::TraceLayer;
use tracing::instrument::WithSubscriber;
use tracing::Instrument;

use super::listeners::ensure_endpoints_consistency;
use super::listeners::ensure_listenaddrs_consistency;
Expand Down Expand Up @@ -64,6 +66,7 @@ use crate::services::router;
use crate::uplink::license_enforcement::LicenseState;
use crate::uplink::license_enforcement::APOLLO_ROUTER_LICENSE_EXPIRED;
use crate::uplink::license_enforcement::LICENSE_EXPIRED_SHORT_MESSAGE;
use crate::Context;

static ACTIVE_SESSION_COUNT: AtomicU64 = AtomicU64::new(0);

Expand Down Expand Up @@ -549,16 +552,17 @@ pub(super) fn main_router<RF>(
where
RF: RouterFactory,
{
let early_cancel = configuration.supergraph.early_cancel;
let mut router = Router::new().route(
&configuration.supergraph.sanitized_path(),
get({
move |Extension(service): Extension<RF>, request: Request<DecompressionBody<Body>>| {
handle_graphql(service.create().boxed(), request)
handle_graphql(service.create().boxed(), early_cancel, request)
}
})
.post({
move |Extension(service): Extension<RF>, request: Request<DecompressionBody<Body>>| {
handle_graphql(service.create().boxed(), request)
handle_graphql(service.create().boxed(), early_cancel, request)
}
}),
);
Expand All @@ -569,13 +573,13 @@ where
get({
move |Extension(service): Extension<RF>,
request: Request<DecompressionBody<Body>>| {
handle_graphql(service.create().boxed(), request)
handle_graphql(service.create().boxed(), early_cancel, request)
}
})
.post({
move |Extension(service): Extension<RF>,
request: Request<DecompressionBody<Body>>| {
handle_graphql(service.create().boxed(), request)
handle_graphql(service.create().boxed(), early_cancel, request)
}
}),
);
Expand All @@ -586,6 +590,7 @@ where

async fn handle_graphql(
service: router::BoxService,
early_cancel: bool,
http_request: Request<DecompressionBody<Body>>,
) -> impl IntoResponse {
let _guard = SessionCountGuard::start();
Expand All @@ -602,7 +607,33 @@ async fn handle_graphql(
.get(ACCEPT_ENCODING)
.cloned();

let res = service.oneshot(request).await;
let res = if early_cancel {
service.oneshot(request).await
} else {
// to make sure we can record request handling when the client closes the connection prematurely,
// we execute the request in a separate task that will run until we get the first response, which
// means it went through the entire pipeline at least once (not looking at deferred responses or
// subscription events). This is a bit wasteful, so to avoid unneeded subgraph calls, we insert in
// the context a flag to indicate that the request is canceled and subgraph calls should not be made
let mut cancel_handler = CancelHandler::new(&context);
let task = service
.oneshot(request)
.with_current_subscriber()
.in_current_span();
let res = match tokio::task::spawn(task).await {
Ok(res) => res,
Err(_e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
"router service call failed",
)
.into_response();
}
};
cancel_handler.on_response();
res
};

let dur = context.busy_time();
let processing_seconds = dur.as_secs_f64();

Expand Down Expand Up @@ -654,12 +685,48 @@ async fn handle_graphql(
}
}

struct CancelHandler<'a> {
context: &'a Context,
got_first_response: bool,
span: tracing::Span,
}

impl<'a> CancelHandler<'a> {
fn new(context: &'a Context) -> Self {
CancelHandler {
context,
got_first_response: false,
span: tracing::Span::current(),
}
}

fn on_response(&mut self) {
self.got_first_response = true;
}
}

impl<'a> Drop for CancelHandler<'a> {
fn drop(&mut self) {
if !self.got_first_response {
self.span
.in_scope(|| tracing::error!("broken pipe: the client closed the connection"));
self.context.extensions().lock().insert(CanceledRequest);
}
}
}

pub(crate) struct CanceledRequest;

#[cfg(test)]
mod tests {
use std::str::FromStr;

use super::*;
use http::header::ACCEPT;
use http::header::CONTENT_TYPE;
use tower::Service;

use super::*;
use crate::assert_snapshot_subscriber;
#[test]
fn test_span_mode_default() {
let config =
Expand Down Expand Up @@ -687,4 +754,35 @@ mod tests {
let mode = span_mode(&config);
assert_eq!(mode, SpanMode::Deprecated);
}

#[tokio::test]
async fn request_cancel() {
let mut http_router = crate::TestHarness::builder()
.schema(include_str!("../testdata/supergraph.graphql"))
.build_http_service()
.await
.unwrap();

async {
let _res = tokio::time::timeout(
std::time::Duration::from_micros(100),
http_router.call(
http::Request::builder()
.method("POST")
.uri("/")
.header(ACCEPT, "application/json")
.header(CONTENT_TYPE, "application/json")
.body(hyper::Body::from(r#"{"query":"query { me { name }}"}"#))
.unwrap(),
),
)
.await;

tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
}
.with_subscriber(assert_snapshot_subscriber!(
tracing_core::LevelFilter::ERROR
))
.await
}
}
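
The `CancelHandler` in this file relies on `Drop`: if the handler is dropped before `on_response` is called (presumably because the request future was dropped when the client disconnected), it marks the request as canceled. Below is a minimal std-only sketch of that drop-guard pattern. `SharedContext`, `CancelGuard`, and `is_canceled` are hypothetical names, and the atomic flag stands in for the `CanceledRequest` marker the router inserts into its context extensions.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for the router's request context: a shared flag
/// that later pipeline stages can check before making subgraph calls.
#[derive(Clone, Default)]
pub struct SharedContext {
    canceled: Arc<AtomicBool>,
}

impl SharedContext {
    pub fn is_canceled(&self) -> bool {
        self.canceled.load(Ordering::SeqCst)
    }
}

/// Guard that marks the context as canceled if it is dropped before
/// `on_response` has been called.
pub struct CancelGuard {
    context: SharedContext,
    got_first_response: bool,
}

impl CancelGuard {
    pub fn new(context: &SharedContext) -> Self {
        CancelGuard {
            context: context.clone(),
            got_first_response: false,
        }
    }

    /// Call once the first response has been produced.
    pub fn on_response(&mut self) {
        self.got_first_response = true;
    }
}

impl Drop for CancelGuard {
    fn drop(&mut self) {
        // Dropped without a response: record the cancellation so that
        // downstream stages can skip unnecessary work.
        if !self.got_first_response {
            self.context.canceled.store(true, Ordering::SeqCst);
        }
    }
}
```

Because the flag lives behind an `Arc`, a task that keeps running after the guard is dropped can still observe the cancellation through its own clone of the context.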
1 change: 1 addition & 0 deletions apollo-router/src/axum_factory/mod.rs
Expand Up @@ -12,6 +12,7 @@ use std::sync::OnceLock;
use axum::Router;
pub(crate) use axum_http_server_factory::span_mode;
pub(crate) use axum_http_server_factory::AxumHttpServerFactory;
pub(crate) use axum_http_server_factory::CanceledRequest;
pub(crate) use listeners::ListenAddrAndRouter;

static ENDPOINT_CALLBACK: OnceLock<Arc<dyn Fn(Router) -> Router + Send + Sync>> = OnceLock::new();
Expand Down
@@ -0,0 +1,7 @@
---
source: apollo-router/src/axum_factory/axum_http_server_factory.rs
expression: yaml
---
- fields: {}
level: ERROR
message: "broken pipe: the client closed the connection"
10 changes: 10 additions & 0 deletions apollo-router/src/configuration/mod.rs
Expand Up @@ -616,6 +616,12 @@ pub(crate) struct Supergraph {

/// Query planning options
pub(crate) query_planning: QueryPlanning,

/// abort request handling when the client drops the connection.
/// Default: false.
/// When set to true, some parts of the request pipeline like telemetry will not work properly,
/// but request handling will stop immediately when the client connection is closed.
pub(crate) early_cancel: bool,
}

fn default_defer_support() -> bool {
Expand All @@ -632,6 +638,7 @@ impl Supergraph {
defer_support: Option<bool>,
query_planning: Option<QueryPlanning>,
reuse_query_fragments: Option<bool>,
early_cancel: Option<bool>,
) -> Self {
Self {
listen: listen.unwrap_or_else(default_graphql_listen),
Expand All @@ -640,6 +647,7 @@ impl Supergraph {
defer_support: defer_support.unwrap_or_else(default_defer_support),
query_planning: query_planning.unwrap_or_default(),
reuse_query_fragments,
early_cancel: early_cancel.unwrap_or_default(),
}
}
}
Expand All @@ -655,6 +663,7 @@ impl Supergraph {
defer_support: Option<bool>,
query_planning: Option<QueryPlanning>,
reuse_query_fragments: Option<bool>,
early_cancel: Option<bool>,
) -> Self {
Self {
listen: listen.unwrap_or_else(test_listen),
Expand All @@ -663,6 +672,7 @@ impl Supergraph {
defer_support: defer_support.unwrap_or_else(default_defer_support),
query_planning: query_planning.unwrap_or_default(),
reuse_query_fragments,
early_cancel: early_cancel.unwrap_or_default(),
}
}
}
Expand Down
Expand Up @@ -2686,7 +2686,8 @@ expression: "&schema"
"warmed_up_queries": null,
"experimental_plans_limit": null,
"experimental_paths_limit": null
}
},
"early_cancel": false
},
"type": "object",
"properties": {
Expand All @@ -2695,6 +2696,11 @@ expression: "&schema"
"default": true,
"type": "boolean"
},
"early_cancel": {
"description": "abort request handling when the client drops the connection. Default: false. When set to true, some parts of the request pipeline like telemetry will not work properly, but request handling will stop immediately when the client connection is closed.",
"default": false,
"type": "boolean"
},
"experimental_reuse_query_fragments": {
"description": "Enable reuse of query fragments Default: depends on the federation version",
"default": null,
Expand Down
13 changes: 13 additions & 0 deletions apollo-router/src/plugins/coprocessor/execution.rs
Expand Up @@ -12,6 +12,7 @@ use tower_service::Service;

use super::externalize_header_map;
use super::*;
use crate::batching::BatchQuery;
use crate::graphql;
use crate::layers::async_checkpoint::OneShotAsyncCheckpointLayer;
use crate::layers::ServiceBuilderExt;
Expand Down Expand Up @@ -289,6 +290,18 @@ where

execution_response
};

// Handle cancelled batch queries
// FIXME: This should be way higher up the call chain so that custom plugins / rhai / etc. can
// automatically work with batched queries and cancellations.
let batch_query_opt = res.context.extensions().lock().remove::<BatchQuery>();
if let Some(mut batch_query) = batch_query_opt {
// TODO: How do we reliably get the reason for the coprocessor cancellation here?
batch_query
.signal_cancelled("coprocessor cancelled request at execution layer".to_string())
.await;
}

return Ok(ControlFlow::Break(res));
}

Expand Down
24 changes: 24 additions & 0 deletions apollo-router/src/plugins/coprocessor/mod.rs
Expand Up @@ -30,6 +30,7 @@ use tower::Service;
use tower::ServiceBuilder;
use tower::ServiceExt;

use crate::batching::BatchQuery;
use crate::error::Error;
use crate::layers::async_checkpoint::OneShotAsyncCheckpointLayer;
use crate::layers::ServiceBuilderExt;
Expand Down Expand Up @@ -685,6 +686,17 @@ where
}
}

// Handle cancelled batch queries
// FIXME: This should be way higher up the call chain so that custom plugins / rhai / etc. can
// automatically work with batched queries and cancellations.
let batch_query_opt = res.context.extensions().lock().remove::<BatchQuery>();
if let Some(mut batch_query) = batch_query_opt {
// TODO: How do we reliably get the reason for the coprocessor cancellation here?
batch_query
.signal_cancelled("coprocessor cancelled request at router layer".to_string())
.await;
}

return Ok(ControlFlow::Break(res));
}

Expand Down Expand Up @@ -1014,6 +1026,18 @@ where

subgraph_response
};

// Handle cancelled batch queries
// FIXME: This should be way higher up the call chain so that custom plugins / rhai / etc. can
// automatically work with batched queries and cancellations.
let batch_query_opt = res.context.extensions().lock().remove::<BatchQuery>();
if let Some(mut batch_query) = batch_query_opt {
// TODO: How do we reliably get the reason for the coprocessor cancellation here?
batch_query
.signal_cancelled("coprocessor cancelled request at subgraph layer".to_string())
.await;
}

return Ok(ControlFlow::Break(res));
}

Expand Down
12 changes: 12 additions & 0 deletions apollo-router/src/plugins/coprocessor/supergraph.rs
Expand Up @@ -279,6 +279,18 @@ where

supergraph_response
};

// Handle cancelled batch queries
// FIXME: This should be way higher up the call chain so that custom plugins / rhai / etc. can
// automatically work with batched queries and cancellations.
let batch_query_opt = res.context.extensions().lock().remove::<BatchQuery>();
if let Some(mut batch_query) = batch_query_opt {
// TODO: How do we reliably get the reason for the coprocessor cancellation here?
batch_query
.signal_cancelled("coprocessor cancelled request at supergraph layer".to_string())
.await;
}

return Ok(ControlFlow::Break(res));
}

Expand Down
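
The same "remove the `BatchQuery` from the context extensions and signal cancellation" block appears at the router, supergraph, execution, and subgraph layers, and the FIXME comments note it should live higher up the call chain. One possible shape for a shared helper is sketched below, std-only and synchronous. `Extensions`, `BatchEntry`, and `cancel_batch_entry` are hypothetical simplifications; the router's real `BatchQuery::signal_cancelled` is async and its extensions map sits behind a lock.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

/// Hypothetical, simplified type map keyed by the stored value's type.
#[derive(Default)]
pub struct Extensions {
    map: HashMap<TypeId, Box<dyn Any>>,
}

impl Extensions {
    pub fn insert<T: Any>(&mut self, value: T) {
        self.map.insert(TypeId::of::<T>(), Box::new(value));
    }

    /// Removes and returns the value of type `T`, if one was stored.
    pub fn remove<T: Any>(&mut self) -> Option<T> {
        self.map
            .remove(&TypeId::of::<T>())
            .and_then(|boxed| boxed.downcast::<T>().ok())
            .map(|boxed| *boxed)
    }
}

/// Hypothetical stand-in for a batch entry that can be told it was cancelled.
#[derive(Default)]
pub struct BatchEntry {
    pub cancel_reason: Option<String>,
}

impl BatchEntry {
    pub fn signal_cancelled(&mut self, reason: String) {
        self.cancel_reason = Some(reason);
    }
}

/// Takes the batch entry out of the extensions (if any) and signals
/// cancellation, naming the layer that broke out of the pipeline.
pub fn cancel_batch_entry(extensions: &mut Extensions, layer: &str) -> Option<BatchEntry> {
    let mut entry = extensions.remove::<BatchEntry>()?;
    entry.signal_cancelled(format!("coprocessor cancelled request at {} layer", layer));
    Some(entry)
}
```

Removing the entry rather than borrowing it means a second layer that breaks out later finds nothing to cancel, so the batch is signalled at most once per request.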