Skip to content

Commit

Permalink
added default config for retry_on_error
Browse files Browse the repository at this point in the history
  • Loading branch information
mcatanzariti committed Dec 29, 2022
1 parent 675380f commit 34b33cf
Show file tree
Hide file tree
Showing 12 changed files with 138 additions and 52 deletions.
80 changes: 62 additions & 18 deletions src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,23 @@ use crate::{
SortedSetCommands, StreamCommands, StringCommands, TransactionCommands,
},
network::{
JoinHandle, PushReceiver, PushSender, MsgSender, NetworkHandler, PubSubReceiver,
PubSubSender, ReconnectReceiver, ReconnectSender, timeout,
timeout, JoinHandle, MsgSender, NetworkHandler, PubSubReceiver, PubSubSender, PushReceiver,
PushSender, ReconnectReceiver, ReconnectSender,
},
resp::{
cmd, Command, CommandArgs, FromValue, ResultValueExt, SingleArg, SingleArgCollection, Value,
},
Error, Future, Result, ValueReceiver, ValueSender,
};
use futures::{channel::{mpsc, oneshot}, Stream};
use std::{future::IntoFuture, sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}, time::Duration};
use futures::{
channel::{mpsc, oneshot},
Stream,
};
use std::{
future::IntoFuture,
sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
time::Duration,
};

use super::ClientTrackingInvalidationStream;

Expand All @@ -43,6 +50,7 @@ pub struct Client {
reconnect_sender: ReconnectSender,
client_state: Arc<RwLock<ClientState>>,
command_timeout: Duration,
retry_on_error: bool,
}

