Skip to content

Commit

Permalink
Move batch query cancellation up (#4859)
Browse files Browse the repository at this point in the history
This commit moves handling of cancelled batch queries into the router
service so that any custom rust plugin, rhai script, coprocessor, etc.
will have cancelled requests handled without needing to handle batches
specially.

<!-- start metadata -->
---

**Checklist**

Complete the checklist (and note appropriate exceptions) before the PR
is marked ready-for-review.

- [ ] Changes are compatible[^1]
- [ ] Documentation[^2] completed
- [ ] Performance impact assessed and acceptable
- Tests added and passing[^3]
    - [ ] Unit Tests
    - [X] Integration Tests
    - [ ] Manual Tests

**Exceptions**

*Note any exceptions here*

**Notes**

[^1]: It may be appropriate to bring upcoming changes to the attention
of other (impacted) groups. Please endeavour to do this before seeking
PR approval. The mechanism for doing this will vary considerably, so use
your judgement as to how and when to do this.
[^2]: Configuration is an important part of many changes. Where
applicable, please try to document configuration examples.
[^3]: Tick whichever testing boxes are applicable. If you are adding
Manual Tests, please document the manual testing extensively in the
Exceptions section.

---------

Co-authored-by: Gary Pennington <[email protected]>
  • Loading branch information
nicholascioli and garypen authored Mar 27, 2024
1 parent 3568a9c commit 617b9b7
Show file tree
Hide file tree
Showing 8 changed files with 293 additions and 73 deletions.
39 changes: 25 additions & 14 deletions apollo-router/src/batching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ impl BatchQuery {
// TODO: How should we handle the sender dying?
self.sender
.as_ref()
.unwrap()
.expect("set query hashes has a sender")
.send(BatchHandlerMessage::Begin {
index: self.index,
query_hashes,
})
.await
.unwrap();
.expect("set query hashes could send");
}

/// Signal to the batch handler that this specific batch query has made some progress.
Expand All @@ -94,7 +94,7 @@ impl BatchQuery {
// TODO: How should we handle the sender dying?
self.sender
.as_ref()
.unwrap()
.expect("signal progress has a sender")
.send(BatchHandlerMessage::Progress {
index: self.index,
client_factory,
Expand All @@ -104,7 +104,7 @@ impl BatchQuery {
span_context: Span::current().context(),
})
.await
.unwrap();
.expect("signal progress could send");

self.remaining -= 1;
if self.remaining == 0 {
Expand All @@ -121,18 +121,20 @@ impl BatchQuery {
if self.sender.is_some() {
self.sender
.as_ref()
.unwrap()
.expect("signal cancelled has a sender")
.send(BatchHandlerMessage::Cancel {
index: self.index,
reason,
})
.await
.unwrap();
.expect("signal cancelled could send");

self.remaining -= 1;
if self.remaining == 0 {
self.sender = None;
}
} else {
tracing::warn!("attempted to cancel completed batch query");
}
}
}
Expand Down Expand Up @@ -251,10 +253,12 @@ impl Batch {
{
sender
.send(Err(Box::new(FetchError::SubrequestBatchingError {
service: request.subgraph_name.unwrap(),
service: request
.subgraph_name
.expect("request has a subgraph_name"),
reason: format!("request cancelled: {reason}"),
})))
.unwrap();
.expect("batcher could send request cancelled to waiter");
}

// Clear out everything that has committed, now that they are cancelled, and
Expand Down Expand Up @@ -336,7 +340,12 @@ impl Batch {
} in all_in_one
{
let value = svc_map
.entry(sg_request.subgraph_name.clone().unwrap())
.entry(
sg_request
.subgraph_name
.clone()
.expect("request has a subgraph_name"),
)
.or_default();
value.push(BatchQueryInfo {
request: sg_request,
Expand All @@ -346,11 +355,13 @@ impl Batch {
}

// tracing::debug!("svc_map: {svc_map:?}");
process_batches(master_client_factory.unwrap(), svc_map)
.await
.expect("XXX NEEDS TO WORK FOR NOW");
}
.instrument(tracing::info_span!("batch_request", size)));
// If we don't have a master_client_factory, we can't do anything.
if let Some(client_factory) = master_client_factory {
process_batches(client_factory, svc_map)
.await
.expect("XXX NEEDS TO WORK FOR NOW");
}
}.instrument(tracing::info_span!("batch_request", size)));

Self {
senders: Mutex::new(senders),
Expand Down
12 changes: 0 additions & 12 deletions apollo-router/src/plugins/coprocessor/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use tower_service::Service;

use super::externalize_header_map;
use super::*;
use crate::batching::BatchQuery;
use crate::graphql;
use crate::layers::async_checkpoint::OneShotAsyncCheckpointLayer;
use crate::layers::ServiceBuilderExt;
Expand Down Expand Up @@ -291,17 +290,6 @@ where
execution_response
};

// Handle cancelled batch queries
// FIXME: This should be way higher up the call chain so that custom plugins / rhai / etc. can
// automatically work with batched queries and cancellations.
let batch_query_opt = res.context.extensions().lock().remove::<BatchQuery>();
if let Some(mut batch_query) = batch_query_opt {
// TODO: How do we reliably get the reason for the coprocessor cancellation here?
batch_query
.signal_cancelled("coprocessor cancelled request at execution layer".to_string())
.await;
}

return Ok(ControlFlow::Break(res));
}

Expand Down
23 changes: 0 additions & 23 deletions apollo-router/src/plugins/coprocessor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use tower::Service;
use tower::ServiceBuilder;
use tower::ServiceExt;

use crate::batching::BatchQuery;
use crate::error::Error;
use crate::layers::async_checkpoint::OneShotAsyncCheckpointLayer;
use crate::layers::ServiceBuilderExt;
Expand Down Expand Up @@ -686,17 +685,6 @@ where
}
}

