Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move batch query cancellation up #4859

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 25 additions & 14 deletions apollo-router/src/batching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ impl BatchQuery {
// TODO: How should we handle the sender dying?
self.sender
.as_ref()
.unwrap()
.expect("set query hashes has a sender")
.send(BatchHandlerMessage::Begin {
index: self.index,
query_hashes,
})
.await
.unwrap();
.expect("set query hashes could send");
}

/// Signal to the batch handler that this specific batch query has made some progress.
Expand All @@ -94,7 +94,7 @@ impl BatchQuery {
// TODO: How should we handle the sender dying?
self.sender
.as_ref()
.unwrap()
.expect("signal progress has a sender")
.send(BatchHandlerMessage::Progress {
index: self.index,
client_factory,
Expand All @@ -104,7 +104,7 @@ impl BatchQuery {
span_context: Span::current().context(),
})
.await
.unwrap();
.expect("signal progress could send");

self.remaining -= 1;
if self.remaining == 0 {
Expand All @@ -121,18 +121,20 @@ impl BatchQuery {
if self.sender.is_some() {
self.sender
.as_ref()
.unwrap()
.expect("signal cancelled has a sender")
.send(BatchHandlerMessage::Cancel {
index: self.index,
reason,
})
.await
.unwrap();
.expect("signal cancelled could send");

self.remaining -= 1;
if self.remaining == 0 {
self.sender = None;
}
} else {
tracing::warn!("attempted to cancel completed batch query");
}
}
}
Expand Down Expand Up @@ -251,10 +253,12 @@ impl Batch {
{
sender
.send(Err(Box::new(FetchError::SubrequestBatchingError {
service: request.subgraph_name.unwrap(),
service: request
.subgraph_name
.expect("request has a subgraph_name"),
reason: format!("request cancelled: {reason}"),
})))
.unwrap();
.expect("batcher could send request cancelled to waiter");
}

// Clear out everything that has committed, now that they are cancelled, and
Expand Down Expand Up @@ -336,7 +340,12 @@ impl Batch {
} in all_in_one
{
let value = svc_map
.entry(sg_request.subgraph_name.clone().unwrap())
.entry(
sg_request
.subgraph_name
.clone()
.expect("request has a subgraph_name"),
)
.or_default();
value.push(BatchQueryInfo {
request: sg_request,
Expand All @@ -346,11 +355,13 @@ impl Batch {
}

// tracing::debug!("svc_map: {svc_map:?}");
process_batches(master_client_factory.unwrap(), svc_map)
.await
.expect("XXX NEEDS TO WORK FOR NOW");
}
.instrument(tracing::info_span!("batch_request", size)));
// If we don't have a master_client_factory, we can't do anything.
if let Some(client_factory) = master_client_factory {
process_batches(client_factory, svc_map)
.await
.expect("XXX NEEDS TO WORK FOR NOW");
}
});

Self {
senders: Mutex::new(senders),
Expand Down
12 changes: 0 additions & 12 deletions apollo-router/src/plugins/coprocessor/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use tower_service::Service;

use super::externalize_header_map;
use super::*;
use crate::batching::BatchQuery;
use crate::graphql;
use crate::layers::async_checkpoint::OneShotAsyncCheckpointLayer;
use crate::layers::ServiceBuilderExt;
Expand Down Expand Up @@ -291,17 +290,6 @@ where
execution_response
};

// Handle cancelled batch queries
// FIXME: This should be way higher up the call chain so that custom plugins / rhai / etc. can
// automatically work with batched queries and cancellations.
let batch_query_opt = res.context.extensions().lock().remove::<BatchQuery>();
if let Some(mut batch_query) = batch_query_opt {
// TODO: How do we reliably get the reason for the coprocessor cancellation here?
batch_query
.signal_cancelled("coprocessor cancelled request at execution layer".to_string())
.await;
}

return Ok(ControlFlow::Break(res));
}

Expand Down
23 changes: 0 additions & 23 deletions apollo-router/src/plugins/coprocessor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use tower::Service;
use tower::ServiceBuilder;
use tower::ServiceExt;

use crate::batching::BatchQuery;
use crate::error::Error;
use crate::layers::async_checkpoint::OneShotAsyncCheckpointLayer;
use crate::layers::ServiceBuilderExt;
Expand Down Expand Up @@ -686,17 +685,6 @@ where
}
}

// Handle cancelled batch queries
// FIXME: This should be way higher up the call chain so that custom plugins / rhai / etc. can
// automatically work with batched queries and cancellations.
let batch_query_opt = res.context.extensions().lock().remove::<BatchQuery>();
if let Some(mut batch_query) = batch_query_opt {
// TODO: How do we reliably get the reason for the coprocessor cancellation here?
batch_query
.signal_cancelled("coprocessor cancelled request at router layer".to_string())
.await;
}

