Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix redis reconnections #3509

Merged
merged 5 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changesets/fix_geal_redis_fixes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
### Fix redis reconnections ([Issue #3045](https://github.com/apollographql/router/issues/3045))

The reconnection policy was using an exponential backoff delay with a maximum number of attempts. Once that maximum is reached, reconnection was never tried again (there's no baseline retry). We change that behaviour by adding infinite retries with a maximum delay of 2 seconds, and a timeout of 1 millisecond on redis commands, so that the router can continue serving requests in the meantime.

This commit contains additional fixes:
- release the lock on the in memory cache while waiting for redis, to let the in memory cache serve other requests
- add a custom serializer for `SubSelectionKey`: this type is used as key in a `HashMap`, which is converted to a JSON object, and object keys must be strings, so a specific serializer is needed instead of the derived one

By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/3509
8 changes: 6 additions & 2 deletions apollo-router/src/cache/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use fred::prelude::RedisError;
use fred::prelude::RedisErrorKind;
use fred::types::Expiration;
use fred::types::FromRedis;
use fred::types::PerformanceConfig;
use fred::types::ReconnectPolicy;
use fred::types::RedisConfig;
use url::Url;
Expand Down Expand Up @@ -114,8 +115,11 @@ impl RedisCacheStorage {

let client = RedisClient::new(
config,
None, //perf policy
Some(ReconnectPolicy::new_exponential(10, 1, 2000, 10)),
Some(PerformanceConfig {
default_command_timeout_ms: 1,
..Default::default()
}),
Some(ReconnectPolicy::new_exponential(0, 1, 2000, 5)),
);
let _handle = client.connect();

Expand Down
10 changes: 6 additions & 4 deletions apollo-router/src/cache/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ where
}

pub(crate) async fn get(&self, key: &K) -> Option<V> {
let mut guard = self.inner.lock().await;
let instant_memory = Instant::now();
match guard.get(key) {
let res = self.inner.lock().await.get(key).cloned();

match res {
o0Ignition0o marked this conversation as resolved.
Show resolved Hide resolved
Some(v) => {
tracing::info!(
monotonic_counter.apollo_router_cache_hit_count = 1u64,
Expand All @@ -97,7 +98,7 @@ where
kind = %self.caller,
storage = &tracing::field::display(CacheStorageName::Memory),
);
Some(v.clone())
Some(v)
}
None => {
let duration = instant_memory.elapsed().as_secs_f64();
Expand All @@ -117,7 +118,8 @@ where
let inner_key = RedisKey(key.clone());
match redis.get::<K, V>(inner_key).await {
Some(v) => {
guard.put(key.clone(), v.0.clone());
self.inner.lock().await.put(key.clone(), v.0.clone());

tracing::info!(
monotonic_counter.apollo_router_cache_hit_count = 1u64,
kind = %self.caller,
Expand Down
60 changes: 59 additions & 1 deletion apollo-router/src/spec/query/subselections.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::HashMap;

use serde::de::Visitor;
use serde::Deserialize;
use serde::Serialize;
use serde_json_bytes::ByteString;
Expand All @@ -14,12 +15,69 @@ use crate::spec::IncludeSkip;
use crate::spec::SpecError;
use crate::Configuration;

#[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq)]
#[derive(Debug, PartialEq, Hash, Eq)]
pub(crate) struct SubSelectionKey {
pub(crate) defer_label: Option<String>,
pub(crate) defer_conditions: BooleanValues,
}

// Do not replace this with a derived Serialize implementation
// SubSelectionKey must serialize to a string because it is used as a JSON object key
impl Serialize for SubSelectionKey {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let s = format!(
"{:?}|{}",
self.defer_conditions.bits,
self.defer_label.as_deref().unwrap_or("")
);
serializer.serialize_str(&s)
}
}

impl<'de> Deserialize<'de> for SubSelectionKey {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
deserializer.deserialize_str(SubSelectionKeyVisitor)
}
}

struct SubSelectionKeyVisitor;
impl<'de> Visitor<'de> for SubSelectionKeyVisitor {
type Value = SubSelectionKey;

fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter
.write_str("a string containing the defer label and defer conditions separated by |")
}

fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
if let Some((bits_str, label)) = s.split_once('|') {
Ok(SubSelectionKey {
defer_conditions: BooleanValues {
bits: bits_str
.parse::<u32>()
.map_err(|_| E::custom("expected a number"))?,
},
defer_label: if label.is_empty() {
None
} else {
Some(label.to_string())
},
})
} else {
Err(E::custom("invalid subselection"))
}
}
}

#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct SubSelectionValue {
pub(crate) selection_set: Vec<Selection>,
Expand Down