Skip to content

Commit

Permalink
remove clones from the header plugin (#3721)
Browse files Browse the repository at this point in the history
Fix #3068 

the operations were cloned for every subgraph query, this is a bit
wasteful so we create them only once, and store them under an Arc
  • Loading branch information
Geal authored and garypen committed Sep 12, 2023
1 parent 2c3a22c commit 35be1cc
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 55 deletions.
5 changes: 5 additions & 0 deletions .changesets/fix_geal_remove_clones_from_headers_plugin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
### remove clones from the header plugin ([Issue #3068](https://github.com/apollographql/router/issues/3068))

The list of header operations was cloned for every subgraph query, and this was increasing latency. We made sure the overhead is minimal by removing those allocations

By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/3721
132 changes: 77 additions & 55 deletions apollo-router/src/plugins/headers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;

Expand Down Expand Up @@ -187,44 +188,57 @@ struct Config {
}

struct Headers {
config: Config,
all_operations: Arc<Vec<Operation>>,
subgraph_operations: HashMap<String, Arc<Vec<Operation>>>,
}

#[async_trait::async_trait]
impl Plugin for Headers {
type Config = Config;

async fn new(init: PluginInit<Self::Config>) -> Result<Self, BoxError> {
Ok(Headers {
config: init.config,
})
}
fn subgraph_service(&self, name: &str, service: subgraph::BoxService) -> subgraph::BoxService {
let mut operations: Vec<Operation> = self
let operations: Vec<Operation> = init
.config
.all
.as_ref()
.map(|a| a.request.clone())
.unwrap_or_default();
if let Some(mut subgraph_operations) =
self.config.subgraphs.get(name).map(|s| s.request.clone())
{
operations.append(&mut subgraph_operations);
}
let subgraph_operations = init
.config
.subgraphs
.iter()
.map(|(subgraph_name, op)| {
let mut operations = operations.clone();
operations.append(&mut op.request.clone());
(subgraph_name.clone(), Arc::new(operations))
})
.collect();

Ok(Headers {
all_operations: Arc::new(operations),
subgraph_operations,
})
}

fn subgraph_service(&self, name: &str, service: subgraph::BoxService) -> subgraph::BoxService {
ServiceBuilder::new()
.layer(HeadersLayer::new(operations))
.layer(HeadersLayer::new(
self.subgraph_operations
.get(name)
.cloned()
.unwrap_or_else(|| self.all_operations.clone()),
))
.service(service)
.boxed()
}
}

struct HeadersLayer {
operations: Vec<Operation>,
operations: Arc<Vec<Operation>>,
}

impl HeadersLayer {
fn new(operations: Vec<Operation>) -> Self {
fn new(operations: Arc<Vec<Operation>>) -> Self {
Self { operations }
}
}
Expand All @@ -241,7 +255,7 @@ impl<S> Layer<S> for HeadersLayer {
}
struct HeadersService<S> {
inner: S,
operations: Vec<Operation>,
operations: Arc<Vec<Operation>>,
}

lazy_static! {
Expand Down Expand Up @@ -279,7 +293,7 @@ where
}

fn call(&mut self, mut req: SubgraphRequest) -> Self::Future {
for operation in &self.operations {
for operation in &*self.operations {
match operation {
Operation::Insert(insert_config) => match insert_config {
Insert::Static(static_insert) => {
Expand Down Expand Up @@ -523,12 +537,13 @@ mod test {
})
.returning(example_response);

let mut service =
HeadersLayer::new(vec![Operation::Insert(Insert::Static(InsertStatic {
let mut service = HeadersLayer::new(Arc::new(vec![Operation::Insert(Insert::Static(
InsertStatic {
name: "c".try_into()?,
value: "d".try_into()?,
}))])
.layer(mock);
},
))]))
.layer(mock);

service.ready().await?.call(example_request()).await?;
Ok(())
Expand All @@ -549,12 +564,12 @@ mod test {
})
.returning(example_response);

let mut service = HeadersLayer::new(vec![Operation::Insert(Insert::FromContext(
InsertFromContext {
let mut service = HeadersLayer::new(Arc::new(vec![Operation::Insert(
Insert::FromContext(InsertFromContext {
name: "header_from_context".try_into()?,
from_context: "my_key".to_string(),
},
))])
}),
)]))
.layer(mock);

service.ready().await?.call(example_request()).await?;
Expand All @@ -576,13 +591,14 @@ mod test {
})
.returning(example_response);

let mut service =
HeadersLayer::new(vec![Operation::Insert(Insert::FromBody(InsertFromBody {
let mut service = HeadersLayer::new(Arc::new(vec![Operation::Insert(Insert::FromBody(
InsertFromBody {
name: "header_from_request".try_into()?,
path: JSONQuery::parse(".operationName")?,
default: None,
}))])
.layer(mock);
},
))]))
.layer(mock);

service.ready().await?.call(example_request()).await?;
Ok(())
Expand All @@ -596,8 +612,10 @@ mod test {
.withf(|request| request.assert_headers(vec![("ac", "vac"), ("ab", "vab")]))
.returning(example_response);

let mut service =
HeadersLayer::new(vec![Operation::Remove(Remove::Named("aa".try_into()?))]).layer(mock);
let mut service = HeadersLayer::new(Arc::new(vec![Operation::Remove(Remove::Named(
"aa".try_into()?,
))]))
.layer(mock);

service.ready().await?.call(example_request()).await?;
Ok(())
Expand All @@ -611,9 +629,9 @@ mod test {
.withf(|request| request.assert_headers(vec![("ac", "vac")]))
.returning(example_response);

let mut service = HeadersLayer::new(vec![Operation::Remove(Remove::Matching(
let mut service = HeadersLayer::new(Arc::new(vec![Operation::Remove(Remove::Matching(
Regex::from_str("a[ab]")?,
))])
))]))
.layer(mock);

service.ready().await?.call(example_request()).await?;
Expand All @@ -636,10 +654,11 @@ mod test {
})
.returning(example_response);

let mut service = HeadersLayer::new(vec![Operation::Propagate(Propagate::Matching {
matching: Regex::from_str("d[ab]")?,
})])
.layer(mock);
let mut service =
HeadersLayer::new(Arc::new(vec![Operation::Propagate(Propagate::Matching {
matching: Regex::from_str("d[ab]")?,
})]))
.layer(mock);

service.ready().await?.call(example_request()).await?;
Ok(())
Expand All @@ -660,12 +679,13 @@ mod test {
})
.returning(example_response);

let mut service = HeadersLayer::new(vec![Operation::Propagate(Propagate::Named {
named: "da".try_into()?,
rename: None,
default: None,
})])
.layer(mock);
let mut service =
HeadersLayer::new(Arc::new(vec![Operation::Propagate(Propagate::Named {
named: "da".try_into()?,
rename: None,
default: None,
})]))
.layer(mock);

service.ready().await?.call(example_request()).await?;
Ok(())
Expand All @@ -686,12 +706,13 @@ mod test {
})
.returning(example_response);

let mut service = HeadersLayer::new(vec![Operation::Propagate(Propagate::Named {
named: "da".try_into()?,
rename: Some("ea".try_into()?),
default: None,
})])
.layer(mock);
let mut service =
HeadersLayer::new(Arc::new(vec![Operation::Propagate(Propagate::Named {
named: "da".try_into()?,
rename: Some("ea".try_into()?),
default: None,
})]))
.layer(mock);

service.ready().await?.call(example_request()).await?;
Ok(())
Expand All @@ -712,12 +733,13 @@ mod test {
})
.returning(example_response);

let mut service = HeadersLayer::new(vec![Operation::Propagate(Propagate::Named {
named: "ea".try_into()?,
rename: None,
default: Some("defaulted".try_into()?),
})])
.layer(mock);
let mut service =
HeadersLayer::new(Arc::new(vec![Operation::Propagate(Propagate::Named {
named: "ea".try_into()?,
rename: None,
default: Some("defaulted".try_into()?),
})]))
.layer(mock);

service.ready().await?.call(example_request()).await?;
Ok(())
Expand Down

0 comments on commit 35be1cc

Please sign in to comment.