// Handle cancelled batch queries
// FIXME: This should be way higher up the call chain so that custom plugins / rhai / etc. can
// automatically work with batched queries and cancellations.
let batch_query_opt = res.context.extensions().lock().remove::<BatchQuery>();
if let Some(mut batch_query) = batch_query_opt {
// TODO: How do we reliably get the reason for the coprocessor cancellation here?
batch_query
.signal_cancelled("coprocessor cancelled request at router layer".to_string())
.await;
}

return Ok(ControlFlow::Break(res));
}

Expand Down Expand Up @@ -1027,17 +1015,6 @@ where
subgraph_response
};

// Handle cancelled batch queries
// FIXME: This should be way higher up the call chain so that custom plugins / rhai / etc. can
// automatically work with batched queries and cancellations.
let batch_query_opt = res.context.extensions().lock().remove::<BatchQuery>();
if let Some(mut batch_query) = batch_query_opt {
// TODO: How do we reliably get the reason for the coprocessor cancellation here?
batch_query
.signal_cancelled("coprocessor cancelled request at subgraph layer".to_string())
.await;
}

return Ok(ControlFlow::Break(res));
}

Expand Down
11 changes: 0 additions & 11 deletions apollo-router/src/plugins/coprocessor/supergraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,17 +280,6 @@ where
supergraph_response
};

// Handle cancelled batch queries
// FIXME: This should be way higher up the call chain so that custom plugins / rhai / etc. can
// automatically work with batched queries and cancellations.
let batch_query_opt = res.context.extensions().lock().remove::<BatchQuery>();
if let Some(mut batch_query) = batch_query_opt {
// TODO: How do we reliably get the reason for the coprocessor cancellation here?
batch_query
.signal_cancelled("coprocessor cancelled request at supergraph layer".to_string())
.await;
}

return Ok(ControlFlow::Break(res));
}

Expand Down
13 changes: 1 addition & 12 deletions apollo-router/src/plugins/rhai/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use tower::ServiceExt;

use self::engine::RhaiService;
use self::engine::SharedMut;
use crate::batching::BatchQuery;
use crate::error::Error;
use crate::layers::ServiceBuilderExt;
use crate::plugin::Plugin;
Expand Down Expand Up @@ -339,17 +338,7 @@ macro_rules! gen_map_request {
let mut guard = shared_request.lock().unwrap();
let request_opt = guard.take();

// FIXME: Catch this error higher up the chain
let context = request_opt.unwrap().context;
if let Some(mut batch_query) =
context.extensions().lock().remove::<BatchQuery>()
{
let send_fut =
batch_query.signal_cancelled("cancelled by rhai".to_string());
futures::executor::block_on(send_fut);
}

return $base::request_failure(context, error_details);
return $base::request_failure(request_opt.unwrap().context, error_details);
}
let mut guard = shared_request.lock().unwrap();
let request_opt = guard.take();
Expand Down
30 changes: 29 additions & 1 deletion apollo-router/src/services/router/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use tracing::Instrument;
use super::ClientRequestAccepts;
use crate::axum_factory::CanceledRequest;
use crate::batching::Batch;
use crate::batching::BatchQuery;
use crate::cache::DeduplicatingCache;
use crate::configuration::Batching;
use crate::configuration::BatchingMode;
Expand Down Expand Up @@ -426,12 +427,39 @@ impl RouterService {
}
};

// We need to handle cases where a failure is part of a batch and thus must be cancelled.
// Requests can be cancelled at any point of the router pipeline, but all failures bubble back
// up through here, so we can catch them without having to specially handle batch queries in
// other portions of the codebase.
let futures = supergraph_requests
.into_iter()
.map(|supergraph_request| self.process_supergraph_request(supergraph_request));
.map(|supergraph_request| async {
// TODO: We clone the context here, because if the request results in an Err, the
// response context will no longer exist.
let context = supergraph_request.context.clone();
let result = self.process_supergraph_request(supergraph_request).await;

// Regardless of the result, we need to make sure that we cancel any potential batch queries. This is because
// custom rust plugins, rhai scripts, and coprocessors can cancel requests at any time and return a GraphQL
// error wrapped in an `Ok` or in a `BoxError` wrapped in an `Err`.
let batch_query_opt = context.extensions().lock().remove::<BatchQuery>();
if let Some(mut batch_query) = batch_query_opt {
// Only proceed with signalling cancelled if the batch_query is not finished
if !batch_query.finished() {
tracing::info!("cancelling batch query in supergraph response");
batch_query
.signal_cancelled("request terminated by user".to_string())
.await;
}
}

result
});

// Use join_all to preserve ordering of concurrent operations
// (Short circuit processing and propagate any errors in the batch)
// Note: We use `join_all` here since it awaits all futures before returning, thus allowing us to
// handle cancellation logic without fear of the other futures getting killed.
let mut results: Vec<router::Response> = join_all(futures)
.await
.into_iter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ coprocessor:
all:
request:
service_name: true
body: true

include_subgraph_errors:
all: true
Loading

0 comments on commit 617b9b7

Please sign in to comment.