Skip to content

Commit

Permalink
fix: add timeouts to cluster sync commands and backchannel disconnect (
Browse files Browse the repository at this point in the history
  • Loading branch information
aembke authored May 2, 2023
1 parent c388b8c commit 7b81093
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 8 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fred"
version = "6.2.0"
version = "6.2.1"
authors = ["Alec Embke <[email protected]>"]
edition = "2021"
description = "An async Redis client built on Tokio."
Expand Down
7 changes: 6 additions & 1 deletion src/protocol/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,8 +644,13 @@ impl RedisTransport {

/// Send `QUIT` and close the connection.
pub async fn disconnect(&mut self, inner: &Arc<RedisClientInner>) -> Result<(), RedisError> {
let timeout = globals().default_connection_timeout_ms();
let command: RedisCommand = RedisCommandKind::Quit.into();
let _ = self.request_response(command, inner.is_resp3()).await?;
let quit_ft = self.request_response(command, inner.is_resp3());

if let Err(e) = client_utils::apply_timeout(quit_ft, timeout).await {
_warn!(inner, "Error calling QUIT on backchannel: {:?}", e);
}
let _ = self.transport.close().await;

Ok(())
Expand Down
13 changes: 9 additions & 4 deletions src/router/clustered.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
error::{RedisError, RedisErrorKind},
globals::globals,
interfaces,
interfaces::Resp3Frame,
modules::inner::RedisClientInner,
Expand All @@ -13,6 +14,7 @@ use crate::{
},
router::{responses, types::ClusterChange, utils, Connections, Written},
types::ClusterStateChange,
utils as client_utils,
};
use std::{
collections::{BTreeSet, HashMap},
Expand Down Expand Up @@ -552,6 +554,8 @@ pub async fn cluster_slots_backchannel(
inner: &Arc<RedisClientInner>,
cache: Option<&ClusterRouting>,
) -> Result<ClusterRouting, RedisError> {
let timeout = globals().default_connection_timeout_ms();

let (response, host) = {
let command: RedisCommand = RedisCommandKind::ClusterSlots.into();

Expand All @@ -561,8 +565,8 @@ pub async fn cluster_slots_backchannel(
if let Some(ref mut transport) = backchannel.transport {
let default_host = transport.default_host.clone();

transport
.request_response(command, inner.is_resp3())
_trace!(inner, "Sending backchannel CLUSTER SLOTS to {}", transport.server);
client_utils::apply_timeout(transport.request_response(command, inner.is_resp3()), timeout)
.await
.ok()
.map(|frame| (frame, default_host))
Expand All @@ -582,7 +586,8 @@ pub async fn cluster_slots_backchannel(
if frame.is_error() {
// try connecting to any of the nodes, then try again
let mut transport = connect_any(inner, old_cache).await?;
let frame = transport.request_response(command, inner.is_resp3()).await?;
let frame =
client_utils::apply_timeout(transport.request_response(command, inner.is_resp3()), timeout).await?;
let host = transport.default_host.clone();
inner.update_backchannel(transport).await;

Expand All @@ -594,7 +599,7 @@ pub async fn cluster_slots_backchannel(
} else {
// try connecting to any of the nodes, then try again
let mut transport = connect_any(inner, old_cache).await?;
let frame = transport.request_response(command, inner.is_resp3()).await?;
let frame = client_utils::apply_timeout(transport.request_response(command, inner.is_resp3()), timeout).await?;
let host = transport.default_host.clone();
inner.update_backchannel(transport).await;

Expand Down
4 changes: 2 additions & 2 deletions src/router/responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ pub fn check_special_errors(inner: &Arc<RedisClientInner>, frame: &Resp3Frame) -

/// Handle an error in the reader task that should end the connection.
pub fn broadcast_reader_error(inner: &Arc<RedisClientInner>, server: &Server, error: Option<RedisError>) {
_debug!(inner, "Ending reader task from {} due to {:?}", server, error);
_warn!(inner, "Ending reader task from {} due to {:?}", server, error);

if inner.should_reconnect() {
inner.send_reconnect(Some(server.clone()), false, None);
Expand All @@ -329,7 +329,7 @@ pub fn broadcast_replica_error(inner: &Arc<RedisClientInner>, server: &Server, e

#[cfg(feature = "replicas")]
pub fn broadcast_replica_error(inner: &Arc<RedisClientInner>, server: &Server, error: Option<RedisError>) {
_debug!(inner, "Ending replica reader task from {} due to {:?}", server, error);
_warn!(inner, "Ending replica reader task from {} due to {:?}", server, error);

if inner.should_reconnect() {
inner.send_replica_reconnect(server);
Expand Down

0 comments on commit 7b81093

Please sign in to comment.