return Ok(ControlFlow::Break(res));
}

Expand Down Expand Up @@ -1027,17 +1015,6 @@ where
subgraph_response
};

// Handle cancelled batch queries
// FIXME: This should be way higher up the call chain so that custom plugins / rhai / etc. can
// automatically work with batched queries and cancellations.
let batch_query_opt = res.context.extensions().lock().remove::<BatchQuery>();
if let Some(mut batch_query) = batch_query_opt {
// TODO: How do we reliably get the reason for the coprocessor cancellation here?
batch_query
.signal_cancelled("coprocessor cancelled request at subgraph layer".to_string())
.await;
}

return Ok(ControlFlow::Break(res));
}

Expand Down
11 changes: 0 additions & 11 deletions apollo-router/src/plugins/coprocessor/supergraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,17 +280,6 @@ where
supergraph_response
};

// Handle cancelled batch queries
// FIXME: This should be way higher up the call chain so that custom plugins / rhai / etc. can
// automatically work with batched queries and cancellations.
let batch_query_opt = res.context.extensions().lock().remove::<BatchQuery>();
if let Some(mut batch_query) = batch_query_opt {
// TODO: How do we reliably get the reason for the coprocessor cancellation here?
batch_query
.signal_cancelled("coprocessor cancelled request at supergraph layer".to_string())
.await;
}

return Ok(ControlFlow::Break(res));
}

Expand Down
13 changes: 1 addition & 12 deletions apollo-router/src/plugins/rhai/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use tower::ServiceExt;

use self::engine::RhaiService;
use self::engine::SharedMut;
use crate::batching::BatchQuery;
use crate::error::Error;
use crate::layers::ServiceBuilderExt;
use crate::plugin::Plugin;
Expand Down Expand Up @@ -339,17 +338,7 @@ macro_rules! gen_map_request {
let mut guard = shared_request.lock().unwrap();
let request_opt = guard.take();

// FIXME: Catch this error higher up the chain
let context = request_opt.unwrap().context;
if let Some(mut batch_query) =
context.extensions().lock().remove::<BatchQuery>()
{
let send_fut =
batch_query.signal_cancelled("cancelled by rhai".to_string());
futures::executor::block_on(send_fut);
}

return $base::request_failure(context, error_details);
return $base::request_failure(request_opt.unwrap().context, error_details);
}
let mut guard = shared_request.lock().unwrap();
let request_opt = guard.take();
Expand Down
30 changes: 29 additions & 1 deletion apollo-router/src/services/router/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use tracing::Instrument;
use super::ClientRequestAccepts;
use crate::axum_factory::CanceledRequest;
use crate::batching::Batch;
use crate::batching::BatchQuery;
use crate::cache::DeduplicatingCache;
use crate::configuration::Batching;
use crate::configuration::BatchingMode;
Expand Down Expand Up @@ -426,12 +427,39 @@ impl RouterService {
}
};

// We need to handle cases where a failure is part of a batch and thus must be cancelled.
// Requests can be cancelled at any point of the router pipeline, but all failures bubble back
// up through here, so we can catch them without having to specially handle batch queries in
// other portions of the codebase.
let futures = supergraph_requests
.into_iter()
.map(|supergraph_request| self.process_supergraph_request(supergraph_request));
.map(|supergraph_request| async {
// TODO: We clone the context here, because if the request results in an Err, the
// response context will no longer exist.
let context = supergraph_request.context.clone();
let result = self.process_supergraph_request(supergraph_request).await;

// Regardless of the result, we need to make sure that we cancel any potential batch queries. This is because
// custom rust plugins, rhai scripts, and coprocessors can cancel requests at any time and return a GraphQL
// error wrapped in an `Ok` or in a `BoxError` wrapped in an `Err`.
let batch_query_opt = context.extensions().lock().remove::<BatchQuery>();
if let Some(mut batch_query) = batch_query_opt {
// Only proceed with signalling cancelled if the batch_query is not finished
if !batch_query.finished() {
tracing::info!("cancelling batch query in supergraph response");
batch_query
.signal_cancelled("request terminated by user".to_string())
.await;
}
}

result
});

// Use join_all to preserve ordering of concurrent operations
// (Short circuit processing and propagate any errors in the batch)
// Note: We use `join_all` here since it awaits all futures before returning, thus allowing us to
// handle cancellation logic without fear of the other futures getting killed.
let mut results: Vec<router::Response> = join_all(futures)
.await
.into_iter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ coprocessor:
all:
request:
service_name: true
body: true

include_subgraph_errors:
all: true
Loading