Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reuse cached query plans across schema updates if possible #4883

Merged
merged 26 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .changesets/feat_geal_schema_hash_requires.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
### Reuse cached query plans across schema updates if possible ([Issue #4834](https://github.com/apollographql/router/issues/4834))

This extends the schema-aware query hashing introduced in entity caching to reduce the amount of work done when reloading the router. That hash is designed to stay the same for a given query across schema updates, as long as the update does not affect that query. If query planner cache warm-up is configured, the router can then reuse previous cache entries whose hash did not change, which reduces CPU usage and makes reloads faster.

This can be activated with the following option:

```yaml title="router.yaml"
supergraph:
  query_planning:
    warmed_up_queries: 100
    experimental_reuse_query_plans: true
```

By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/4883
9 changes: 7 additions & 2 deletions apollo-router/src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use tokio::sync::Mutex;
use tower::BoxError;

use self::storage::CacheStorage;
use self::storage::InMemoryCache;
use self::storage::KeyType;
use self::storage::ValueType;
use crate::configuration::RedisCache;
Expand Down Expand Up @@ -113,6 +114,10 @@ where
self.storage.insert(key, value).await;
}

/// Stores `value` under `key` by delegating to the underlying storage's
/// `insert_in_memory`, awaiting its completion.
pub(crate) async fn insert_in_memory(&self, key: K, value: V) {
self.storage.insert_in_memory(key, value).await;
}

async fn send(&self, sender: broadcast::Sender<V>, key: &K, value: V) {
// Lock the wait map to prevent more subscribers racing with our send
// notification
Expand All @@ -121,8 +126,8 @@ where
let _ = sender.send(value);
}

pub(crate) async fn in_memory_keys(&self) -> Vec<K> {
self.storage.in_memory_keys().await
pub(crate) fn in_memory_cache(&self) -> InMemoryCache<K, V> {
self.storage.in_memory_cache()
}
}

Expand Down
22 changes: 15 additions & 7 deletions apollo-router/src/cache/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ where
// It has the functions it needs already
}

/// Shared, mutex-guarded handle to an LRU cache mapping `K` to `V`.
/// Cloning the handle clones the `Arc`, not the cached entries.
pub(crate) type InMemoryCache<K, V> = Arc<Mutex<LruCache<K, V>>>;

// placeholder storage module
//
// this will be replaced by the multi level (in memory + redis/memcached) once we find
Expand Down Expand Up @@ -178,13 +180,19 @@ where
);
}

