diff --git a/redis/src/cluster.rs b/redis/src/cluster.rs index 5c0702d85..f9c76f516 100644 --- a/redis/src/cluster.rs +++ b/redis/src/cluster.rs @@ -41,14 +41,14 @@ use std::str::FromStr; use std::thread; use std::time::Duration; -use rand::{seq::IteratorRandom, thread_rng, Rng}; +use rand::{seq::IteratorRandom, thread_rng}; use crate::cluster_pipeline::UNROUTABLE_ERROR; use crate::cluster_routing::{ - MultipleNodeRoutingInfo, ResponsePolicy, Routable, SingleNodeRoutingInfo, SlotAddr, + MultipleNodeRoutingInfo, ResponsePolicy, Routable, SingleNodeRoutingInfo, }; use crate::cluster_slotmap::SlotMap; -use crate::cluster_topology::{parse_and_count_slots, SLOT_SIZE}; +use crate::cluster_topology::parse_and_count_slots; use crate::cmd::{cmd, Cmd}; use crate::connection::{ connect, Connection, ConnectionAddr, ConnectionInfo, ConnectionLike, RedisConnectionInfo, @@ -459,12 +459,9 @@ where }; match RoutingInfo::for_routable(cmd) { - Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) => { - let mut rng = thread_rng(); - Ok(addr_for_slot(Route::new( - rng.gen_range(0..SLOT_SIZE), - SlotAddr::Master, - ))?) + Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) + | Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::RandomPrimary)) => { + Ok(addr_for_slot(Route::new_random_primary())?) } Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(route))) => { Ok(addr_for_slot(route)?) @@ -730,6 +727,9 @@ where SingleNodeRoutingInfo::SpecificNode(route) => { self.get_connection(&mut connections, route)? } + SingleNodeRoutingInfo::RandomPrimary => { + self.get_connection(&mut connections, &Route::new_random_primary())? + } SingleNodeRoutingInfo::ByAddress { host, port } => { let address = format!("{host}:{port}"); let conn = self.get_connection_by_addr(&mut connections, &address)?; diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs index 965a05cf8..be7beb79b 100644 --- a/redis/src/cluster_async/mod.rs +++ b/redis/src/cluster_async/mod.rs @@ -588,6 +588,9 @@ impl From for InternalSingleNodeRouting { SingleNodeRoutingInfo::SpecificNode(route) => { InternalSingleNodeRouting::SpecificNode(route) } + SingleNodeRoutingInfo::RandomPrimary => { + InternalSingleNodeRouting::SpecificNode(Route::new_random_primary()) + } SingleNodeRoutingInfo::ByAddress { host, port } => { InternalSingleNodeRouting::ByAddress(format!("{host}:{port}")) } @@ -620,6 +623,9 @@ fn route_for_pipeline(pipeline: &crate::Pipeline) -> RedisResult> Some(cluster_routing::RoutingInfo::SingleNode( SingleNodeRoutingInfo::SpecificNode(route), )) => Some(route), + Some(cluster_routing::RoutingInfo::SingleNode( + SingleNodeRoutingInfo::RandomPrimary, + )) => Some(Route::new_random_primary()), Some(cluster_routing::RoutingInfo::MultiNode(_)) => None, Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress { .. diff --git a/redis/src/cluster_routing.rs b/redis/src/cluster_routing.rs index 848d4d750..ded53bc35 100644 --- a/redis/src/cluster_routing.rs +++ b/redis/src/cluster_routing.rs @@ -1,3 +1,4 @@ +use rand::Rng; use std::cmp::min; use std::collections::HashMap; @@ -66,6 +67,8 @@ pub enum RoutingInfo { pub enum SingleNodeRoutingInfo { /// Route to any node at random Random, + /// Route to any *primary* node + RandomPrimary, /// Route to the node that matches the [Route] SpecificNode(Route), /// Route to the node with the given address. @@ -610,7 +613,13 @@ impl RoutingInfo { .and_then(|x| std::str::from_utf8(x).ok()) .and_then(|x| x.parse::().ok())?; if key_count == 0 { - Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) + if is_readonly_cmd(cmd) { + Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) + } else { + Some(RoutingInfo::SingleNode( + SingleNodeRoutingInfo::RandomPrimary, + )) + } } else { r.arg_idx(3).map(|key| RoutingInfo::for_key(cmd, key)) } @@ -949,6 +958,17 @@ impl Route { pub fn slot_addr(&self) -> SlotAddr { self.1 } + + /// Returns a new Route for a random primary node + pub fn new_random_primary() -> Self { + Self::new(random_slot(), SlotAddr::Master) + } +} + +/// Choose a random slot from `0..SLOT_SIZE` (excluding) +fn random_slot() -> u16 { + let mut rng = rand::thread_rng(); + rng.gen_range(0..crate::cluster_topology::SLOT_SIZE) } #[cfg(test)] @@ -1096,12 +1116,31 @@ mod tests { cmd("EVAL").arg(r#"redis.call("PING");"#).arg(0), cmd("EVALSHA").arg(r#"redis.call("PING");"#).arg(0), ] { + // EVAL / EVALSHA are expected to be routed to a RandomPrimary assert_eq!( RoutingInfo::for_routable(cmd), - Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) + Some(RoutingInfo::SingleNode( + SingleNodeRoutingInfo::RandomPrimary + )) ); } + // FCALL (with 0 keys) is expected to be routed to a random primary node + assert_eq!( + RoutingInfo::for_routable(cmd("FCALL").arg("foo").arg(0)), + Some(RoutingInfo::SingleNode( + SingleNodeRoutingInfo::RandomPrimary + )) + ); + + // While FCALL with N keys is expected to be routed to a specific node + assert_eq!( + RoutingInfo::for_routable(cmd("FCALL").arg("foo").arg(1).arg("mykey")), + Some(RoutingInfo::SingleNode( + SingleNodeRoutingInfo::SpecificNode(Route::new(slot(b"mykey"), SlotAddr::Master)) + )) + ); + for (cmd, expected) in [ ( cmd("EVAL")