impl Drop for Client {
Expand Down Expand Up @@ -77,6 +85,7 @@ impl Client {
pub async fn connect(config: impl IntoConfig) -> Result<Self> {
let config = config.into_config()?;
let command_timeout = config.command_timeout;
let retry_on_error = config.retry_on_error;
let (msg_sender, network_task_join_handle, reconnect_sender) =
NetworkHandler::connect(config.into_config()?).await?;

Expand All @@ -86,6 +95,7 @@ impl Client {
reconnect_sender,
client_state: Arc::new(RwLock::new(ClientState::new())),
command_timeout,
retry_on_error,
})
}

Expand Down Expand Up @@ -141,6 +151,9 @@ impl Client {
/// # Arguments
/// * `command` - generic [`Command`](crate::resp::Command) meant to be sent to the Redis server.
/// * `retry_on_error` - retry to send the command on network error.
/// * `None` - default behaviour defined in [`Config::retry_on_error`](crate::client::Config::retry_on_error)
/// * `Some(true)` - retry sending command on network error
/// * `Some(false)` - do not retry sending command on network error
///
/// # Errors
/// Any Redis driver [`Error`](crate::Error) that occurs during the send operation
Expand All @@ -154,7 +167,7 @@ impl Client {
/// let mut client = Client::connect("127.0.0.1:6379").await?;
///
/// let values: Vec<String> = client
/// .send(cmd("MGET").arg("key1").arg("key2").arg("key3").arg("key4"), false)
/// .send(cmd("MGET").arg("key1").arg("key2").arg("key3").arg("key4"), None)
/// .await?
/// .into()?;
/// println!("{:?}", values);
Expand All @@ -164,9 +177,13 @@ impl Client {
/// ```

#[inline]
pub async fn send(&mut self, command: Command, retry_on_error: bool) -> Result<Value> {
pub async fn send(&mut self, command: Command, retry_on_error: Option<bool>) -> Result<Value> {
let (value_sender, value_receiver): (ValueSender, ValueReceiver) = oneshot::channel();
let message = Message::single(command, value_sender, retry_on_error);
let message = Message::single(
command,
value_sender,
retry_on_error.unwrap_or(self.retry_on_error),
);
self.send_message(message)?;
let value = if self.command_timeout != Duration::ZERO {
timeout(self.command_timeout, value_receiver).await??
Expand All @@ -177,16 +194,24 @@ impl Client {
}

/// Send command to the Redis server and forget its response.
///
///
/// # Arguments
/// * `command` - generic [`Command`](crate::resp::Command) meant to be sent to the Redis server.
/// * `retry_on_error` - retry to send the command on network error.
/// * `None` - default behaviour defined in [`Config::retry_on_error`](crate::client::Config::retry_on_error)
/// * `Some(true)` - retry sending command on network error
/// * `Some(false)` - do not retry sending command on network error
///
/// # Errors
/// Any Redis driver [`Error`](crate::Error) that occurs during the send operation
#[inline]
pub fn send_and_forget(&mut self, command: Command, retry_on_error: bool) -> Result<()> {
let message = Message::single_forget(command, retry_on_error);
pub fn send_and_forget(
&mut self,
command: Command,
retry_on_error: Option<bool>,
) -> Result<()> {
let message =
Message::single_forget(command, retry_on_error.unwrap_or(self.retry_on_error));
self.send_message(message)?;
Ok(())
}
Expand All @@ -196,13 +221,24 @@ impl Client {
/// # Arguments
/// * `commands` - batch of generic [`Command`](crate::resp::Command)s meant to be sent to the Redis server.
/// * `retry_on_error` - retry to send the command batch on network error.
/// * `None` - default behaviour defined in [`Config::retry_on_error`](crate::client::Config::retry_on_error)
/// * `Some(true)` - retry sending batch on network error
/// * `Some(false)` - do not retry sending batch on network error
///
/// # Errors
/// Any Redis driver [`Error`](crate::Error) that occurs during the send operation
#[inline]
pub async fn send_batch(&mut self, commands: Vec<Command>, retry_on_error: bool) -> Result<Value> {
pub async fn send_batch(
&mut self,
commands: Vec<Command>,
retry_on_error: Option<bool>,
) -> Result<Value> {
let (value_sender, value_receiver): (ValueSender, ValueReceiver) = oneshot::channel();
let message = Message::batch(commands, value_sender, retry_on_error);
let message = Message::batch(
commands,
value_sender,
retry_on_error.unwrap_or(self.retry_on_error),
);
self.send_message(message)?;
let value = if self.command_timeout != Duration::ZERO {
timeout(self.command_timeout, value_receiver).await??
Expand Down Expand Up @@ -236,7 +272,9 @@ impl Client {
Pipeline::new(self.clone())
}

pub fn create_client_tracking_invalidation_stream(&mut self) -> Result<impl Stream<Item = Vec<String>>> {
pub fn create_client_tracking_invalidation_stream(
&mut self,
) -> Result<impl Stream<Item = Vec<String>>> {
let (push_sender, push_receiver): (PushSender, PushReceiver) = mpsc::unbounded();
let message = Message::client_tracking_invalidation(push_sender);
self.send_message(message)?;
Expand Down Expand Up @@ -338,7 +376,8 @@ where
/// # Errors
/// Any Redis driver [`Error`](crate::Error) that occur during the send operation
fn forget(self) -> Result<()> {
self.executor.send_and_forget(self.command, self.retry_on_error)
self.executor
.send_and_forget(self.command, self.retry_on_error)
}
}

Expand All @@ -359,10 +398,16 @@ where
.into_with_command(&command_for_result)
} else if let Some(post_process) = self.post_process {
let command_for_result = self.command.clone();
let result = self.executor.send(self.command, self.retry_on_error).await?;
let result = self
.executor
.send(self.command, self.retry_on_error)
.await?;
post_process(result, command_for_result, self.executor).await
} else {
self.executor.send(self.command, self.retry_on_error).await?.into()
self.executor
.send(self.command, self.retry_on_error)
.await?
.into()
}
})
}
Expand Down Expand Up @@ -491,8 +536,7 @@ impl BlockingCommands for Client {
fn monitor(&mut self) -> Future<MonitorStream> {
Box::pin(async move {
let (value_sender, value_receiver): (ValueSender, ValueReceiver) = oneshot::channel();
let (push_sender, push_receiver): (PushSender, PushReceiver) =
mpsc::unbounded();
let (push_sender, push_receiver): (PushSender, PushReceiver) = mpsc::unbounded();

let message = Message::monitor(cmd("MONITOR"), value_sender, push_sender);

Expand Down
29 changes: 29 additions & 0 deletions src/client/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const DEFAULT_AUTO_REMONITOR: bool = true;
const DEFAULT_KEEP_ALIVE: Option<Duration> = None;
const DEFAULT_NO_DELAY: bool = true;
const DEFAULT_MAX_COMMAND_ATTEMPTS: usize = 3;
const DEFAULT_RETRY_ON_ERROR: bool = false;

type Uri<'a> = (
&'a str,
Expand Down Expand Up @@ -84,6 +85,17 @@ pub struct Config {
pub no_delay: bool,
/// Maximum number of retry attempts to send a command to the Redis server.
pub max_command_attempts: usize,
/// Defines the default strategy for retries on network error (default `false`).
///
/// This strategy can be overriden for each command/batch
/// of commands in the following functions:
/// * [`PreparedCommand::retry_on_error`](crate::client::PreparedCommand::retry_on_error)
/// * [`Pipeline::retry_on_error`](crate::client::Pipeline::retry_on_error)
/// * [`Transaction::retry_on_error`](crate::client::Transaction::retry_on_error)
/// * [`Client::send`](crate::client::Client::send)
/// * [`Client::send_and_forget`](crate::client::Client::send_and_forget)
/// * [`Client::send_batch`](crate::client::Client::send_batch)
pub retry_on_error: bool,
}

impl Default for Config {
Expand All @@ -103,6 +115,7 @@ impl Default for Config {
keep_alive: DEFAULT_KEEP_ALIVE,
no_delay: DEFAULT_NO_DELAY,
max_command_attempts: DEFAULT_MAX_COMMAND_ATTEMPTS,
retry_on_error: DEFAULT_RETRY_ON_ERROR,
}
}
}
Expand Down Expand Up @@ -300,6 +313,12 @@ impl Config {
config.max_command_attempts = max_command_attempts;
}
}

if let Some(retry_on_error) = query.remove("retry_on_error") {
if let Ok(retry_on_error) = retry_on_error.parse::<bool>() {
config.retry_on_error = retry_on_error;
}
}
}

Some(config)
Expand Down Expand Up @@ -567,6 +586,16 @@ impl ToString for Config {
s.push_str(&format!("max_command_attempts={}", self.max_command_attempts));
}

if self.retry_on_error != DEFAULT_RETRY_ON_ERROR{
if !query_separator {
query_separator = true;
s.push('?');
} else {
s.push('&');
}
s.push_str(&format!("retry_on_error={}", self.retry_on_error));
}

if let ServerConfig::Sentinel(SentinelConfig {
instances: _,
service_name: _,
Expand Down
1 change: 1 addition & 0 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ of the struct [`Config`](Config) or its dependencies:
* [`keep_alive`](Config::keep_alive) - Enable/disable keep-alive functionality (default `None`)
* [`no_delay`](Config::no_delay) - Enable/disable the use of Nagle's algorithm (default `true`)
* [`max_command_attempts`](Config::max_command_attempts) - Maximum number of retry attempts to send a command to the Redis server.
* [`retry_on_error`](Config::retry_on_error) - Defines the default strategy for retries on network error (default `false`).
* [`wait_between_failures`](SentinelConfig::wait_between_failures) - (Sentinel only) Waiting time after
failing before connecting to the next Sentinel instance (default 250ms).
* [`sentinel_username`](SentinelConfig::username) - (Sentinel only) Sentinel username
Expand Down
13 changes: 7 additions & 6 deletions src/client/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub struct Pipeline {
client: Client,
commands: Vec<Command>,
forget_flags: Vec<bool>,
retry_on_error: bool,
retry_on_error: Option<bool>,
}

impl Pipeline {
Expand All @@ -36,13 +36,14 @@ impl Pipeline {
client,
commands: Vec::new(),
forget_flags: Vec::new(),
retry_on_error: false,
retry_on_error: None,
}
}

/// Set a flag to retry sending the command on network error (default `false`).
pub fn retry_on_error(&mut self) {
self.retry_on_error = true;
/// Set a flag to override default `retry_on_error` behavior.
///
/// See [Config::retry_on_error](crate::client::Config::retry_on_error)
pub fn retry_on_error(&mut self, retry_on_error: bool) {
self.retry_on_error = Some(retry_on_error);
}

/// Queue a command
Expand Down
14 changes: 8 additions & 6 deletions src/client/prepared_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ where
/// Post process functor te be called instead of
/// the [`FromValue`](crate::resp::FromValue) trait.
pub post_process: Option<Box<PostProcessFunc<'a, R>>>,
/// Flag to retry sending the command on network error (default `false`).
pub retry_on_error: bool,
/// Flag to retry sending the command on network error.
pub retry_on_error: Option<bool>,
}

impl<'a, T, R> PreparedCommand<'a, T, R>
Expand All @@ -45,7 +45,7 @@ where
command,
keep_command_for_result: false,
post_process: None,
retry_on_error: false,
retry_on_error: None,
}
}

Expand All @@ -61,9 +61,11 @@ where
self
}

/// Set a flag to retry sending the command on network error (default `false`).
pub fn retry_on_error(mut self) -> Self {
self.retry_on_error = true;
/// Set a flag to override default `retry_on_error` behavior.
///
/// See [Config::retry_on_error](crate::client::Config::retry_on_error)
pub fn retry_on_error(mut self, retry_on_error: bool) -> Self {
self.retry_on_error = Some(retry_on_error);
self
}

Expand Down
12 changes: 7 additions & 5 deletions src/client/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub struct Transaction {
client: Client,
commands: Vec<Command>,
forget_flags: Vec<bool>,
retry_on_error: bool,
retry_on_error: Option<bool>,
}

impl Transaction {
Expand All @@ -36,16 +36,18 @@ impl Transaction {
client,
commands: Vec::new(),
forget_flags: Vec::new(),
retry_on_error: false,
retry_on_error: None,
};

transaction.queue(cmd("MULTI"));
transaction
}

/// Set a flag to retry sending the command on network error (default `false`).
pub fn retry_on_error(&mut self) {
self.retry_on_error = true;
/// Set a flag to override default `retry_on_error` behavior.
///
/// See [Config::retry_on_error](crate::client::Config::retry_on_error)
pub fn retry_on_error(&mut self, retry_on_error: bool) {
self.retry_on_error = Some(retry_on_error);
}

/// Queue a command into the transaction.
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ async fn main() -> Result<()> {
let mut client = Client::connect("127.0.0.1:6379").await?;
let values: Vec<String> = client
.send(cmd("MGET").arg("key1").arg("key2").arg("key3").arg("key4"), false)
.send(cmd("MGET").arg("key1").arg("key2").arg("key3").arg("key4"), None)
.await?
.into()?;
println!("{:?}", values);
Expand Down
1 change: 1 addition & 0 deletions src/resp/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub struct Command {
pub name: &'static str,
/// Collection of arguments of the command.
pub args: CommandArgs,
#[doc(hidden)]
#[cfg(debug_assertions)]
pub kill_connection_on_write: usize,
}
Expand Down
6 changes: 3 additions & 3 deletions src/tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use serial_test::serial;
async fn send() -> Result<()> {
let mut client = get_test_client().await?;

client.send(cmd("PING"), false).await?;
client.send(cmd("PING"), None).await?;

client.close().await?;

Expand All @@ -31,8 +31,8 @@ async fn send() -> Result<()> {
async fn forget() -> Result<()> {
let mut client = get_test_client().await?;

client.send_and_forget(cmd("PING"), false)?;
client.send(cmd("PING"), false).await?;
client.send_and_forget(cmd("PING"), None)?;
client.send(cmd("PING"), None).await?;

client.close().await?;

Expand Down
Loading

0 comments on commit 34b33cf

Please sign in to comment.