pub(crate) async fn in_memory_keys(&self) -> Vec<K> {
self.inner
.lock()
.await
.iter()
.map(|(k, _)| k.clone())
.collect()
/// Puts `key`/`value` into the LRU cache held in `self.inner`, then emits a
/// tracing event reporting the cache's entry count after the insertion.
///
/// Only `self.inner` is written here; no other storage is touched by this
/// function.
pub(crate) async fn insert_in_memory(&self, key: K, value: V) {
// The guard is kept until the end of the function, so the size read and
// the event below both happen while the lock is held.
let mut in_memory = self.inner.lock().await;
in_memory.put(key, value);
let size = in_memory.len() as u64;
// NOTE(review): the `value.`-prefixed field presumably feeds a cache-size
// metric through the tracing layer — confirm against the telemetry setup.
tracing::info!(
value.apollo_router_cache_size = size,
kind = %self.caller,
storage = &tracing::field::display(CacheStorageName::Memory),
);
}

/// Returns a clone of the `self.inner` handle. Given the `InMemoryCache`
/// alias (`Arc<Mutex<LruCache<K, V>>>`), the caller receives a handle to the
/// same underlying cache rather than a copy of its entries.
pub(crate) fn in_memory_cache(&self) -> InMemoryCache<K, V> {
self.inner.clone()
}

#[cfg(test)]
Expand Down
4 changes: 4 additions & 0 deletions apollo-router/src/configuration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,10 @@ pub(crate) struct QueryPlanning {
///
/// The default value is None, which specifies no limit.
pub(crate) experimental_paths_limit: Option<u32>,

/// If cache warm up is configured, this will allow the router to keep a query plan created with
/// the old schema, if it determines that the schema update does not affect the corresponding query
pub(crate) experimental_reuse_query_plans: bool,
}

/// Cache configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2629,7 +2629,8 @@ expression: "&schema"
},
"warmed_up_queries": null,
"experimental_plans_limit": null,
"experimental_paths_limit": null
"experimental_paths_limit": null,
"experimental_reuse_query_plans": false
},
"early_cancel": false,
"experimental_log_on_broken_pipe": false
Expand Down Expand Up @@ -2692,7 +2693,8 @@ expression: "&schema"
},
"warmed_up_queries": null,
"experimental_plans_limit": null,
"experimental_paths_limit": null
"experimental_paths_limit": null,
"experimental_reuse_query_plans": false
},
"type": "object",
"properties": {
Expand Down Expand Up @@ -2839,6 +2841,11 @@ expression: "&schema"
"minimum": 0.0,
"nullable": true
},
"experimental_reuse_query_plans": {
"description": "If cache warm up is configured, this will allow the router to keep a query plan created with the old schema, if it determines that the schema update does not affect the corresponding query",
"default": false,
"type": "boolean"
},
"warmed_up_queries": {
"description": "Warms up the cache on reloads by running the query plan over a list of the most used queries (from the in memory cache) Configures the number of queries warmed up. Defaults to 1/3 of the in memory cache",
"default": null,
Expand Down
1 change: 1 addition & 0 deletions apollo-router/src/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ mod test {
crate::services::layers::query_analysis::ParsedDocumentInner {
ast: Default::default(),
executable: Default::default(),
hash: Default::default(),
parse_errors: Default::default(),
validation_errors: Default::default(),
},
Expand Down
42 changes: 20 additions & 22 deletions apollo-router/src/plugins/authorization/authenticated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
use std::collections::HashMap;

use apollo_compiler::ast;
use apollo_compiler::executable;
use apollo_compiler::schema;
use apollo_compiler::schema::Implementers;
use apollo_compiler::schema::Name;
use apollo_compiler::Node;
use tower::BoxError;

use crate::json_ext::Path;
Expand All @@ -21,7 +23,7 @@ pub(crate) const AUTHENTICATED_SPEC_VERSION_RANGE: &str = ">=0.1.0, <=0.1.0";

pub(crate) struct AuthenticatedCheckVisitor<'a> {
schema: &'a schema::Schema,
fragments: HashMap<&'a ast::Name, &'a ast::FragmentDefinition>,
fragments: HashMap<&'a ast::Name, &'a Node<executable::Fragment>>,
pub(crate) found: bool,
authenticated_directive_name: String,
entity_query: bool,
Expand All @@ -30,13 +32,13 @@ pub(crate) struct AuthenticatedCheckVisitor<'a> {
impl<'a> AuthenticatedCheckVisitor<'a> {
pub(crate) fn new(
schema: &'a schema::Schema,
executable: &'a ast::Document,
executable: &'a executable::ExecutableDocument,
entity_query: bool,
) -> Option<Self> {
Some(Self {
schema,
entity_query,
fragments: transform::collect_fragments(executable),
fragments: executable.fragments.iter().collect(),
found: false,
authenticated_directive_name: Schema::directive_name(
schema,
Expand All @@ -60,22 +62,22 @@ impl<'a> AuthenticatedCheckVisitor<'a> {
t.directives().has(&self.authenticated_directive_name)
}

fn entities_operation(&mut self, node: &ast::OperationDefinition) -> Result<(), BoxError> {
fn entities_operation(&mut self, node: &executable::Operation) -> Result<(), BoxError> {
use crate::spec::query::traverse::Visitor;

if node.selection_set.len() != 1 {
if node.selection_set.selections.len() != 1 {
return Err("invalid number of selections for _entities query".into());
}

match node.selection_set.first() {
Some(ast::Selection::Field(field)) => {
match node.selection_set.selections.first() {
Some(executable::Selection::Field(field)) => {
if field.name.as_str() != "_entities" {
return Err("expected _entities field".into());
}

for selection in &field.selection_set {
for selection in &field.selection_set.selections {
match selection {
ast::Selection::InlineFragment(f) => {
executable::Selection::InlineFragment(f) => {
match f.type_condition.as_ref() {
None => {
return Err("expected type condition".into());
Expand All @@ -94,11 +96,7 @@ impl<'a> AuthenticatedCheckVisitor<'a> {
}

impl<'a> traverse::Visitor for AuthenticatedCheckVisitor<'a> {
fn operation(
&mut self,
root_type: &str,
node: &ast::OperationDefinition,
) -> Result<(), BoxError> {
fn operation(&mut self, root_type: &str, node: &executable::Operation) -> Result<(), BoxError> {
if !self.entity_query {
traverse::operation(self, root_type, node)
} else {
Expand All @@ -109,7 +107,7 @@ impl<'a> traverse::Visitor for AuthenticatedCheckVisitor<'a> {
&mut self,
_parent_type: &str,
field_def: &ast::FieldDefinition,
node: &ast::Field,
node: &executable::Field,
) -> Result<(), BoxError> {
if self.is_field_authenticated(field_def) {
self.found = true;
Expand All @@ -118,25 +116,25 @@ impl<'a> traverse::Visitor for AuthenticatedCheckVisitor<'a> {
traverse::field(self, field_def, node)
}

fn fragment_definition(&mut self, node: &ast::FragmentDefinition) -> Result<(), BoxError> {
fn fragment(&mut self, node: &executable::Fragment) -> Result<(), BoxError> {
if self
.schema
.types
.get(&node.type_condition)
.get(node.type_condition())
.is_some_and(|type_definition| self.is_type_authenticated(type_definition))
{
self.found = true;
return Ok(());
}
traverse::fragment_definition(self, node)
traverse::fragment(self, node)
}

fn fragment_spread(&mut self, node: &ast::FragmentSpread) -> Result<(), BoxError> {
let condition = &self
fn fragment_spread(&mut self, node: &executable::FragmentSpread) -> Result<(), BoxError> {
let condition = self
.fragments
.get(&node.fragment_name)
.ok_or("MissingFragment")?
.type_condition;
.type_condition();

if self
.schema
Expand All @@ -153,7 +151,7 @@ impl<'a> traverse::Visitor for AuthenticatedCheckVisitor<'a> {
fn inline_fragment(
&mut self,
parent_type: &str,
node: &ast::InlineFragment,
node: &executable::InlineFragment,
) -> Result<(), BoxError> {
if let Some(name) = &node.type_condition {
if self
Expand Down
32 changes: 20 additions & 12 deletions apollo-router/src/plugins/authorization/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::collections::HashSet;
use std::ops::ControlFlow;

use apollo_compiler::ast;
use apollo_compiler::ast::Document;
use apollo_compiler::ExecutableDocument;
use http::StatusCode;
use schemars::JsonSchema;
use serde::Deserialize;
Expand Down Expand Up @@ -176,18 +176,23 @@ impl AuthorizationPlugin {

pub(crate) fn query_analysis(
query: &str,
operation_name: Option<&str>,
schema: &Schema,
configuration: &Configuration,
context: &Context,
) {
let doc = Query::parse_document(query, schema, configuration);
let ast = &doc.ast;
) -> Result<(), SpecError> {
let doc = Query::parse_document(query, operation_name, schema, configuration)?;

let CacheKeyMetadata {
is_authenticated,
scopes,
policies,
} = Self::generate_cache_metadata(ast, &schema.definitions, false);
} = Self::generate_cache_metadata(
&doc.executable,
operation_name,
&schema.definitions,
false,
);
if is_authenticated {
context.insert(AUTHENTICATED_KEY, true).unwrap();
}
Expand All @@ -201,39 +206,42 @@ impl AuthorizationPlugin {
policies.into_iter().map(|policy| (policy, None)).collect();
context.insert(REQUIRED_POLICIES_KEY, policies).unwrap();
}

Ok(())
}

pub(crate) fn generate_cache_metadata(
ast: &Document,
document: &ExecutableDocument,
operation_name: Option<&str>,
schema: &apollo_compiler::Schema,
entity_query: bool,
) -> CacheKeyMetadata {
let mut is_authenticated = false;
if let Some(mut visitor) = AuthenticatedCheckVisitor::new(schema, ast, entity_query) {
if let Some(mut visitor) = AuthenticatedCheckVisitor::new(schema, document, entity_query) {
// if this fails, the query is invalid and will fail at the query planning phase.
// We do not return validation errors here for now because that would imply a huge
// refactoring of telemetry and tests
if traverse::document(&mut visitor, ast).is_ok() && visitor.found {
if traverse::document(&mut visitor, document, operation_name).is_ok() && visitor.found {
is_authenticated = true;
}
}

let mut scopes = Vec::new();
if let Some(mut visitor) = ScopeExtractionVisitor::new(schema, ast, entity_query) {
if let Some(mut visitor) = ScopeExtractionVisitor::new(schema, document, entity_query) {
// if this fails, the query is invalid and will fail at the query planning phase.
// We do not return validation errors here for now because that would imply a huge
// refactoring of telemetry and tests
if traverse::document(&mut visitor, ast).is_ok() {
if traverse::document(&mut visitor, document, operation_name).is_ok() {
scopes = visitor.extracted_scopes.into_iter().collect();
}
}

let mut policies: Vec<String> = Vec::new();
if let Some(mut visitor) = PolicyExtractionVisitor::new(schema, ast, entity_query) {
if let Some(mut visitor) = PolicyExtractionVisitor::new(schema, document, entity_query) {
// if this fails, the query is invalid and will fail at the query planning phase.
// We do not return validation errors here for now because that would imply a huge
// refactoring of telemetry and tests
if traverse::document(&mut visitor, ast).is_ok() {
if traverse::document(&mut visitor, document, operation_name).is_ok() {
policies = visitor.extracted_policies.into_iter().collect();
}
}
Expand Down
Loading
Loading