From 196902a9a9e82bf910088db504da879a4f6473fe Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Wed, 6 Sep 2023 14:20:57 +0200 Subject: [PATCH] Add experimental caching metrics (#3558) Fix #3554 This creates a new metric recorded only if we set the configuration option `telemetry.metrics.common.experimental_cache_metrics.enabled` to `true`. * `apollo.router.operations.entity` (histogram): cache hit ratio per subgraph and entity type This simulates an entity cache to find out if it would be useful. Each time we do a subgraph query, we use as cache key: - subgraph name - entity type - query - vary headers - entity key We record if we have seen this entity before (using a bloom filter) and calculate the cache hit ratio for that query, per subgraph and entity type. --------- Signed-off-by: Benjamin Coenen <5719034+bnjjj@users.noreply.github.com> Co-authored-by: Benjamin Coenen <5719034+bnjjj@users.noreply.github.com> --- .changesets/maint_bnjjj_caching_metrics.md | 38 +++ Cargo.lock | 18 ++ apollo-router/Cargo.toml | 3 +- ...nfiguration__tests__schema_generation.snap | 17 ++ apollo-router/src/plugins/telemetry/config.rs | 23 ++ apollo-router/src/plugins/telemetry/mod.rs | 227 +++++++++++++++++- .../telemetry/tracing/apollo_telemetry.rs | 2 +- .../src/plugins/traffic_shaping/cache.rs | 91 +++++-- .../src/plugins/traffic_shaping/mod.rs | 5 +- 9 files changed, 393 insertions(+), 31 deletions(-) create mode 100644 .changesets/maint_bnjjj_caching_metrics.md diff --git a/.changesets/maint_bnjjj_caching_metrics.md b/.changesets/maint_bnjjj_caching_metrics.md new file mode 100644 index 0000000000..cd09700cfd --- /dev/null +++ b/.changesets/maint_bnjjj_caching_metrics.md @@ -0,0 +1,38 @@ +### Add experimental caching metrics ([PR #3532](https://github.com/apollographql/router/pull/3532)) + +It adds a metric only if you configure `telemetry.metrics.common.experimental_cache_metrics.enabled` to `true`. 
It will generate metrics to evaluate which entities would benefit from caching. It simulates a cache with a TTL, configurable at `telemetry.metrics.common.experimental_cache_metrics.ttl` (default: 5 seconds), and measures the cache hit rate per entity type and subgraph. + +example + +``` +# HELP apollo.router.operations.entity.cache_hit +# TYPE apollo_router_operations_entity.cache_hit histogram +apollo_router_operations_entity_cache_hitbucket{entity_type="Product",service_name="apollo-router",subgraph="products",otel_scope_name="apollo/router",otel_scope_version="",le="0.05"} 0 +apollo_router_operations_entity_cache_hitbucket{entity_type="Product",service_name="apollo-router",subgraph="products",otel_scope_name="apollo/router",otel_scope_version="",le="0.1"} 0 +apollo_router_operations_entity_cache_hitbucket{entity_type="Product",service_name="apollo-router",subgraph="products",otel_scope_name="apollo/router",otel_scope_version="",le="0.25"} 0 +apollo_router_operations_entity_cache_hitbucket{entity_type="Product",service_name="apollo-router",subgraph="products",otel_scope_name="apollo/router",otel_scope_version="",le="0.5"} 0 +apollo_router_operations_entity_cache_hitbucket{entity_type="Product",service_name="apollo-router",subgraph="products",otel_scope_name="apollo/router",otel_scope_version="",le="1"} 0 +apollo_router_operations_entity_cache_hitbucket{entity_type="Product",service_name="apollo-router",subgraph="products",otel_scope_name="apollo/router",otel_scope_version="",le="2.5"} 3 +apollo_router_operations_entity_cache_hitbucket{entity_type="Product",service_name="apollo-router",subgraph="products",otel_scope_name="apollo/router",otel_scope_version="",le="5"} 4 +apollo_router_operations_entity_cache_hitbucket{entity_type="Product",service_name="apollo-router",subgraph="products",otel_scope_name="apollo/router",otel_scope_version="",le="10"} 4 
+apollo_router_operations_entity_cache_hitbucket{entity_type="Product",service_name="apollo-router",subgraph="products",otel_scope_name="apollo/router",otel_scope_version="",le="20"} 4 +apollo_router_operations_entity_cache_hitbucket{entity_type="Product",service_name="apollo-router",subgraph="products",otel_scope_name="apollo/router",otel_scope_version="",le="1000"} 4 +apollo_router_operations_entity_cache_hitbucket{entity_type="Product",service_name="apollo-router",subgraph="products",otel_scope_name="apollo/router",otel_scope_version="",le="+Inf"} 4 +apollo_router_operations_entity_cache_hitsum{entity_type="Product",service_name="apollo-router",subgraph="products",otel_scope_name="apollo/router",otel_scope_version=""} 7 +apollo_router_operations_entity_cache_hitcount{entity_type="Product",service_name="apollo-router",subgraph="products",otel_scope_name="apollo/router",otel_scope_version=""} 4 +apollo_router_operations_entity_cache_hitbucket{entity_type="User",service_name="apollo-router",subgraph="users",otel_scope_name="apollo/router",otel_scope_version="",le="0.05"} 0 +apollo_router_operations_entity_cache_hitbucket{entity_type="User",service_name="apollo-router",subgraph="users",otel_scope_name="apollo/router",otel_scope_version="",le="0.1"} 0 +apollo_router_operations_entity_cache_hitbucket{entity_type="User",service_name="apollo-router",subgraph="users",otel_scope_name="apollo/router",otel_scope_version="",le="0.25"} 0 +apollo_router_operations_entity_cache_hitbucket{entity_type="User",service_name="apollo-router",subgraph="users",otel_scope_name="apollo/router",otel_scope_version="",le="0.5"} 0 +apollo_router_operations_entity_cache_hitbucket{entity_type="User",service_name="apollo-router",subgraph="users",otel_scope_name="apollo/router",otel_scope_version="",le="1"} 0 +apollo_router_operations_entity_cache_hitbucket{entity_type="User",service_name="apollo-router",subgraph="users",otel_scope_name="apollo/router",otel_scope_version="",le="2.5"} 1 
+apollo_router_operations_entity_cache_hitbucket{entity_type="User",service_name="apollo-router",subgraph="users",otel_scope_name="apollo/router",otel_scope_version="",le="5"} 1 +apollo_router_operations_entity_cache_hitbucket{entity_type="User",service_name="apollo-router",subgraph="users",otel_scope_name="apollo/router",otel_scope_version="",le="10"} 1 +apollo_router_operations_entity_cache_hitbucket{entity_type="User",service_name="apollo-router",subgraph="users",otel_scope_name="apollo/router",otel_scope_version="",le="20"} 1 +apollo_router_operations_entity_cache_hitbucket{entity_type="User",service_name="apollo-router",subgraph="users",otel_scope_name="apollo/router",otel_scope_version="",le="1000"} 1 +apollo_router_operations_entity_cache_hitbucket{entity_type="User",service_name="apollo-router",subgraph="users",otel_scope_name="apollo/router",otel_scope_version="",le="+Inf"} 1 +apollo_router_operations_entity_cache_hitsum{entity_type="User",service_name="apollo-router",subgraph="users",otel_scope_name="apollo/router",otel_scope_version=""} 1 +apollo_router_operations_entity_cache_hitcount{entity_type="User",service_name="apollo-router",subgraph="users",otel_scope_name="apollo/router",otel_scope_version=""} 1 +``` + +By [@bnjjj](https://github.com/bnjjj) [@Geal](https://github.com/geal) in https://github.com/apollographql/router/pull/3532 \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 44bc38cd31..212d0f17d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -281,6 +281,7 @@ dependencies = [ "aws-types", "axum", "base64 0.21.2", + "bloomfilter", "brotli", "buildstructor 0.5.3", "bytes", @@ -1098,6 +1099,17 @@ dependencies = [ "generic-array 0.14.7", ] +[[package]] +name = "bloomfilter" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b92db7965d438b8b4b1c1d0aedd188440a1084593c9eb7f6657e3df7e906d934" +dependencies = [ + "bit-vec", + "getrandom 0.2.10", + "siphasher", +] + [[package]] name = 
"brotli" version = "3.3.4" @@ -5793,6 +5805,12 @@ dependencies = [ "time", ] +[[package]] +name = "siphasher" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54ac45299ccbd390721be55b412d41931911f654fa99e2cb8bfb57184b2061fe" + [[package]] name = "slab" version = "0.4.8" diff --git a/apollo-router/Cargo.toml b/apollo-router/Cargo.toml index 4f91e8a2dc..b9abf26a6b 100644 --- a/apollo-router/Cargo.toml +++ b/apollo-router/Cargo.toml @@ -67,6 +67,7 @@ async-trait = "0.1.73" atty = "0.2.14" axum = { version = "0.6.20", features = ["headers", "json", "original-uri"] } base64 = "0.21.2" +bloomfilter = "1.0.12" buildstructor = "0.5.3" bytes = "1.4.0" clap = { version = "4.4.2", default-features = false, features = [ @@ -163,6 +164,7 @@ prost = "0.11.9" prost-types = "0.11.9" proteus = "0.5.0" rand = "0.8.5" +rand_core = "0.6.4" rhai = { version = "1.15.1", features = ["sync", "serde", "internals"] } regex = "1.9.5" reqwest = { version = "0.11.19", default-features = false, features = [ @@ -236,7 +238,6 @@ memchr = "2.6.3" brotli = "3.3.4" zstd = "0.12.4" zstd-safe = "6.0.6" -rand_core = "0.6.4" # note: AWS dependencies should always use the same version aws-sigv4 = "0.56.0" aws-credential-types = "0.56.0" diff --git a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap index e54c9226cd..ff9123ab31 100644 --- a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap +++ b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap @@ -4314,6 +4314,23 @@ expression: "&schema" "format": "double" } }, + "experimental_cache_metrics": { + "description": "Experimental metrics to know more about caching strategies", + "type": "object", + "properties": { + "enabled": { + "description": "Enable 
experimental metrics", + "default": false, + "type": "boolean" + }, + "ttl": { + "description": "Potential TTL for a cache if we had one (default: 5secs)", + "default": "5s", + "type": "string" + } + }, + "additionalProperties": false + }, "resources": { "description": "Resources", "default": {}, diff --git a/apollo-router/src/plugins/telemetry/config.rs b/apollo-router/src/plugins/telemetry/config.rs index 11e76f1bcf..c0cff8118f 100644 --- a/apollo-router/src/plugins/telemetry/config.rs +++ b/apollo-router/src/plugins/telemetry/config.rs @@ -94,6 +94,28 @@ pub(crate) struct MetricsCommon { /// Custom buckets for histograms #[serde(default = "default_buckets")] pub(crate) buckets: Vec, + /// Experimental metrics to know more about caching strategies + pub(crate) experimental_cache_metrics: ExperimentalCacheMetricsConf, +} + +#[derive(Clone, Debug, Deserialize, JsonSchema)] +#[serde(deny_unknown_fields, rename_all = "snake_case", default)] +pub(crate) struct ExperimentalCacheMetricsConf { + /// Enable experimental metrics + pub(crate) enabled: bool, + #[serde(with = "humantime_serde")] + #[schemars(with = "String")] + /// Potential TTL for a cache if we had one (default: 5secs) + pub(crate) ttl: Duration, +} + +impl Default for ExperimentalCacheMetricsConf { + fn default() -> Self { + Self { + enabled: false, + ttl: Duration::from_secs(5), + } + } } fn default_buckets() -> Vec { @@ -110,6 +132,7 @@ impl Default for MetricsCommon { service_namespace: None, resources: HashMap::new(), buckets: default_buckets(), + experimental_cache_metrics: ExperimentalCacheMetricsConf::default(), } } } diff --git a/apollo-router/src/plugins/telemetry/mod.rs b/apollo-router/src/plugins/telemetry/mod.rs index 0f46df3750..67450787e9 100644 --- a/apollo-router/src/plugins/telemetry/mod.rs +++ b/apollo-router/src/plugins/telemetry/mod.rs @@ -11,6 +11,7 @@ use ::tracing::field; use ::tracing::info_span; use ::tracing::Span; use axum::headers::HeaderName; +use bloomfilter::Bloom; use 
dashmap::DashMap; use futures::future::ready; use futures::future::BoxFuture; @@ -38,6 +39,7 @@ use opentelemetry::trace::TraceState; use opentelemetry::trace::TracerProvider; use opentelemetry::Context as OtelContext; use opentelemetry::KeyValue; +use parking_lot::Mutex; use rand::Rng; use router_bridge::planner::UsageReporting; use serde_json_bytes::json; @@ -72,6 +74,9 @@ use self::reload::NullFieldFormatter; use self::reload::SamplingFilter; use self::reload::OPENTELEMETRY_TRACER_HANDLE; use self::tracing::apollo_telemetry::APOLLO_PRIVATE_DURATION_NS; +use super::traffic_shaping::cache::hash_request; +use super::traffic_shaping::cache::hash_vary_headers; +use super::traffic_shaping::cache::REPRESENTATIONS; use crate::axum_factory::utils::REQUEST_SPAN_NAME; use crate::context::OPERATION_NAME; use crate::layers::ServiceBuilderExt; @@ -117,6 +122,7 @@ use crate::services::SubgraphRequest; use crate::services::SubgraphResponse; use crate::services::SupergraphRequest; use crate::services::SupergraphResponse; +use crate::spec::TYPENAME; use crate::tracer::TraceId; use crate::Context; use crate::ListenAddr; @@ -162,6 +168,7 @@ pub(crate) struct Telemetry { tracer_provider: Option, meter_provider: AggregateMeterProvider, + counter: Option>>, } #[derive(Debug)] @@ -244,7 +251,21 @@ impl Plugin for Telemetry { config.calculate_field_level_instrumentation_ratio()?; let mut metrics_builder = Self::create_metrics_builder(&config)?; let meter_provider = metrics_builder.meter_provider(); + let counter = config + .metrics + .as_ref() + .and_then(|m| m.common.as_ref()) + .and_then(|c| { + if c.experimental_cache_metrics.enabled { + Some(Arc::new(Mutex::new(CacheCounter::new( + c.experimental_cache_metrics.ttl, + )))) + } else { + None + } + }); let (sampling_filter_ratio, tracer_provider) = Self::create_tracer_provider(&config)?; + Ok(Telemetry { custom_endpoints: metrics_builder.custom_endpoints(), metrics_exporters: metrics_builder.exporters(), @@ -255,6 +276,7 @@ impl Plugin 
for Telemetry { meter_provider, sampling_filter_ratio, config: Arc::new(config), + counter, }) } @@ -477,7 +499,10 @@ impl Plugin for Telemetry { let subgraph_metrics_conf_req = self.create_subgraph_metrics_conf(name); let subgraph_metrics_conf_resp = subgraph_metrics_conf_req.clone(); let subgraph_name = ByteString::from(name); + let cache_metrics_enabled = self.counter.is_some(); + let counter = self.counter.clone(); let name = name.to_owned(); + let subgraph_name_arc = Arc::new(name.to_owned()); ServiceBuilder::new() .instrument(move |req: &SubgraphRequest| { let query = req @@ -502,7 +527,16 @@ impl Plugin for Telemetry { "apollo_private.ftv1" = field::Empty ) }) - .map_request(request_ftv1) + .map_request(move |mut req: SubgraphRequest| { + let cache_attributes = cache_metrics_enabled + .then(|| Self::get_cache_attributes(subgraph_name_arc.clone(), &mut req)) + .flatten(); + if let Some(cache_attributes) = cache_attributes { + req.context.private_entries.lock().insert(cache_attributes); + } + + request_ftv1(req) + }) .map_response(move |resp| store_ftv1(&subgraph_name, resp)) .map_future_with_request_data( move |sub_request: &SubgraphRequest| { @@ -510,13 +544,16 @@ impl Plugin for Telemetry { subgraph_metrics_conf_req.clone(), sub_request, ); - sub_request.context.clone() + let cache_attributes = sub_request.context.private_entries.lock().remove(); + + (sub_request.context.clone(), cache_attributes) }, - move |context: Context, + move |(context, cache_attributes): (Context, Option), f: BoxFuture<'static, Result>| { let metrics = metrics.clone(); let subgraph_attribute = subgraph_attribute.clone(); let subgraph_metrics_conf = subgraph_metrics_conf_resp.clone(); + let counter = counter.clone(); // Using Instant because it is guaranteed to be monotonically increasing. 
let now = Instant::now(); f.map(move |result: Result| { @@ -526,6 +563,8 @@ impl Plugin for Telemetry { subgraph_attribute, subgraph_metrics_conf, now, + counter, + cache_attributes, &result, ); result @@ -1032,6 +1071,63 @@ impl Telemetry { ) } + fn get_cache_attributes( + subgraph_name: Arc, + sub_request: &mut Request, + ) -> Option { + let body = dbg!(sub_request.subgraph_request.body_mut()); + let hashed_query = hash_request(body); + let representations = body + .variables + .get(REPRESENTATIONS) + .and_then(|value| value.as_array())?; + + let keys = extract_cache_attributes(representations).ok()?; + + Some(CacheAttributes { + subgraph_name, + headers: sub_request.subgraph_request.headers().clone(), + hashed_query: Arc::new(hashed_query), + representations: keys, + }) + } + + fn update_cache_metrics( + counter: Arc>, + sub_response: &SubgraphResponse, + cache_attributes: CacheAttributes, + ) { + let mut vary_headers = sub_response + .response + .headers() + .get_all(header::VARY) + .into_iter() + .filter_map(|val| { + val.to_str().ok().map(|v| { + v.to_string() + .split(", ") + .map(|s| s.to_string()) + .collect::>() + }) + }) + .flatten() + .collect::>(); + vary_headers.sort(); + let vary_headers = vary_headers.join(", "); + + let hashed_headers = if vary_headers.is_empty() { + Arc::default() + } else { + Arc::new(hash_vary_headers(&cache_attributes.headers)) + }; + counter.lock().record( + cache_attributes.hashed_query.clone(), + cache_attributes.subgraph_name.clone(), + hashed_headers, + cache_attributes.representations, + ); + } + fn store_subgraph_request_attributes( attribute_forward_config: Arc>, sub_request: &Request, @@ -1052,12 +1148,15 @@ impl Telemetry { .insert(SubgraphMetricsAttributes(attributes)); //.unwrap(); } + #[allow(clippy::too_many_arguments)] fn store_subgraph_response_attributes( context: &Context, metrics: BasicMetrics, subgraph_attribute: KeyValue, attribute_forward_config: Arc>, now: Instant, + counter: Option>>, + cache_attributes: 
Option, result: &Result, ) { let mut metric_attrs = { @@ -1088,6 +1187,21 @@ impl Telemetry { match &result { Ok(response) => { + if let Some(cache_attributes) = cache_attributes { + if let Ok(cache_control) = response + .response + .headers() + .get(header::CACHE_CONTROL) + .ok_or(()) + .and_then(|val| val.to_str().map(|v| v.to_string()).map_err(|_| ())) + { + metric_attrs.push(KeyValue::new("cache_control", cache_control)); + } + + if let Some(counter) = counter { + Self::update_cache_metrics(counter, response, cache_attributes) + } + } metric_attrs.push(KeyValue::new( "status", response.response.status().as_u16().to_string(), @@ -1554,6 +1668,113 @@ impl Telemetry { } } +#[derive(Debug, Clone)] +struct CacheAttributes { + subgraph_name: Arc, + headers: http::HeaderMap, + hashed_query: Arc, + // Typename + representation (stored as-is, not hashed) + representations: Vec<(Arc, Value)>, +} + +#[derive(Debug, Hash, Clone)] +struct CacheKey { + representation: Value, + typename: Arc, + query: Arc, + subgraph_name: Arc, + hashed_headers: Arc, +} + +// Get the typename and the (unhashed) representation for each entity representation in the subgraph query +fn extract_cache_attributes( + representations: &[Value], +) -> Result, Value)>, BoxError> { + let mut res = Vec::new(); + for representation in representations { + let opt_type = representation + .as_object() + .and_then(|o| o.get(TYPENAME)) + .ok_or("missing __typename in representation")?; + let typename = opt_type.as_str().unwrap_or(""); + + res.push((Arc::new(typename.to_string()), representation.clone())); + } + Ok(res) +} + +struct CacheCounter { + primary: Bloom, + secondary: Bloom, + created_at: Instant, + ttl: Duration, +} + +impl CacheCounter { + fn new(ttl: Duration) -> Self { + Self { + primary: Self::make_filter(), + secondary: Self::make_filter(), + created_at: Instant::now(), + ttl, + } + } + + fn make_filter() -> Bloom { + // the filter is around 4kB in size (can be calculated with `Bloom::compute_bitmap_size`) +
Bloom::new_for_fp_rate(10000, 0.2) + } + + fn record( + &mut self, + query: Arc, + subgraph_name: Arc, + hashed_headers: Arc, + representations: Vec<(Arc, Value)>, + ) { + if self.created_at.elapsed() >= self.ttl { + self.clear(); + } + + // typename -> (nb of cache hits, nb of entities) + let mut seen: HashMap, (usize, usize)> = HashMap::new(); + for (typename, representation) in representations { + let cache_hit = self.check(&CacheKey { + representation, + typename: typename.clone(), + query: query.clone(), + subgraph_name: subgraph_name.clone(), + hashed_headers: hashed_headers.clone(), + }); + + let seen_entry = seen.entry(typename.clone()).or_default(); + if cache_hit { + seen_entry.0 += 1; + } + seen_entry.1 += 1; + } + + for (typename, (cache_hit, total_entities)) in seen.into_iter() { + ::tracing::info!( + histogram.apollo.router.operations.entity.cache_hit = (cache_hit as f64 / total_entities as f64) * 100f64, + entity_type = %typename, + subgraph = %subgraph_name, + ); + } + } + + fn check(&mut self, key: &CacheKey) -> bool { + self.primary.check_and_set(key) || self.secondary.check(key) + } + + fn clear(&mut self) { + let secondary = std::mem::replace(&mut self.primary, Self::make_filter()); + self.secondary = secondary; + + self.created_at = Instant::now(); + } +} + fn filter_headers(headers: &HeaderMap, forward_rules: &ForwardHeaders) -> String { let headers_map = headers .iter() diff --git a/apollo-router/src/plugins/telemetry/tracing/apollo_telemetry.rs b/apollo-router/src/plugins/telemetry/tracing/apollo_telemetry.rs index 9489189891..25f260977c 100644 --- a/apollo-router/src/plugins/telemetry/tracing/apollo_telemetry.rs +++ b/apollo-router/src/plugins/telemetry/tracing/apollo_telemetry.rs @@ -847,7 +847,7 @@ mod test { use opentelemetry::Value; use prost::Message; use serde_json::json; - use crate::plugins::telemetry::apollo::{ErrorConfiguration}; + use crate::plugins::telemetry::apollo::ErrorConfiguration; use 
crate::plugins::telemetry::apollo_exporter::proto::reports::Trace; use crate::plugins::telemetry::apollo_exporter::proto::reports::trace::query_plan_node::{DeferNodePrimary, DeferredNode, ResponsePathElement}; use crate::plugins::telemetry::apollo_exporter::proto::reports::trace::{QueryPlanNode, Node, Error}; diff --git a/apollo-router/src/plugins/traffic_shaping/cache.rs b/apollo-router/src/plugins/traffic_shaping/cache.rs index f52ac67061..abb1e7031c 100644 --- a/apollo-router/src/plugins/traffic_shaping/cache.rs +++ b/apollo-router/src/plugins/traffic_shaping/cache.rs @@ -5,8 +5,10 @@ use std::time::Duration; use futures::future::BoxFuture; use futures::FutureExt; +use http::header; use serde::Deserialize; use serde::Serialize; +use serde_json_bytes::ByteString; use serde_json_bytes::Value; use sha2::Digest; use sha2::Sha256; @@ -25,6 +27,9 @@ use crate::json_ext::Object; use crate::services::subgraph; use crate::spec::TYPENAME; +const ENTITIES: &str = "_entities"; +pub(crate) const REPRESENTATIONS: &str = "representations"; + #[derive(Clone)] pub(crate) struct SubgraphCacheLayer { storage: RedisCacheStorage, @@ -83,14 +88,14 @@ where Poll::Ready(Ok(())) } - fn call(&mut self, mut request: subgraph::Request) -> Self::Future { + fn call(&mut self, request: subgraph::Request) -> Self::Future { let service = self.service.clone(); if !request .subgraph_request - .body_mut() + .body() .variables - .contains_key("representations") + .contains_key(REPRESENTATIONS) { return service.oneshot(request).boxed(); } @@ -118,9 +123,11 @@ where let body = request.subgraph_request.body_mut(); let query_hash = hash_request(body); + // TODO: compute TTL with cacheControl directive on the subgraph + let representations = body .variables - .get_mut("representations") + .get_mut(REPRESENTATIONS) .and_then(|value| value.as_array_mut()) .expect("we already checked that representations exist"); @@ -132,11 +139,11 @@ where .unwrap_or_else(|| 
std::iter::repeat(None).take(keys.len()).collect()); let (new_representations, mut result) = - filter_representations(representations, keys, cache_result)?; + filter_representations(&name, representations, keys, cache_result)?; if !new_representations.is_empty() { body.variables - .insert("representations", new_representations.into()); + .insert(REPRESENTATIONS, new_representations.into()); let mut response = service.oneshot(request).await?; @@ -145,7 +152,7 @@ where if let Some(mut entities) = data .as_mut() .and_then(|v| v.as_object_mut()) - .and_then(|o| o.remove("_entities")) + .and_then(|o| o.remove(ENTITIES)) { let new_entities = insert_entities_in_result( entities @@ -160,7 +167,7 @@ where data.as_mut() .and_then(|v| v.as_object_mut()) - .map(|o| o.insert("_entities", new_entities.into())); + .map(|o| o.insert(ENTITIES, new_entities.into())); response.response.body_mut().data = data; } @@ -168,7 +175,7 @@ where } else { let entities = insert_entities_in_result(&mut Vec::new(), &cache, &mut result).await?; let mut data = Object::default(); - data.insert("_entities", entities.into()); + data.insert(ENTITIES, entities.into()); Ok(subgraph::Response::builder() .data(data) @@ -178,14 +185,42 @@ where } } -fn hash_request(body: &graphql::Request) -> String { +pub(crate) fn hash_vary_headers(headers: &http::HeaderMap) -> String { + let mut digest = Sha256::new(); + + for vary_header_value in headers.get_all(header::VARY).into_iter() { + if vary_header_value == "*" { + return String::from("*"); + } else { + let header_names = match vary_header_value.to_str() { + Ok(header_val) => header_val.split(", "), + Err(_) => continue, + }; + header_names.for_each(|header_name| { + if let Some(header_value) = headers.get(header_name).and_then(|h| h.to_str().ok()) { + digest.update(header_value); + digest.update(&[0u8; 1][..]); + } + }); + } + } + + hex::encode(digest.finalize().as_slice()) +} + +pub(crate) fn hash_request(body: &mut graphql::Request) -> String { let mut digest 
= Sha256::new(); digest.update(body.query.as_deref().unwrap_or("-").as_bytes()); digest.update(&[0u8; 1][..]); digest.update(body.operation_name.as_deref().unwrap_or("-").as_bytes()); digest.update(&[0u8; 1][..]); + let repr_key = ByteString::from(REPRESENTATIONS); + // Removing the representations variable because it's already part of the cache key + let representations = body.variables.remove(&repr_key); digest.update(&serde_json::to_vec(&body.variables).unwrap()); - + if let Some(representations) = representations { + body.variables.insert(repr_key, representations); + } hex::encode(digest.finalize().as_slice()) } @@ -204,19 +239,21 @@ fn extract_cache_keys( reason: "missing __typename in representation".to_string(), })?; - let typename = opt_type.as_str().unwrap_or("-").to_string(); + let typename = opt_type.as_str().unwrap_or("-"); + + // We have to hash the representation because it can contain PII + let mut digest = Sha256::new(); + digest.update(serde_json::to_string(&representation).unwrap().as_bytes()); + let hashed_repr = hex::encode(digest.finalize().as_slice()); let key = format!( "subgraph.{}|{}|{}|{}", - subgraph_name, - &typename, - serde_json::to_string(&representation).unwrap(), - query_hash + subgraph_name, &typename, hashed_repr, query_hash ); representation .as_object_mut() - .map(|o| o.insert("__typename", opt_type)); + .map(|o| o.insert(TYPENAME, opt_type)); res.push(key); } Ok(res) @@ -230,6 +267,7 @@ struct IntermediateResult { // build a new list of representations without the ones we got from the cache fn filter_representations( + subgraph_name: &str, representations: &mut Vec, keys: Vec, mut cache_result: Vec>, @@ -245,7 +283,7 @@ fn filter_representations( { let opt_type = representation .as_object_mut() - .and_then(|o| o.remove("__typename")) + .and_then(|o| o.remove(TYPENAME)) .ok_or_else(|| FetchError::MalformedRequest { reason: "missing __typename in representation".to_string(), })?; @@ -257,7 +295,7 @@ fn filter_representations( 
representation .as_object_mut() - .map(|o| o.insert("__typename", opt_type)); + .map(|o| o.insert(TYPENAME, opt_type)); new_representations.push(representation); } else { cache_hit.entry(typename.clone()).or_default().0 += 1; @@ -270,11 +308,17 @@ fn filter_representations( } for (ty, (hit, miss)) in cache_hit { - tracing::event!( - Level::INFO, + tracing::info!( + monotonic_counter.apollo.router.operations.entity.cache = hit as u64, + entity_type = ty.as_str(), + hit = %true, + %subgraph_name + ); + tracing::info!( + monotonic_counter.apollo.router.operations.entity.cache = miss as u64, entity_type = ty.as_str(), - cache_hit = hit, - cache_miss = miss + miss = %true, + %subgraph_name ); } @@ -317,6 +361,7 @@ async fn insert_entities_in_result( } if !to_insert.is_empty() { + // TODO use insert_multiple_with_ttl cache.insert_multiple(&to_insert).await; } diff --git a/apollo-router/src/plugins/traffic_shaping/mod.rs b/apollo-router/src/plugins/traffic_shaping/mod.rs index 68b0a0d286..a85bdce659 100644 --- a/apollo-router/src/plugins/traffic_shaping/mod.rs +++ b/apollo-router/src/plugins/traffic_shaping/mod.rs @@ -6,7 +6,7 @@ //! * Compression //! * Rate limiting //! -mod cache; +pub(crate) mod cache; mod deduplication; pub(crate) mod rate; mod retry; @@ -379,10 +379,9 @@ impl TrafficShaping { let all_config = self.config.all.as_ref(); let subgraph_config = self.config.subgraphs.get(name); let final_config = Self::merge_config(all_config, subgraph_config); - let entity_caching = if let (Some(storage), Some(caching_config)) = ( self.storage.clone(), - subgraph_config + final_config .as_ref() .and_then(|c| c.experimental_entity_caching.as_ref()), ) {