From b070601cb0ad3eb8a6369881982a0786e98e4030 Mon Sep 17 00:00:00 2001 From: Alec Embke Date: Tue, 20 Feb 2024 10:54:10 -0800 Subject: [PATCH] 8.0.3 (#218) * chore: box pin large futures --- .circleci/config.yml | 2 +- CHANGELOG.md | 4 + Cargo.toml | 2 +- examples/basic.rs | 4 +- examples/client_tracking.rs | 17 +- examples/custom.rs | 17 +- examples/dns.rs | 6 +- examples/keyspace.rs | 34 +-- examples/lua.rs | 2 +- examples/pool.rs | 7 +- examples/pubsub.rs | 4 +- src/clients/pubsub.rs | 10 +- src/clients/replica.rs | 2 +- src/commands/impls/redis_json.rs | 8 +- src/commands/impls/tracking.rs | 20 +- src/commands/interfaces/sentinel.rs | 2 +- src/commands/interfaces/timeseries.rs | 4 - src/lib.rs | 3 + src/modules/mocks.rs | 4 +- src/monitor/parser.rs | 6 +- src/monitor/utils.rs | 6 +- src/protocol/cluster.rs | 6 +- src/protocol/command.rs | 2 +- src/protocol/connection.rs | 30 ++- src/protocol/public.rs | 4 +- src/protocol/responders.rs | 2 +- src/protocol/tls.rs | 21 +- src/protocol/utils.rs | 10 +- src/router/commands.rs | 315 +++++++++++++------------- src/router/mod.rs | 14 +- src/router/replicas.rs | 17 +- src/router/responses.rs | 4 +- src/router/sentinel.rs | 4 +- src/router/utils.rs | 2 +- src/trace/enabled.rs | 4 +- src/types/client.rs | 2 +- src/types/config.rs | 42 ++-- src/types/timeseries.rs | 6 +- src/utils.rs | 18 +- tests/integration/other/mod.rs | 8 +- tests/integration/tracking/mod.rs | 32 +-- 41 files changed, 341 insertions(+), 366 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 7350fcbc..524aca56 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -214,7 +214,7 @@ jobs: - checkout - run: name: Clippy - command: cargo clippy -- -Dwarnings + command: cargo clippy --all-features --lib -p fred -- -Dwarnings workflows: diff --git a/CHANGELOG.md b/CHANGELOG.md index 20638120..80276721 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 8.0.3 + +* Box large futures to reduce stack usage. + ## 8.0.2 * Fix cluster replica failover at high concurrency. diff --git a/Cargo.toml b/Cargo.toml index fc587383..a91d182c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fred" -version = "8.0.2" +version = "8.0.3" authors = ["Alec Embke "] edition = "2021" description = "An async Redis client built on Tokio." diff --git a/examples/basic.rs b/examples/basic.rs index 32101e58..5b7c90b3 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -10,7 +10,7 @@ async fn main() -> Result<(), RedisError> { // see the `Builder` interface for more information let client = Builder::from_config(config).build()?; // callers can manage the tokio task driving the connections - let connection_task = client.init().await?; + let _connection_task = client.init().await?; // convert response types to most common rust types let foo: Option = client.get("foo").await?; println!("Foo: {:?}", foo); @@ -23,7 +23,5 @@ async fn main() -> Result<(), RedisError> { println!("Foo: {:?}", client.get::, _>("foo").await?); client.quit().await?; - // calling quit ends the connection and event listener tasks - let _ = connection_task.await; Ok(()) } diff --git a/examples/client_tracking.rs b/examples/client_tracking.rs index 7f44eef9..27c6995e 100644 --- a/examples/client_tracking.rs +++ b/examples/client_tracking.rs @@ -1,5 +1,4 @@ #![allow(clippy::disallowed_names)] -#![allow(clippy::let_underscore_future)] use fred::{interfaces::TrackingInterface, prelude::*, types::RespVersion}; @@ -16,14 +15,14 @@ async fn resp3_tracking_interface_example() -> Result<(), RedisError> { client.init().await?; // spawn a task that processes invalidation messages. - let _ = client.on_invalidation(|invalidation| { + let _invalidate_task = client.on_invalidation(|invalidation| { println!("{}: Invalidate {:?}", invalidation.server, invalidation.keys); Ok(()) }); // enable client tracking on all connections. it's usually a good idea to do this in an `on_reconnect` block. client.start_tracking(None, false, false, false, false).await?; - let _: () = client.get("foo").await?; + client.get("foo").await?; // send `CLIENT CACHING yes|no` before subsequent commands. the preceding `CLIENT CACHING yes|no` command will be // sent when the command is retried as well. @@ -58,14 +57,14 @@ async fn resp2_basic_interface_example() -> Result<(), RedisError> { // the invalidation subscriber interface is the same as above even in RESP2 mode **as long as the `client-tracking` // feature is enabled**. if the feature is disabled then the message will appear on the `on_message` receiver. let mut invalidations = subscriber.invalidation_rx(); - tokio::spawn(async move { + let _invalidate_task = tokio::spawn(async move { while let Ok(invalidation) = invalidations.recv().await { println!("{}: Invalidate {:?}", invalidation.server, invalidation.keys); } }); // in RESP2 mode we must manually subscribe to the invalidation channel. the `start_tracking` function does this // automatically with the RESP3 interface. - let _: () = subscriber.subscribe("__redis__:invalidate").await?; + subscriber.subscribe("__redis__:invalidate").await?; // enable client tracking, sending invalidation messages to the subscriber client let (_, connection_id) = subscriber @@ -74,7 +73,7 @@ async fn resp2_basic_interface_example() -> Result<(), RedisError> { .into_iter() .next() .expect("Failed to read subscriber connection ID"); - let _ = client + client .client_tracking("on", Some(connection_id), None, false, false, false, false) .await?; @@ -83,8 +82,8 @@ async fn resp2_basic_interface_example() -> Result<(), RedisError> { let pipeline = client.pipeline(); // it's recommended to pipeline `CLIENT CACHING yes|no` if the client is used across multiple tasks - let _: () = pipeline.client_caching(true).await?; - let _: () = pipeline.incr("foo").await?; + pipeline.client_caching(true).await?; + pipeline.incr("foo").await?; println!("Foo: {}", pipeline.last::().await?); Ok(()) @@ -94,7 +93,7 @@ async fn resp2_basic_interface_example() -> Result<(), RedisError> { // see https://redis.io/docs/manual/client-side-caching/ async fn main() -> Result<(), RedisError> { resp3_tracking_interface_example().await?; - // resp2_basic_interface_example().await?; + resp2_basic_interface_example().await?; Ok(()) } diff --git a/examples/custom.rs b/examples/custom.rs index 4a9c3b96..13fabac7 100644 --- a/examples/custom.rs +++ b/examples/custom.rs @@ -14,15 +14,8 @@ async fn main() -> Result<(), RedisError> { client.init().await?; client.lpush("foo", vec![1, 2, 3]).await?; - // some types require TryInto - let args: Vec = vec!["foo".into(), 0.into(), 3_u64.try_into()?]; - // returns a frame (https://docs.rs/redis-protocol/latest/redis_protocol/resp3/types/enum.Frame.html) - let frame = client.custom_raw(cmd!("LRANGE"), args.clone()).await?; - // or convert back to client types - let value: RedisValue = frame.try_into()?; - // and/or use the type conversion shorthand - let value: Vec = value.convert()?; - println!("LRANGE Values: {:?}", value); + let result: Vec = client.custom(cmd!("LRANGE"), vec!["foo", "0", "3"]).await?; + println!("LRANGE Values: {:?}", result); // or customize routing and blocking parameters let _ = cmd!("LRANGE", blocking: false); @@ -31,9 +24,9 @@ async fn main() -> Result<(), RedisError> { // which is shorthand for let command = CustomCommand::new("LRANGE", ClusterHash::FirstKey, false); - // convert to `FromRedis` types - let _: Vec = client - .custom_raw(command, args) + // or use `custom_raw` to operate on RESP3 frames + let _result: Vec = client + .custom_raw(command, vec!["foo", "0", "3"]) .await .and_then(|frame| frame.try_into()) .and_then(|value: RedisValue| value.convert())?; diff --git a/examples/dns.rs b/examples/dns.rs index 52258410..758d9632 100644 --- a/examples/dns.rs +++ b/examples/dns.rs @@ -12,8 +12,8 @@ use trust_dns_resolver::{ pub struct TrustDnsResolver(TokioAsyncResolver); -impl TrustDnsResolver { - fn new() -> Self { +impl Default for TrustDnsResolver { + fn default() -> Self { TrustDnsResolver(TokioAsyncResolver::tokio( ResolverConfig::default(), ResolverOpts::default(), @@ -39,7 +39,7 @@ impl Resolve for TrustDnsResolver { #[tokio::main] async fn main() -> Result<(), RedisError> { let client = Builder::default_centralized().build()?; - client.set_resolver(Arc::new(TrustDnsResolver::new())).await; + client.set_resolver(Arc::new(TrustDnsResolver::default())).await; client.init().await?; // ... diff --git a/examples/keyspace.rs b/examples/keyspace.rs index 864c595d..8ddf72fe 100644 --- a/examples/keyspace.rs +++ b/examples/keyspace.rs @@ -5,23 +5,6 @@ use fred::prelude::*; use std::time::Duration; use tokio::time::sleep; -async fn fake_traffic(client: &RedisClient, amount: usize) -> Result<(), RedisError> { - // use a new client since the provided client is subscribed to keyspace events - let client = client.clone_new(); - client.init().await?; - - for idx in 0 .. amount { - let key: RedisKey = format!("foo-{}", idx).into(); - - client.set(&key, 1, None, None, false).await?; - client.incr(&key).await?; - client.del(&key).await?; - } - - client.quit().await?; - Ok(()) -} - /// Examples showing how to set up keyspace notifications with clustered or centralized/sentinel deployments. /// /// The most complicated part of this process involves safely handling reconnections. Keyspace events rely on the @@ -40,6 +23,23 @@ async fn main() -> Result<(), RedisError> { Ok(()) } +async fn fake_traffic(client: &RedisClient, amount: usize) -> Result<(), RedisError> { + // use a new client since the provided client is subscribed to keyspace events + let client = client.clone_new(); + client.init().await?; + + for idx in 0 .. amount { + let key: RedisKey = format!("foo-{}", idx).into(); + + client.set(&key, 1, None, None, false).await?; + client.incr(&key).await?; + client.del(&key).await?; + } + + client.quit().await?; + Ok(()) +} + async fn centralized_keyspace_events() -> Result<(), RedisError> { let subscriber = Builder::default_centralized().build()?; diff --git a/examples/lua.rs b/examples/lua.rs index f168c391..b98667be 100644 --- a/examples/lua.rs +++ b/examples/lua.rs @@ -47,7 +47,7 @@ async fn scripts() -> Result<(), RedisError> { client.init().await?; let script = Script::from_lua(SCRIPTS[0]); - let _ = script.load(&client).await?; + script.load(&client).await?; let result = script.evalsha(&client, vec!["foo", "bar"], vec![1, 2]).await?; println!("First script result: {:?}", result); diff --git a/examples/pool.rs b/examples/pool.rs index d327073b..bc1d6289 100644 --- a/examples/pool.rs +++ b/examples/pool.rs @@ -10,15 +10,16 @@ async fn main() -> Result<(), RedisError> { // all client types, including `RedisPool`, implement the same command interface traits so callers can often use // them interchangeably. in this example each command below will be sent round-robin to the underlying 5 clients. - pool.get("foo").await?; + assert!(pool.get::, _>("foo").await?.is_none()); pool.set("foo", "bar", None, None, false).await?; - pool.get("foo").await?; + assert_eq!(pool.get::("foo").await?, "bar"); + pool.del("foo").await?; // interact with specific clients via next(), last(), or clients() let pipeline = pool.next().pipeline(); pipeline.incr("foo").await?; pipeline.incr("foo").await?; - let _: i64 = pipeline.last().await?; + assert_eq!(pipeline.last::().await?, 2); for client in pool.clients() { println!("{} connected to {:?}", client.id(), client.active_connections().await?); diff --git a/examples/pubsub.rs b/examples/pubsub.rs index 216f1c02..6facaf03 100644 --- a/examples/pubsub.rs +++ b/examples/pubsub.rs @@ -21,7 +21,7 @@ async fn main() -> Result<(), RedisError> { }); for idx in 0 .. 50 { - let _ = publisher_client.publish("foo", idx).await?; + publisher_client.publish("foo", idx).await?; sleep(Duration::from_secs(1)).await; } @@ -52,7 +52,7 @@ async fn subscriber_example() -> Result<(), RedisError> { subscriber.subscribe("foo").await?; subscriber.psubscribe(vec!["bar*", "baz*"]).await?; subscriber.ssubscribe("abc{123}").await?; - // upon reconnecting the client will automatically re-subscribe to the above channels and patterns + // after reconnecting the client will automatically re-subscribe to the above channels and patterns println!("Subscriber channels: {:?}", subscriber.tracked_channels()); // "foo" println!("Subscriber patterns: {:?}", subscriber.tracked_patterns()); // "bar*", "baz*" println!("Subscriber shard channels: {:?}", subscriber.tracked_shard_channels()); // "abc{123}" diff --git a/src/clients/pubsub.rs b/src/clients/pubsub.rs index ef476fbb..6dbe445d 100644 --- a/src/clients/pubsub.rs +++ b/src/clients/pubsub.rs @@ -338,21 +338,21 @@ impl SubscriberClient { /// Unsubscribe from all tracked channels and patterns, and remove them from the client cache. pub async fn unsubscribe_all(&self) -> Result<(), RedisError> { - let channels: Vec = mem::replace(&mut *self.channels.write(), BTreeSet::new()) + let channels: Vec = mem::take(&mut *self.channels.write()) .into_iter() .map(|s| s.into()) .collect(); - let patterns: Vec = mem::replace(&mut *self.patterns.write(), BTreeSet::new()) + let patterns: Vec = mem::take(&mut *self.patterns.write()) .into_iter() .map(|s| s.into()) .collect(); - let shard_channels: Vec = mem::replace(&mut *self.shard_channels.write(), BTreeSet::new()) + let shard_channels: Vec = mem::take(&mut *self.shard_channels.write()) .into_iter() .map(|s| s.into()) .collect(); - let _ = self.unsubscribe(channels).await?; - let _ = self.punsubscribe(patterns).await?; + self.unsubscribe(channels).await?; + self.punsubscribe(patterns).await?; let shard_channel_groups = group_by_hash_slot(shard_channels)?; let shard_subscriptions: Vec<_> = shard_channel_groups diff --git a/src/clients/replica.rs b/src/clients/replica.rs index 086a7d5b..183c7bee 100644 --- a/src/clients/replica.rs +++ b/src/clients/replica.rs @@ -92,7 +92,7 @@ impl Replicas { pub async fn sync(&self) -> Result<(), RedisError> { let (tx, rx) = oneshot_channel(); let cmd = RouterCommand::SyncReplicas { tx }; - let _ = interfaces::send_to_router(&self.inner, cmd)?; + interfaces::send_to_router(&self.inner, cmd)?; rx.await? } } diff --git a/src/commands/impls/redis_json.rs b/src/commands/impls/redis_json.rs index e25d881c..e0ca7999 100644 --- a/src/commands/impls/redis_json.rs +++ b/src/commands/impls/redis_json.rs @@ -8,9 +8,9 @@ use crate::{ use bytes_utils::Str; use serde_json::Value; -const INDENT: &'static str = "INDENT"; -const NEWLINE: &'static str = "NEWLINE"; -const SPACE: &'static str = "SPACE"; +const INDENT: &str = "INDENT"; +const NEWLINE: &str = "NEWLINE"; +const SPACE: &str = "SPACE"; fn key_path_args(key: RedisKey, path: Option, extra: usize) -> Vec { let mut out = Vec::with_capacity(2 + extra); @@ -52,7 +52,7 @@ fn json_to_redis(value: Value) -> Result { )) } -fn values_to_bulk(values: &Vec) -> Result, RedisError> { +fn values_to_bulk(values: &[Value]) -> Result, RedisError> { values.iter().map(value_to_bulk_str).collect() } diff --git a/src/commands/impls/tracking.rs b/src/commands/impls/tracking.rs index ba61effd..c3e8af45 100644 --- a/src/commands/impls/tracking.rs +++ b/src/commands/impls/tracking.rs @@ -12,14 +12,14 @@ use crate::{ use redis_protocol::redis_keyslot; use tokio::sync::oneshot::channel as oneshot_channel; -pub static PREFIX: &'static str = "PREFIX"; -pub static REDIRECT: &'static str = "REDIRECT"; -pub static BCAST: &'static str = "BCAST"; -pub static OPTIN: &'static str = "OPTIN"; -pub static OPTOUT: &'static str = "OPTOUT"; -pub static NOLOOP: &'static str = "NOLOOP"; -pub static YES: &'static str = "YES"; -pub static NO: &'static str = "NO"; +pub static PREFIX: &str = "PREFIX"; +pub static REDIRECT: &str = "REDIRECT"; +pub static BCAST: &str = "BCAST"; +pub static OPTIN: &str = "OPTIN"; +pub static OPTOUT: &str = "OPTOUT"; +pub static NOLOOP: &str = "NOLOOP"; +pub static YES: &str = "YES"; +pub static NO: &str = "NO"; fn tracking_args( toggle: Toggle, @@ -88,7 +88,7 @@ pub async fn start_tracking( let (tx, rx) = oneshot_channel(); let response = ResponseKind::new_buffer(tx); let command: RedisCommand = (RedisCommandKind::_ClientTrackingCluster, args, response).into(); - let _ = client.send_command(command)?; + client.send_command(command)?; let frame = utils::apply_timeout(rx, client.inner().internal_command_timeout()).await??; let _ = protocol_utils::frame_to_results(frame)?; @@ -116,7 +116,7 @@ pub async fn stop_tracking(client: &C) -> Result<(), RedisError> let (tx, rx) = oneshot_channel(); let response = ResponseKind::new_buffer(tx); let command: RedisCommand = (RedisCommandKind::_ClientTrackingCluster, args, response).into(); - let _ = client.send_command(command)?; + client.send_command(command)?; let frame = utils::apply_timeout(rx, client.inner().internal_command_timeout()).await??; let _ = protocol_utils::frame_to_results(frame)?; diff --git a/src/commands/interfaces/sentinel.rs b/src/commands/interfaces/sentinel.rs index 62a028bd..03281cf7 100644 --- a/src/commands/interfaces/sentinel.rs +++ b/src/commands/interfaces/sentinel.rs @@ -149,7 +149,7 @@ pub trait SentinelInterface: ClientLike + Sized { { into!(name); try_into!(args); - commands::sentinel::set(self, name, args.into()).await?.convert() + commands::sentinel::set(self, name, args).await?.convert() } /// This command simulates different Sentinel crash scenarios. diff --git a/src/commands/interfaces/timeseries.rs b/src/commands/interfaces/timeseries.rs index c2e3ccec..b9c79b04 100644 --- a/src/commands/interfaces/timeseries.rs +++ b/src/commands/interfaces/timeseries.rs @@ -318,9 +318,7 @@ pub trait TimeSeriesInterface: ClientLike { J: IntoIterator + Send, { try_into!(from, to); - let labels = labels.map(|l| l.into()); let filters = filters.into_iter().map(|s| s.into()).collect(); - let group_by = group_by.map(|g| g.into()); let filter_by_ts = filter_by_ts.into_iter().collect(); commands::timeseries::ts_mrange( @@ -370,9 +368,7 @@ pub trait TimeSeriesInterface: ClientLike { J: IntoIterator + Send, { try_into!(from, to); - let labels = labels.map(|l| l.into()); let filters = filters.into_iter().map(|s| s.into()).collect(); - let group_by = group_by.map(|g| g.into()); let filter_by_ts = filter_by_ts.into_iter().collect(); commands::timeseries::ts_mrevrange( diff --git a/src/lib.rs b/src/lib.rs index 519fd96a..40074c46 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,6 +10,9 @@ #![allow(clippy::type_complexity)] #![allow(clippy::too_many_arguments)] #![allow(clippy::new_without_default)] +#![warn(clippy::large_types_passed_by_value)] +#![warn(clippy::large_stack_frames)] +#![warn(clippy::large_futures)] #![cfg_attr(docsrs, deny(rustdoc::broken_intra_doc_links))] #![cfg_attr(docsrs, feature(doc_cfg))] #![cfg_attr(docsrs, allow(unused_attributes))] diff --git a/src/modules/mocks.rs b/src/modules/mocks.rs index a9dbb6cd..7d339366 100644 --- a/src/modules/mocks.rs +++ b/src/modules/mocks.rs @@ -285,7 +285,7 @@ impl Buffer { /// Read a copy of the internal command buffer without modifying the contents. pub fn inner(&self) -> Vec { - self.commands.lock().iter().map(|c| c.clone()).collect() + self.commands.lock().iter().cloned().collect() } /// Push a new command onto the back of the internal buffer. @@ -338,7 +338,7 @@ mod tests { }; let client = RedisClient::new(config, None, None, None); let jh = client.connect(); - let _ = client.wait_for_connect().await.expect("Failed to connect"); + client.wait_for_connect().await.expect("Failed to connect"); (client, jh) } diff --git a/src/monitor/parser.rs b/src/monitor/parser.rs index 9df8b128..d65b77e1 100644 --- a/src/monitor/parser.rs +++ b/src/monitor/parser.rs @@ -10,9 +10,9 @@ use nom::{ use redis_protocol::{resp3::types::Frame as Resp3Frame, types::RedisParseError}; use std::{str, sync::Arc}; -const EMPTY_SPACE: &'static str = " "; -const RIGHT_BRACKET: &'static str = "]"; -const QUOTE: &'static str = "\""; +const EMPTY_SPACE: &str = " "; +const RIGHT_BRACKET: &str = "]"; +const QUOTE: &str = "\""; fn to_f64(s: &str) -> Result> { s.parse::() diff --git a/src/monitor/utils.rs b/src/monitor/utils.rs index 038bcfe3..b295afdb 100644 --- a/src/monitor/utils.rs +++ b/src/monitor/utils.rs @@ -79,7 +79,7 @@ async fn send_monitor_command( _trace!(inner, "Recv MONITOR response: {:?}", frame); let response = protocol_utils::frame_to_results(frame)?; - let _ = protocol_utils::expect_ok(&response)?; + protocol_utils::expect_ok(&response)?; Ok(connection) } @@ -136,14 +136,14 @@ pub async fn start(config: RedisConfig) -> Result, R let inner = RedisClientInner::new(config, perf, connection, None); let mut connection = connection::create(&inner, &server, None).await?; - let _ = connection.setup(&inner, None).await?; + connection.setup(&inner, None).await?; let connection = send_monitor_command(&inner, connection).await?; // there isn't really a mechanism to surface backpressure to the server for the MONITOR stream, so we use a // background task with a channel to process the frames so that the server can keep sending data even if the // stream consumer slows down processing the frames. let (tx, rx) = unbounded_channel(); - let _ = tokio::spawn(async move { + tokio::spawn(async move { process_stream(&inner, tx, connection).await; }); diff --git a/src/protocol/cluster.rs b/src/protocol/cluster.rs index c7a93318..66129c6e 100644 --- a/src/protocol/cluster.rs +++ b/src/protocol/cluster.rs @@ -91,7 +91,7 @@ fn parse_cluster_slot_hostname(server: &[RedisValue], default_host: &Str) -> Res } /// Read the node block with format `|null, , , [metadata]` -fn parse_node_block(data: &Vec, default_host: &Str) -> Option<(Str, u16, Str, Str)> { +fn parse_node_block(data: &[RedisValue], default_host: &Str) -> Option<(Str, u16, Str, Str)> { if data.len() < 3 { return None; } @@ -199,7 +199,7 @@ pub fn parse_cluster_slots(frame: RedisValue, default_host: &Str) -> Result, default_host: &Str) { +fn replace_tls_server_names(policy: &TlsHostMapping, ranges: &mut [SlotRange], default_host: &Str) { for slot_range in ranges.iter_mut() { slot_range.primary.set_tls_server_name(policy, default_host); @@ -212,7 +212,7 @@ fn replace_tls_server_names(policy: &TlsHostMapping, ranges: &mut Vec /// Modify the `CLUSTER SLOTS` command according to the hostname mapping policy in the `TlsHostMapping`. #[cfg(any(feature = "enable-rustls", feature = "enable-native-tls"))] -pub fn modify_cluster_slot_hostnames(inner: &Arc, ranges: &mut Vec, default_host: &Str) { +pub fn modify_cluster_slot_hostnames(inner: &Arc, ranges: &mut [SlotRange], default_host: &Str) { let policy = match inner.config.tls { Some(ref config) => &config.hostnames, None => { diff --git a/src/protocol/command.rs b/src/protocol/command.rs index e29cf67e..58ac2e4c 100644 --- a/src/protocol/command.rs +++ b/src/protocol/command.rs @@ -1855,7 +1855,7 @@ impl RedisCommand { #[cfg(feature = "debug-ids")] counter: command_counter(), #[cfg(feature = "client-tracking")] - caching: self.caching.clone(), + caching: self.caching, } } diff --git a/src/protocol/connection.rs b/src/protocol/connection.rs index df525a09..8f3df24c 100644 --- a/src/protocol/connection.rs +++ b/src/protocol/connection.rs @@ -258,9 +258,9 @@ impl Sink for ConnectionKind { #[cfg(feature = "unix-sockets")] ConnectionKind::Unix(ref mut conn) => Pin::new(conn).poll_flush(cx).map_err(|e| e), #[cfg(feature = "enable-rustls")] - ConnectionKind::Rustls(ref mut conn) => Pin::new(conn).poll_flush(cx).map_err(|e| e.into()), + ConnectionKind::Rustls(ref mut conn) => Pin::new(conn).poll_flush(cx).map_err(|e| e), #[cfg(feature = "enable-native-tls")] - ConnectionKind::NativeTls(ref mut conn) => Pin::new(conn).poll_flush(cx).map_err(|e| e.into()), + ConnectionKind::NativeTls(ref mut conn) => Pin::new(conn).poll_flush(cx).map_err(|e| e), } } @@ -270,9 +270,9 @@ impl Sink for ConnectionKind { #[cfg(feature = "unix-sockets")] ConnectionKind::Unix(ref mut conn) => Pin::new(conn).poll_close(cx).map_err(|e| e), #[cfg(feature = "enable-rustls")] - ConnectionKind::Rustls(ref mut conn) => Pin::new(conn).poll_close(cx).map_err(|e| e.into()), + ConnectionKind::Rustls(ref mut conn) => Pin::new(conn).poll_close(cx).map_err(|e| e), #[cfg(feature = "enable-native-tls")] - ConnectionKind::NativeTls(ref mut conn) => Pin::new(conn).poll_close(cx).map_err(|e| e.into()), + ConnectionKind::NativeTls(ref mut conn) => Pin::new(conn).poll_close(cx).map_err(|e| e), } } } @@ -358,9 +358,9 @@ impl Sink for SplitSinkKind { #[cfg(feature = "unix-sockets")] SplitSinkKind::Unix(ref mut conn) => Pin::new(conn).poll_flush(cx).map_err(|e| e), #[cfg(feature = "enable-rustls")] - SplitSinkKind::Rustls(ref mut conn) => Pin::new(conn).poll_flush(cx).map_err(|e| e.into()), + SplitSinkKind::Rustls(ref mut conn) => Pin::new(conn).poll_flush(cx).map_err(|e| e), #[cfg(feature = "enable-native-tls")] - SplitSinkKind::NativeTls(ref mut conn) => Pin::new(conn).poll_flush(cx).map_err(|e| e.into()), + SplitSinkKind::NativeTls(ref mut conn) => Pin::new(conn).poll_flush(cx).map_err(|e| e), } } @@ -370,9 +370,9 @@ impl Sink for SplitSinkKind { #[cfg(feature = "unix-sockets")] SplitSinkKind::Unix(ref mut conn) => Pin::new(conn).poll_close(cx).map_err(|e| e), #[cfg(feature = "enable-rustls")] - SplitSinkKind::Rustls(ref mut conn) => Pin::new(conn).poll_close(cx).map_err(|e| e.into()), + SplitSinkKind::Rustls(ref mut conn) => Pin::new(conn).poll_close(cx).map_err(|e| e), #[cfg(feature = "enable-native-tls")] - SplitSinkKind::NativeTls(ref mut conn) => Pin::new(conn).poll_close(cx).map_err(|e| e.into()), + SplitSinkKind::NativeTls(ref mut conn) => Pin::new(conn).poll_close(cx).map_err(|e| e), } } } @@ -500,18 +500,17 @@ impl RedisTransport { let (id, version) = (None, None); let tls_server_name = server .tls_server_name - .as_ref() - .map(|s| s.clone()) + .as_ref().cloned() .unwrap_or(server.host.clone()); let default_host = server.host.clone(); - let codec = RedisCodec::new(inner, &server); + let codec = RedisCodec::new(inner, server); let addrs = inner .get_resolver() .await .resolve(server.host.clone(), server.port) .await?; - let (socket, addr) = tcp_connect_any(inner, &server, &addrs).await?; + let (socket, addr) = tcp_connect_any(inner, server, &addrs).await?; _debug!(inner, "native-tls handshake with server name/host: {}", tls_server_name); let socket = connector.clone().connect(&tls_server_name, socket).await?; @@ -550,8 +549,7 @@ impl RedisTransport { let (id, version) = (None, None); let tls_server_name = server .tls_server_name - .as_ref() - .map(|s| s.clone()) + .as_ref().cloned() .unwrap_or(server.host.clone()); let default_host = server.host.clone(); @@ -561,7 +559,7 @@ impl RedisTransport { .await .resolve(server.host.clone(), server.port) .await?; - let (socket, addr) = tcp_connect_any(inner, &server, &addrs).await?; + let (socket, addr) = tcp_connect_any(inner, server, &addrs).await?; let server_name: ServerName = tls_server_name.deref().try_into()?; _debug!(inner, "rustls handshake with server name/host: {:?}", tls_server_name); @@ -1179,7 +1177,7 @@ pub async fn request_response( let (tx, rx) = oneshot_channel(); command.response = ResponseKind::Respond(Some(tx)); let timeout_dur = timeout - .or(command.timeout_dur.clone()) + .or(command.timeout_dur) .unwrap_or_else(|| inner.default_command_timeout()); _trace!( diff --git a/src/protocol/public.rs b/src/protocol/public.rs index c54751d5..856d326f 100644 --- a/src/protocol/public.rs +++ b/src/protocol/public.rs @@ -21,7 +21,7 @@ pub use redis_protocol::{ pub fn resp3_encode_command(cmd: &str) -> Resp3Frame { Resp3Frame::Array { data: cmd - .split(" ") + .split(' ') .map(|s| Resp3Frame::BlobString { data: s.as_bytes().to_vec().into(), attributes: None, @@ -35,7 +35,7 @@ pub fn resp3_encode_command(cmd: &str) -> Resp3Frame { pub fn resp2_encode_command(cmd: &str) -> Resp2Frame { Resp2Frame::Array( cmd - .split(" ") + .split(' ') .map(|s| Resp2Frame::BulkString(s.as_bytes().to_vec().into())) .collect(), ) diff --git a/src/protocol/responders.rs b/src/protocol/responders.rs index 4fe7b1fa..942ab48c 100644 --- a/src/protocol/responders.rs +++ b/src/protocol/responders.rs @@ -189,7 +189,7 @@ fn sample_command_latencies(inner: &Arc, command: &mut RedisCo if let Some(sent) = command.network_start.take() { sample_latency(&inner.network_latency_stats, sent); } - sample_latency(&inner.latency_stats, command.created.clone()); + sample_latency(&inner.latency_stats, command.created); } #[cfg(not(feature = "metrics"))] diff --git a/src/protocol/tls.rs b/src/protocol/tls.rs index dc8a9b60..b0936cff 100644 --- a/src/protocol/tls.rs +++ b/src/protocol/tls.rs @@ -62,7 +62,7 @@ impl TlsHostMapping { match self { TlsHostMapping::None => None, TlsHostMapping::DefaultHost => Some(default_host.to_owned()), - TlsHostMapping::Custom(ref inner) => inner.map(value, &default_host), + TlsHostMapping::Custom(ref inner) => inner.map(value, default_host), } } } @@ -70,18 +70,9 @@ impl TlsHostMapping { impl PartialEq for TlsHostMapping { fn eq(&self, other: &Self) -> bool { match self { - TlsHostMapping::None => match other { - TlsHostMapping::None => true, - _ => false, - }, - TlsHostMapping::DefaultHost => match other { - TlsHostMapping::DefaultHost => true, - _ => false, - }, - TlsHostMapping::Custom(_) => match other { - TlsHostMapping::Custom(_) => true, - _ => false, - }, + TlsHostMapping::None => matches!(other, TlsHostMapping::None), + TlsHostMapping::DefaultHost => matches!(other, TlsHostMapping::DefaultHost), + TlsHostMapping::Custom(_) => matches!(other, TlsHostMapping::Custom(_)), } } } @@ -171,7 +162,7 @@ impl TlsConnector { let system_certs = rustls_native_certs::load_native_certs()?; let mut cert_store = RootCertStore::empty(); for system_cert in system_certs.into_iter() { - let _ = cert_store.add(system_cert)?; + cert_store.add(system_cert)?; } Ok( @@ -191,7 +182,7 @@ impl TryFrom for TlsConnector { fn try_from(builder: NativeTlsConnectorBuilder) -> Result { let connector = builder .build() - .map(|t| TokioNativeTlsConnector::from(t)) + .map(TokioNativeTlsConnector::from) .map_err(|e| RedisError::new(RedisErrorKind::Tls, format!("{:?}", e)))?; Ok(TlsConnector::Native(connector)) } diff --git a/src/protocol/utils.rs b/src/protocol/utils.rs index 55623ea3..ace5fd44 100644 --- a/src/protocol/utils.rs +++ b/src/protocol/utils.rs @@ -66,7 +66,7 @@ pub fn server_to_parts(server: &str) -> Result<(&str, u16), RedisError> { Ok((parts[0], parts[1].parse::()?)) } -pub fn binary_search(slots: &Vec, slot: u16) -> Option { +pub fn binary_search(slots: &[SlotRange], slot: u16) -> Option { if slot > REDIS_CLUSTER_SLOTS { return None; } @@ -602,7 +602,7 @@ pub fn value_to_outgoing_resp3_frame(value: &RedisValue) -> Result Resp3Frame { match value { RedisValue::Array(values) => Resp3Frame::Array { - data: values.into_iter().map(|v| mocked_value_to_frame(v)).collect(), + data: values.into_iter().map(mocked_value_to_frame).collect(), attributes: None, }, RedisValue::Map(values) => Resp3Frame::Map { @@ -689,7 +689,7 @@ pub fn parse_master_role_replicas(data: RedisValue) -> Result, Redis } } -pub fn assert_array_len(data: &Vec, len: usize) -> Result<(), RedisError> { +pub fn assert_array_len(data: &[T], len: usize) -> Result<(), RedisError> { if data.len() == len { Ok(()) } else { @@ -762,7 +762,7 @@ pub fn value_to_zset_result(value: RedisValue) -> Result, #[cfg(any(feature = "blocking-encoding", feature = "partial-tracing", feature = "full-tracing"))] fn i64_size(i: i64) -> usize { if i < 0 { - 1 + redis_protocol::digits_in_number((i * -1) as usize) + 1 + redis_protocol::digits_in_number(-i as usize) } else { redis_protocol::digits_in_number(i as usize) } @@ -795,7 +795,7 @@ pub fn resp3_frame_size(frame: &Resp3Frame) -> usize { } #[cfg(any(feature = "blocking-encoding", feature = "partial-tracing", feature = "full-tracing"))] -pub fn args_size(args: &Vec) -> usize { +pub fn args_size(args: &[RedisValue]) -> usize { args.iter().fold(0, |c, arg| c + arg_size(arg)) } diff --git a/src/router/commands.rs b/src/router/commands.rs index d281ae01..38d9c931 100644 --- a/src/router/commands.rs +++ b/src/router/commands.rs @@ -98,181 +98,184 @@ async fn write_with_backpressure( command: RedisCommand, force_pipeline: bool, ) -> Result<(), RedisError> { - _trace!(inner, "Writing command: {:?}", command); - - let mut _command: Option = Some(command); - let mut _backpressure: Option = None; - loop { - let mut command = match _command.take() { - Some(command) => command, - None => return Err(RedisError::new(RedisErrorKind::Unknown, "Missing command.")), - }; - if let Err(e) = command.decr_check_attempted() { - command.finish(inner, Err(e)); - break; - } + Box::pin(async { + _trace!(inner, "Writing command: {:?}", command); + + let mut _command: Option = Some(command); + let mut _backpressure: Option = None; + loop { + let mut command = match _command.take() { + Some(command) => command, + None => return Err(RedisError::new(RedisErrorKind::Unknown, "Missing command.")), + }; + if let Err(e) = command.decr_check_attempted() { + command.finish(inner, Err(e)); + break; + } - // apply backpressure first if needed. as a part of that check we may decide to block on the next command. - let router_rx = match _backpressure { - Some(backpressure) => match backpressure.wait(inner, &mut command).await { - Ok(Some(rx)) => Some(rx), - Ok(None) => { + // apply backpressure first if needed. as a part of that check we may decide to block on the next command. + let router_rx = match _backpressure { + Some(backpressure) => match backpressure.wait(inner, &mut command).await { + Ok(Some(rx)) => Some(rx), + Ok(None) => { + if command.should_auto_pipeline(inner, force_pipeline) { + None + } else { + Some(command.create_router_channel()) + } + }, + Err(e) => { + command.respond_to_caller(Err(e)); + return Ok(()); + }, + }, + None => { if command.should_auto_pipeline(inner, force_pipeline) { None } else { Some(command.create_router_channel()) } }, - Err(e) => { - command.respond_to_caller(Err(e)); - return Ok(()); - }, - }, - None => { - if command.should_auto_pipeline(inner, force_pipeline) { - None - } else { - Some(command.create_router_channel()) - } - }, - }; - let closes_connection = command.kind.closes_connection(); - let is_blocking = command.blocks_connection(); - let use_replica = command.use_replica; + }; + let closes_connection = command.kind.closes_connection(); + let is_blocking = command.blocks_connection(); + let use_replica = command.use_replica; - let result = if use_replica { - router.write_replica(command, false).await - } else { - router.write(command, false).await - }; + let result = if use_replica { + router.write_replica(command, false).await + } else { + router.write(command, false).await + }; - match result { - Written::Backpressure((mut command, backpressure)) => { - _debug!(inner, "Recv backpressure again for {}.", command.kind.to_str_debug()); - // backpressure doesn't count as a write attempt - command.attempts_remaining += 1; - _command = Some(command); - _backpressure = Some(backpressure); + match result { + Written::Backpressure((mut command, backpressure)) => { + _debug!(inner, "Recv backpressure again for {}.", command.kind.to_str_debug()); + // backpressure doesn't count as a write attempt + command.attempts_remaining += 1; + _command = Some(command); + _backpressure = Some(backpressure); - continue; - }, - Written::Disconnected((server, command, error)) => { - _debug!(inner, "Handle disconnect for {:?} due to {:?}", server, error); - let commands = router.connections.disconnect(inner, server.as_ref()).await; - router.buffer_commands(commands); - if let Some(command) = command { - if command.should_finish_with_error(inner) { - command.finish(inner, Err(error)); - } else { - router.buffer_command(command); + continue; + }, + Written::Disconnected((server, command, error)) => { + _debug!(inner, "Handle disconnect for {:?} due to {:?}", server, error); + let commands = router.connections.disconnect(inner, server.as_ref()).await; + router.buffer_commands(commands); + if let Some(command) = command { + if command.should_finish_with_error(inner) { + command.finish(inner, Err(error)); + } else { + router.buffer_command(command); + } } - } - utils::defer_reconnect(inner); - break; - }, - Written::NotFound(mut command) => { - if let Err(e) = command.decr_check_redirections() { - command.finish(inner, Err(e)); utils::defer_reconnect(inner); break; - } - - _debug!(inner, "Perform cluster sync after missing hash slot lookup."); - if let Err(error) = router.sync_cluster().await { - // try to sync the cluster once, and failing that buffer the command. - _warn!(inner, "Failed to sync cluster after NotFound: {:?}", error); - utils::defer_reconnect(inner); - router.buffer_command(command); - utils::delay_cluster_sync(inner).await?; - break; - } else { - _command = Some(command); - _backpressure = None; - continue; - } - }, - Written::Ignore => { - _trace!(inner, "Ignore `Written` response."); - break; - }, - Written::SentAll => { - _trace!(inner, "Sent command to all servers."); - let _ = router.check_and_flush().await; - if let Some(command) = handle_router_response(inner, router, router_rx).await? { - // commands that are sent to all nodes are not retried after a connection closing - _warn!(inner, "Responding with canceled error after all nodes command failure."); - command.finish(inner, Err(RedisError::new_canceled())); - break; - } else { - if closes_connection { - _trace!(inner, "Ending command loop after QUIT or SHUTDOWN."); - return Err(RedisError::new_canceled()); + }, + Written::NotFound(mut command) => { + if let Err(e) = command.decr_check_redirections() { + command.finish(inner, Err(e)); + utils::defer_reconnect(inner); + break; } + _debug!(inner, "Perform cluster sync after missing hash slot lookup."); + if let Err(error) = router.sync_cluster().await { + // try to sync the cluster once, and failing that buffer the command. + _warn!(inner, "Failed to sync cluster after NotFound: {:?}", error); + utils::defer_reconnect(inner); + router.buffer_command(command); + utils::delay_cluster_sync(inner).await?; + break; + } else { + _command = Some(command); + _backpressure = None; + continue; + } + }, + Written::Ignore => { + _trace!(inner, "Ignore `Written` response."); break; - } - }, - Written::Sent((server, flushed)) => { - _trace!(inner, "Sent command to {}. Flushed: {}", server, flushed); - if is_blocking { - inner.backchannel.write().await.set_blocked(&server); - } - if !flushed { + }, + Written::SentAll => { + _trace!(inner, "Sent command to all servers."); let _ = router.check_and_flush().await; - } + if let Some(command) = handle_router_response(inner, router, router_rx).await? { + // commands that are sent to all nodes are not retried after a connection closing + _warn!(inner, "Responding with canceled error after all nodes command failure."); + command.finish(inner, Err(RedisError::new_canceled())); + break; + } else { + if closes_connection { + _trace!(inner, "Ending command loop after QUIT or SHUTDOWN."); + return Err(RedisError::new_canceled()); + } - let should_interrupt = - is_blocking && inner.counters.read_cmd_buffer_len() > 0 && inner.config.blocking == Blocking::Interrupt; - if should_interrupt { - // if there's other commands in the queue then interrupt the command that was just sent - _debug!(inner, "Interrupt after write."); - if let Err(e) = client_utils::interrupt_blocked_connection(inner, ClientUnblockFlag::Error).await { - _warn!(inner, "Failed to unblock connection: {:?}", e); + break; + } + }, + Written::Sent((server, flushed)) => { + _trace!(inner, "Sent command to {}. Flushed: {}", server, flushed); + if is_blocking { + inner.backchannel.write().await.set_blocked(&server); + } + if !flushed { + let _ = router.check_and_flush().await; } - } - if let Some(command) = handle_router_response(inner, router, router_rx).await? { - _command = Some(command); - _backpressure = None; - continue; - } else { - if closes_connection { - _trace!(inner, "Ending command loop after QUIT or SHUTDOWN."); - return Err(RedisError::new_canceled()); + let should_interrupt = + is_blocking && inner.counters.read_cmd_buffer_len() > 0 && inner.config.blocking == Blocking::Interrupt; + if should_interrupt { + // if there's other commands in the queue then interrupt the command that was just sent + _debug!(inner, "Interrupt after write."); + if let Err(e) = client_utils::interrupt_blocked_connection(inner, ClientUnblockFlag::Error).await { + _warn!(inner, "Failed to unblock connection: {:?}", e); + } } - break; - } - }, - Written::Error((error, command)) => { - _debug!(inner, "Fatal error writing command: {:?}", error); - if let Some(command) = command { - command.finish(inner, Err(error.clone())); - } - inner.notifications.broadcast_error(error.clone()); + if let Some(command) = handle_router_response(inner, router, router_rx).await? { + _command = Some(command); + _backpressure = None; + continue; + } else { + if closes_connection { + _trace!(inner, "Ending command loop after QUIT or SHUTDOWN."); + return Err(RedisError::new_canceled()); + } - utils::defer_reconnect(inner); - return Err(error); - }, - #[cfg(feature = "replicas")] - Written::Fallback(command) => { - _error!( - inner, - "Unexpected replica response to {} ({})", - command.kind.to_str_debug(), - command.debug_id() - ); - command.finish( - inner, - Err(RedisError::new(RedisErrorKind::Replica, "Unexpected replica response.")), - ); - break; - }, + break; + } + }, + Written::Error((error, command)) => { + _debug!(inner, "Fatal error writing command: {:?}", error); + if let Some(command) = command { + command.finish(inner, Err(error.clone())); + } + inner.notifications.broadcast_error(error.clone()); + + utils::defer_reconnect(inner); + return Err(error); + }, + #[cfg(feature = "replicas")] + Written::Fallback(command) => { + _error!( + inner, + "Unexpected replica response to {} ({})", + command.kind.to_str_debug(), + command.debug_id() + ); + command.finish( + inner, + Err(RedisError::new(RedisErrorKind::Replica, "Unexpected replica response.")), + ); + break; + }, + } } - } - Ok(()) + Ok(()) + }) + .await } #[cfg(feature = "full-tracing")] @@ -591,7 +594,7 @@ pub async fn start(inner: &Arc) -> Result<(), RedisError> { let mut router = Router::new(inner); _debug!(inner, "Initializing router with policy: {:?}", inner.reconnect_policy()); let result = if inner.config.fail_fast { - if let Err(e) = router.connect().await { + if let Err(e) = Box::pin(router.connect()).await { inner.notifications.broadcast_connect(Err(e.clone())); inner.notifications.broadcast_error(e.clone()); Err(e) @@ -608,7 +611,7 @@ pub async fn start(inner: &Arc) -> Result<(), RedisError> { inner.store_command_rx(rx, false); Err(error) } else { - let result = process_commands(inner, &mut router, &mut rx).await; + let result = Box::pin(process_commands(inner, &mut router, &mut rx)).await; inner.store_command_rx(rx, false); result } @@ -640,11 +643,9 @@ mod mocking { RouterCommand::Pipeline { commands } => { for mut command in commands.into_iter() { let mocked = command.to_mocked(); - let result = mocks - .process_command(mocked) - .map(|result| protocol_utils::mocked_value_to_frame(result)); + let result = mocks.process_command(mocked).map(protocol_utils::mocked_value_to_frame); - let _ = command.respond_to_caller(result); + command.respond_to_caller(result); } Ok(()) @@ -652,8 +653,8 @@ mod mocking { RouterCommand::Command(mut command) => { let result = mocks .process_command(command.to_mocked()) - .map(|result| protocol_utils::mocked_value_to_frame(result)); - let _ = command.respond_to_caller(result); + .map(protocol_utils::mocked_value_to_frame); + command.respond_to_caller(result); Ok(()) }, diff --git a/src/router/mod.rs b/src/router/mod.rs index 72813dae..40740cf6 100644 --- a/src/router/mod.rs +++ b/src/router/mod.rs @@ -181,7 +181,7 @@ impl Connections { let replicas = inner .with_cluster_state(|state| Ok(state.replicas(primary))) .ok() - .unwrap_or(Vec::new()); + .unwrap_or_default(); for replica in replicas.into_iter() { out.insert(replica, primary.clone()); @@ -242,11 +242,11 @@ impl Connections { buffer: &mut VecDeque, ) -> Result<(), RedisError> { let result = if inner.config.server.is_clustered() { - clustered::initialize_connections(inner, self, buffer).await + Box::pin(clustered::initialize_connections(inner, self, buffer)).await } else if inner.config.server.is_centralized() || inner.config.server.is_unix_socket() { - centralized::initialize_connection(inner, self, buffer).await + Box::pin(centralized::initialize_connection(inner, self, buffer)).await } else if inner.config.server.is_sentinel() { - sentinel::initialize_connection(inner, self, buffer).await + Box::pin(sentinel::initialize_connection(inner, self, buffer)).await } else { return Err(RedisError::new(RedisErrorKind::Config, "Invalid client configuration.")); }; @@ -771,8 +771,8 @@ impl Router { for server in remove.into_iter() { debug!("{}: Dropping replica connection to {}", self.inner.id, server); - self.replicas.drop_writer(&server).await; - self.replicas.remove_replica(&server); + self.replicas.drop_writer(server).await; + self.replicas.remove_replica(server); } for (mut replica, primary) in new_replica_map.into_iter() { @@ -785,7 +785,7 @@ impl Router { if should_use { replicas::map_replica_tls_names(&self.inner, &primary, &mut replica); - let _ = self + self .replicas .add_connection(&self.inner, primary, replica, false) .await?; diff --git a/src/router/replicas.rs b/src/router/replicas.rs index 915073c3..889c9d0c 100644 --- a/src/router/replicas.rs +++ b/src/router/replicas.rs @@ -15,7 +15,6 @@ use crate::{ #[cfg(feature = "replicas")] use std::{ collections::{HashMap, VecDeque}, - convert::identity, fmt, fmt::Formatter, sync::Arc, @@ -160,11 +159,7 @@ impl ReplicaSet { /// Add a replica node to the routing table. pub fn add(&mut self, primary: Server, replica: Server) { - self - .servers - .entry(primary) - .or_insert(ReplicaRouter::default()) - .add(replica); + self.servers.entry(primary).or_default().add(replica); } /// Remove a replica node mapping from the routing table. @@ -210,7 +205,7 @@ impl ReplicaSet { .get(primary) .map(|router| router.iter()) .into_iter() - .flat_map(identity) + .flatten() } /// Return a map of replica nodes to primary nodes. @@ -270,7 +265,7 @@ impl Replicas { } for (replica, primary) in self.routing.to_map() { - let _ = self.add_connection(inner, primary, replica, false).await?; + self.add_connection(inner, primary, replica, false).await?; } Ok(()) @@ -304,10 +299,10 @@ impl Replicas { if !inner.connection.replica.lazy_connections || force { let mut transport = connection::create(inner, &replica, None).await?; - let _ = transport.setup(inner, None).await?; + transport.setup(inner, None).await?; let (_, writer) = if inner.config.server.is_clustered() { - let _ = transport.readonly(inner, None).await?; + transport.readonly(inner, None).await?; connection::split_and_initialize(inner, transport, true, clustered::spawn_reader_task)? } else { connection::split_and_initialize(inner, transport, true, centralized::spawn_reader_task)? @@ -358,7 +353,7 @@ impl Replicas { /// Check and flush all the sockets managed by the replica routing state. pub async fn check_and_flush(&mut self) -> Result<(), RedisError> { for (_, writer) in self.writers.iter_mut() { - let _ = writer.flush().await?; + writer.flush().await?; } Ok(()) diff --git a/src/router/responses.rs b/src/router/responses.rs index d0f1c70a..485c6048 100644 --- a/src/router/responses.rs +++ b/src/router/responses.rs @@ -15,7 +15,7 @@ use crate::types::Invalidation; const KEYSPACE_PREFIX: &str = "__keyspace@"; const KEYEVENT_PREFIX: &str = "__keyevent@"; #[cfg(feature = "client-tracking")] -const INVALIDATION_CHANNEL: &'static str = "__redis__:invalidate"; +const INVALIDATION_CHANNEL: &str = "__redis__:invalidate"; fn parse_keyspace_notification(channel: &str, message: &RedisValue) -> Option { if channel.starts_with(KEYEVENT_PREFIX) { @@ -246,7 +246,7 @@ pub fn check_pubsub_message(inner: &Arc, server: &Server, fram }, }; if let Some(ref span) = span { - span.record("channel", &&*message.channel); + span.record("channel", &*message.channel); } if is_pubsub_invalidation(&message) { diff --git a/src/router/sentinel.rs b/src/router/sentinel.rs index e23c1e45..d044e756 100644 --- a/src/router/sentinel.rs +++ b/src/router/sentinel.rs @@ -324,11 +324,11 @@ pub async fn initialize_connection( let server = transport.server.clone(); utils::apply_timeout( - async { + Box::pin(async { check_primary_node_role(inner, &mut transport).await?; update_cached_client_state(inner, writer, sentinel, transport).await?; Ok::<_, RedisError>(()) - }, + }), inner.internal_command_timeout(), ) .await?; diff --git a/src/router/utils.rs b/src/router/utils.rs index 88af461f..73758294 100644 --- a/src/router/utils.rs +++ b/src/router/utils.rs @@ -240,7 +240,7 @@ pub fn next_reconnection_delay(inner: &Arc) -> Result, router: &mut Router) -> Result<(), RedisError> { client_utils::set_client_state(&inner.state, ClientState::Connecting); - if let Err(e) = router.connect().await { + if let Err(e) = Box::pin(router.connect()).await { _debug!(inner, "Failed reconnecting with error: {:?}", e); client_utils::set_client_state(&inner.state, ClientState::Disconnected); inner.notifications.broadcast_error(e.clone()); diff --git a/src/trace/enabled.rs b/src/trace/enabled.rs index 37cc68f9..40dfeada 100644 --- a/src/trace/enabled.rs +++ b/src/trace/enabled.rs @@ -48,12 +48,12 @@ impl fmt::Debug for CommandTraces { pub fn set_network_span(inner: &Arc, command: &mut RedisCommand, flush: bool) { trace!("Setting network span from command {}", command.debug_id()); let span = fspan!(command, inner.tracing_span_level(), "wait_for_response", flush); - let _ = span.in_scope(|| {}); + span.in_scope(|| {}); command.traces.network = Some(span); } pub fn record_response_size(span: &Span, frame: &Frame) { - span.record("res_size", &protocol_utils::resp3_frame_size(frame)); + span.record("res_size", protocol_utils::resp3_frame_size(frame)); } pub fn create_command_span(inner: &Arc) -> Span { diff --git a/src/types/client.rs b/src/types/client.rs index c0186373..bb260a7f 100644 --- a/src/types/client.rs +++ b/src/types/client.rs @@ -164,7 +164,7 @@ impl TryFrom<&String> for Toggle { type Error = RedisError; fn try_from(value: &String) -> Result { - Toggle::from_str(&value).ok_or(RedisError::new(RedisErrorKind::Parse, "Invalid toggle value.")) + Toggle::from_str(value).ok_or(RedisError::new(RedisErrorKind::Parse, "Invalid toggle value.")) } } diff --git a/src/types/config.rs b/src/types/config.rs index 876c5ac7..a4eb207c 100644 --- a/src/types/config.rs +++ b/src/types/config.rs @@ -653,51 +653,45 @@ impl Default for RedisConfig { } impl RedisConfig { - /// Whether or not the client uses TLS. + /// Whether the client uses TLS. #[cfg(any(feature = "enable-native-tls", feature = "enable-rustls"))] pub fn uses_tls(&self) -> bool { self.tls.is_some() } - /// Whether or not the client uses TLS. + /// Whether the client uses TLS. #[cfg(not(any(feature = "enable-native-tls", feature = "enable-rustls")))] pub fn uses_tls(&self) -> bool { false } - /// Whether or not the client uses a `native-tls` connector. + /// Whether the client uses a `native-tls` connector. #[cfg(feature = "enable-native-tls")] - #[allow(unreachable_patterns)] pub fn uses_native_tls(&self) -> bool { - match self.tls { - Some(ref config) => match config.connector { - TlsConnector::Native(_) => true, - _ => false, - }, - None => false, - } + self + .tls + .as_ref() + .map(|config| matches!(config.connector, TlsConnector::Native(_))) + .unwrap_or(false) } - /// Whether or not the client uses a `native-tls` connector. + /// Whether the client uses a `native-tls` connector. #[cfg(not(feature = "enable-native-tls"))] pub fn uses_native_tls(&self) -> bool { false } - /// Whether or not the client uses a `rustls` connector. + /// Whether the client uses a `rustls` connector. #[cfg(feature = "enable-rustls")] - #[allow(unreachable_patterns)] pub fn uses_rustls(&self) -> bool { - match self.tls { - Some(ref config) => match config.connector { - TlsConnector::Rustls(_) => true, - _ => false, - }, - None => false, - } + self + .tls + .as_ref() + .map(|config| matches!(config.connector, TlsConnector::Rustls(_))) + .unwrap_or(false) } - /// Whether or not the client uses a `rustls` connector. + /// Whether the client uses a `rustls` connector. #[cfg(not(feature = "enable-rustls"))] pub fn uses_rustls(&self) -> bool { false @@ -1275,7 +1269,7 @@ impl Options { cluster_node: cmd.cluster_node.clone(), fail_fast: cmd.fail_fast, #[cfg(feature = "client-tracking")] - caching: cmd.caching.clone(), + caching: cmd.caching, } } @@ -1288,7 +1282,7 @@ impl Options { #[cfg(feature = "client-tracking")] { - command.caching = self.caching.clone(); + command.caching = self.caching; } if let Some(attempts) = self.max_attempts { diff --git a/src/types/timeseries.rs b/src/types/timeseries.rs index ac3b9b72..5cb53ecc 100644 --- a/src/types/timeseries.rs +++ b/src/types/timeseries.rs @@ -73,7 +73,7 @@ impl Timestamp { } pub(crate) fn from_str(value: &str) -> Result { - match value.as_ref() { + match value { "*" => Ok(Timestamp::Now), _ => Ok(Timestamp::Custom(value.parse::()?)), } @@ -218,7 +218,7 @@ impl TryFrom<&str> for GetTimestamp { type Error = RedisError; fn try_from(value: &str) -> Result { - Ok(match value.as_ref() { + Ok(match value { "-" => GetTimestamp::Earliest, "+" => GetTimestamp::Latest, _ => GetTimestamp::Custom(value.parse::()?), @@ -319,7 +319,7 @@ impl TryFrom<&str> for BucketTimestamp { type Error = RedisError; fn try_from(value: &str) -> Result { - Ok(match value.as_ref() { + Ok(match value { "-" | "start" => BucketTimestamp::Start, "+" | "end" => BucketTimestamp::End, "~" | "mid" => BucketTimestamp::Mid, diff --git a/src/utils.rs b/src/utils.rs index 9e1e405c..a704b9d1 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -62,9 +62,9 @@ const REDIS_SENTINEL_SCHEME_SUFFIX: &str = "-sentinel"; const SENTINEL_NAME_QUERY: &str = "sentinelServiceName"; const CLUSTER_NODE_QUERY: &str = "node"; #[cfg(feature = "sentinel-auth")] -const SENTINEL_USERNAME_QUERY: &'static str = "sentinelUsername"; +const SENTINEL_USERNAME_QUERY: &str = "sentinelUsername"; #[cfg(feature = "sentinel-auth")] -const SENTINEL_PASSWORD_QUERY: &'static str = "sentinelPassword"; +const SENTINEL_PASSWORD_QUERY: &str = "sentinelPassword"; /// Create a `Str` from a static str slice without copying. pub fn static_str(s: &'static str) -> Str { @@ -464,12 +464,12 @@ where let mut command: RedisCommand = func()?.into(); command.response = ResponseKind::Respond(Some(tx)); - let req_size = protocol_utils::args_size(&command.args()); - args_span.record("num_args", &command.args().len()); + let req_size = protocol_utils::args_size(command.args()); + args_span.record("num_args", command.args().len()); (command, rx, req_size) }; - cmd_span.record("cmd", &command.kind.to_str_debug()); - cmd_span.record("req_size", &req_size); + cmd_span.record("cmd", command.kind.to_str_debug()); + cmd_span.record("req_size", req_size); let queued_span = trace::create_queued_span(cmd_span.id(), inner); let timed_out = command.timed_out.clone(); @@ -484,8 +484,8 @@ where command.traces.queued = Some(queued_span); let timeout_dur = prepare_command(client, &mut command); - let _ = check_blocking_policy(inner, &command).await?; - let _ = client.send_command(command)?; + check_blocking_policy(inner, &command).await?; + client.send_command(command)?; apply_timeout(rx, timeout_dur) .and_then(|r| async { r }) @@ -655,7 +655,7 @@ pub fn flatten_nested_array_values(value: RedisValue, depth: usize) -> RedisValu } } -pub fn is_maybe_array_map(arr: &Vec) -> bool { +pub fn is_maybe_array_map(arr: &[RedisValue]) -> bool { if !arr.is_empty() && arr.len() % 2 == 0 { arr.chunks(2).all(|chunk| !chunk[0].is_aggregate_type()) } else { diff --git a/tests/integration/other/mod.rs b/tests/integration/other/mod.rs index 4cb8bfae..1c03aee1 100644 --- a/tests/integration/other/mod.rs +++ b/tests/integration/other/mod.rs @@ -579,13 +579,13 @@ pub async fn should_support_options_with_trx(client: RedisClient, _: RedisConfig }; let trx = client.multi().with_options(&options); - let _: () = trx.get("foo{1}").await?; - let _: () = trx.set("foo{1}", "bar", None, None, false).await?; - let _: () = trx.get("foo{1}").await?; + trx.get("foo{1}").await?; + trx.set("foo{1}", "bar", None, None, false).await?; + trx.get("foo{1}").await?; let (first, second, third): (Option, bool, String) = trx.exec(true).await?; assert_eq!(first, None); - assert_eq!(second, true); + assert!(second); assert_eq!(third, "bar"); Ok(()) } diff --git a/tests/integration/tracking/mod.rs b/tests/integration/tracking/mod.rs index a0c0a610..9767ad36 100644 --- a/tests/integration/tracking/mod.rs +++ b/tests/integration/tracking/mod.rs @@ -11,6 +11,7 @@ use std::{ }; use tokio::time::sleep; +#[allow(dead_code)] pub async fn should_invalidate_foo_resp3(client: RedisClient, _: RedisConfig) -> Result<(), RedisError> { if client.protocol_version() == RespVersion::RESP2 { return Ok(()); @@ -31,13 +32,13 @@ pub async fn should_invalidate_foo_resp3(client: RedisClient, _: RedisConfig) -> } }); - let _ = client.start_tracking(None, false, false, false, false).await?; - let _: () = client.get("foo{1}").await?; - let _: () = client.incr("foo{1}").await?; + client.start_tracking(None, false, false, false, false).await?; + client.get("foo{1}").await?; + client.incr("foo{1}").await?; - let _: () = client.mget(vec!["bar{1}", "baz{1}"]).await?; - let _: () = client.mset(vec![("bar{1}", 1), ("baz{1}", 1)]).await?; - let _ = client.flushall(false).await?; + client.mget(vec!["bar{1}", "baz{1}"]).await?; + client.mset(vec![("bar{1}", 1), ("baz{1}", 1)]).await?; + client.flushall(false).await?; sleep(Duration::from_secs(1)).await; if invalidated.load(Ordering::Acquire) { @@ -47,6 +48,7 @@ pub async fn should_invalidate_foo_resp3(client: RedisClient, _: RedisConfig) -> } } +#[allow(dead_code)] pub async fn should_invalidate_foo_resp2_centralized(client: RedisClient, _: RedisConfig) -> Result<(), RedisError> { if client.protocol_version() == RespVersion::RESP3 || client.is_clustered() { return Ok(()); @@ -55,8 +57,8 @@ pub async fn should_invalidate_foo_resp2_centralized(client: RedisClient, _: Red let key: RedisKey = "foo{1}".into(); check_null!(client, "foo{1}"); let subscriber = client.clone_new(); - let _ = subscriber.connect(); - let _ = subscriber.wait_for_connect().await?; + subscriber.connect(); + subscriber.wait_for_connect().await?; let invalidated = Arc::new(AtomicBool::new(false)); let _invalidated = invalidated.clone(); @@ -69,7 +71,7 @@ pub async fn should_invalidate_foo_resp2_centralized(client: RedisClient, _: Red } } }); - let _ = subscriber.subscribe("__redis__:invalidate").await?; + subscriber.subscribe("__redis__:invalidate").await?; let (_, subscriber_id) = subscriber .connection_ids() @@ -78,7 +80,7 @@ pub async fn should_invalidate_foo_resp2_centralized(client: RedisClient, _: Red .next() .expect("Failed to read subscriber connection ID"); - let _ = client + client .client_tracking("on", Some(subscriber_id), None, false, false, false, false) .await?; @@ -86,14 +88,14 @@ pub async fn should_invalidate_foo_resp2_centralized(client: RedisClient, _: Red // in resp2 this might take some changes to the pubsub parser if it doesn't work with an array as the message type // check pubsub messages with one key - let _: () = client.get("foo{1}").await?; - let _: () = client.incr("foo{1}").await?; + client.get("foo{1}").await?; + client.incr("foo{1}").await?; // check pubsub messages with an array of keys - let _: () = client.mget(vec!["bar{1}", "baz{1}"]).await?; - let _: () = client.mset(vec![("bar{1}", 1), ("baz{1}", 1)]).await?; + client.mget(vec!["bar{1}", "baz{1}"]).await?; + client.mset(vec![("bar{1}", 1), ("baz{1}", 1)]).await?; // check pubsub messages with a null key - let _ = client.flushall(false).await?; + client.flushall(false).await?; sleep(Duration::from_secs(1)).await; if invalidated.load(Ordering::Acquire) {