Skip to content

Commit

Permalink
7.1.2 (#201)
Browse files Browse the repository at this point in the history
* fix: ensure reconnections are attempted following unroutable commands
  • Loading branch information
aembke committed Jan 12, 2024
1 parent 79df369 commit 52bff98
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 9 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 7.1.2

* Fix intermittent cluster routing errors

## 7.1.1

* Fix cluster failover in transactions
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fred"
version = "7.1.1"
version = "7.1.2"
authors = ["Alec Embke <[email protected]>"]
edition = "2021"
description = "An async Redis client built on Tokio."
Expand Down
16 changes: 13 additions & 3 deletions src/router/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,21 +166,22 @@ async fn write_with_backpressure(
}
router.sync_network_timeout_state();

utils::defer_reconnect(inner);
break;
},
Written::NotFound(mut command) => {
if let Err(e) = command.decr_check_redirections() {
command.finish(inner, Err(e));
utils::defer_reconnect(inner);
break;
}

_debug!(inner, "Perform cluster sync after missing hash slot lookup.");
if let Err(error) = router.sync_cluster().await {
// try to sync the cluster once, and failing that buffer the command. a failed cluster sync will clear local
// cluster state and old connections, which then forces a reconnect from the reader tasks when the streams
// close.
// try to sync the cluster once, and failing that buffer the command.
_warn!(inner, "Failed to sync cluster after NotFound: {:?}", error);
router.buffer_command(command);
utils::defer_reconnect(inner);
break;
} else {
_command = Some(command);
Expand Down Expand Up @@ -248,6 +249,7 @@ async fn write_with_backpressure(
}
inner.notifications.broadcast_error(error.clone());

utils::defer_reconnect(inner);
return Err(error);
},
#[cfg(feature = "replicas")]
Expand Down Expand Up @@ -421,6 +423,14 @@ async fn process_reconnect(
}
}

if !force && router.has_healthy_centralized_connection() {
_debug!(inner, "Skip reconnecting to centralized host");
if let Some(tx) = tx {
let _ = tx.send(Ok(Resp3Frame::Null));
}
return Ok(());
}

_debug!(inner, "Starting reconnection loop...");
if let Err(e) = utils::reconnect_with_policy(inner, router).await {
if let Some(tx) = tx {
Expand Down
14 changes: 12 additions & 2 deletions src/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,15 @@ impl Router {
}
}

/// Whether the router holds a working writer for a centralized or sentinel deployment.
///
/// Returns `false` for clustered connections, and also when the centralized/sentinel writer
/// slot is empty or its writer reports that it is not working.
pub fn has_healthy_centralized_connection(&self) -> bool {
    // Both non-clustered variants carry the same optional writer; anything clustered is
    // never considered a "centralized" connection.
    let maybe_writer = match &self.connections {
        Connections::Centralized { writer } | Connections::Sentinel { writer } => writer,
        Connections::Clustered { .. } => return false,
    };

    maybe_writer.as_ref().map_or(false, |conn| conn.is_working())
}

/// Attempt to send the command to the server.
pub async fn write(&mut self, command: RedisCommand, force_flush: bool) -> Written {
let send_all_cluster_nodes = self.inner.config.server.is_clustered()
Expand Down Expand Up @@ -857,8 +866,8 @@ impl Router {
"{}: Disconnect while retrying after write error: {:?}",
&self.inner.id, error
);
// triggers a reconnect if needed
self.connections.disconnect(&self.inner, server.as_ref()).await;
utils::defer_reconnect(&self.inner);
continue;
},
Written::NotFound(command) => {
Expand All @@ -868,8 +877,8 @@ impl Router {
"{}: Disconnect and re-sync cluster state after routing error while retrying commands.",
self.inner.id
);
// triggers a reconnect if needed
self.disconnect_all().await;
utils::defer_reconnect(&self.inner);
break;
},
Written::Error((error, command)) => {
Expand All @@ -878,6 +887,7 @@ impl Router {
command.finish(&self.inner, Err(error));
}
self.disconnect_all().await;
utils::defer_reconnect(&self.inner);
break;
},
_ => {},
Expand Down
26 changes: 23 additions & 3 deletions src/router/utils.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::{
error::{RedisError, RedisErrorKind},
interfaces,
modules::inner::RedisClientInner,
prelude::Resp3Frame,
protocol::{
command::{ClusterErrorKind, RedisCommand, RedisCommandKind, RouterResponse},
command::{ClusterErrorKind, RedisCommand, RedisCommandKind, RouterCommand, RouterResponse},
connection::{RedisWriter, SharedBuffer, SplitStreamKind},
responders::ResponseKind,
types::*,
Expand All @@ -24,8 +25,6 @@ use tokio::{
sync::{mpsc::UnboundedReceiver, oneshot::channel as oneshot_channel},
};

#[cfg(feature = "replicas")]
use crate::{interfaces, protocol::command::RouterCommand};
#[cfg(feature = "check-unresponsive")]
use futures::future::Either;
#[cfg(feature = "check-unresponsive")]
Expand Down Expand Up @@ -530,6 +529,27 @@ pub fn defer_replica_sync(inner: &Arc<RedisClientInner>) {
}
}

/// Queue a deferred recovery command on the router channel without waiting for its result.
///
/// * When the client is configured for a clustered deployment this sends
///   `RouterCommand::SyncCluster`.
/// * Otherwise this sends a non-forced `RouterCommand::Reconnect` with no target server.
///
/// Failures to enqueue the command are only logged as warnings; nothing is returned to the
/// caller.
pub fn defer_reconnect(inner: &Arc<RedisClientInner>) {
    // Pick the command along with a label used in the warning so that each branch reports
    // the operation it actually attempted.
    let (cmd, action) = if inner.config.server.is_clustered() {
        // The receiving half is discarded immediately: nobody waits on the sync result.
        let (tx, _) = oneshot_channel();
        (RouterCommand::SyncCluster { tx }, "cluster sync")
    } else {
        let reconnect = RouterCommand::Reconnect {
            server: None,
            tx: None,
            force: false,
            #[cfg(feature = "replicas")]
            replica: false,
        };
        (reconnect, "reconnect")
    };

    if interfaces::send_to_router(inner, cmd).is_err() {
        _warn!(inner, "Failed to send deferred {}.", action);
    }
}

#[cfg(feature = "check-unresponsive")]
pub async fn next_frame(
inner: &Arc<RedisClientInner>,
Expand Down

0 comments on commit 52bff98

Please sign in to comment.