Skip to content

Commit

Permalink
8.0.3 (#218)
Browse files Browse the repository at this point in the history
* chore: box pin large futures
  • Loading branch information
aembke authored Feb 20, 2024
1 parent 74b5721 commit b070601
Show file tree
Hide file tree
Showing 41 changed files with 341 additions and 366 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ jobs:
- checkout
- run:
name: Clippy
command: cargo clippy -- -Dwarnings
command: cargo clippy --all-features --lib -p fred -- -Dwarnings


workflows:
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 8.0.3

* Box large futures to reduce stack usage.

## 8.0.2

* Fix cluster replica failover at high concurrency.
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fred"
version = "8.0.2"
version = "8.0.3"
authors = ["Alec Embke <[email protected]>"]
edition = "2021"
description = "An async Redis client built on Tokio."
Expand Down
4 changes: 1 addition & 3 deletions examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ async fn main() -> Result<(), RedisError> {
// see the `Builder` interface for more information
let client = Builder::from_config(config).build()?;
// callers can manage the tokio task driving the connections
let connection_task = client.init().await?;
let _connection_task = client.init().await?;
// convert response types to most common rust types
let foo: Option<String> = client.get("foo").await?;
println!("Foo: {:?}", foo);
Expand All @@ -23,7 +23,5 @@ async fn main() -> Result<(), RedisError> {
println!("Foo: {:?}", client.get::<Option<String>, _>("foo").await?);

client.quit().await?;
// calling quit ends the connection and event listener tasks
let _ = connection_task.await;
Ok(())
}
17 changes: 8 additions & 9 deletions examples/client_tracking.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![allow(clippy::disallowed_names)]
#![allow(clippy::let_underscore_future)]

use fred::{interfaces::TrackingInterface, prelude::*, types::RespVersion};

Expand All @@ -16,14 +15,14 @@ async fn resp3_tracking_interface_example() -> Result<(), RedisError> {
client.init().await?;

// spawn a task that processes invalidation messages.
let _ = client.on_invalidation(|invalidation| {
let _invalidate_task = client.on_invalidation(|invalidation| {
println!("{}: Invalidate {:?}", invalidation.server, invalidation.keys);
Ok(())
});

// enable client tracking on all connections. it's usually a good idea to do this in an `on_reconnect` block.
client.start_tracking(None, false, false, false, false).await?;
let _: () = client.get("foo").await?;
client.get("foo").await?;

// send `CLIENT CACHING yes|no` before subsequent commands. the preceding `CLIENT CACHING yes|no` command will be
// sent when the command is retried as well.
Expand Down Expand Up @@ -58,14 +57,14 @@ async fn resp2_basic_interface_example() -> Result<(), RedisError> {
// the invalidation subscriber interface is the same as above even in RESP2 mode **as long as the `client-tracking`
// feature is enabled**. if the feature is disabled then the message will appear on the `on_message` receiver.
let mut invalidations = subscriber.invalidation_rx();
tokio::spawn(async move {
let _invalidate_task = tokio::spawn(async move {
while let Ok(invalidation) = invalidations.recv().await {
println!("{}: Invalidate {:?}", invalidation.server, invalidation.keys);
}
});
// in RESP2 mode we must manually subscribe to the invalidation channel. the `start_tracking` function does this
// automatically with the RESP3 interface.
let _: () = subscriber.subscribe("__redis__:invalidate").await?;
subscriber.subscribe("__redis__:invalidate").await?;

// enable client tracking, sending invalidation messages to the subscriber client
let (_, connection_id) = subscriber
Expand All @@ -74,7 +73,7 @@ async fn resp2_basic_interface_example() -> Result<(), RedisError> {
.into_iter()
.next()
.expect("Failed to read subscriber connection ID");
let _ = client
client
.client_tracking("on", Some(connection_id), None, false, false, false, false)
.await?;

Expand All @@ -83,8 +82,8 @@ async fn resp2_basic_interface_example() -> Result<(), RedisError> {

let pipeline = client.pipeline();
// it's recommended to pipeline `CLIENT CACHING yes|no` if the client is used across multiple tasks
let _: () = pipeline.client_caching(true).await?;
let _: () = pipeline.incr("foo").await?;
pipeline.client_caching(true).await?;
pipeline.incr("foo").await?;
println!("Foo: {}", pipeline.last::<i64>().await?);

Ok(())
Expand All @@ -94,7 +93,7 @@ async fn resp2_basic_interface_example() -> Result<(), RedisError> {
// see https://redis.io/docs/manual/client-side-caching/
async fn main() -> Result<(), RedisError> {
resp3_tracking_interface_example().await?;
// resp2_basic_interface_example().await?;
resp2_basic_interface_example().await?;

Ok(())
}
17 changes: 5 additions & 12 deletions examples/custom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,8 @@ async fn main() -> Result<(), RedisError> {
client.init().await?;
client.lpush("foo", vec![1, 2, 3]).await?;

// some types require TryInto
let args: Vec<RedisValue> = vec!["foo".into(), 0.into(), 3_u64.try_into()?];
// returns a frame (https://docs.rs/redis-protocol/latest/redis_protocol/resp3/types/enum.Frame.html)
let frame = client.custom_raw(cmd!("LRANGE"), args.clone()).await?;
// or convert back to client types
let value: RedisValue = frame.try_into()?;
// and/or use the type conversion shorthand
let value: Vec<String> = value.convert()?;
println!("LRANGE Values: {:?}", value);
let result: Vec<String> = client.custom(cmd!("LRANGE"), vec!["foo", "0", "3"]).await?;
println!("LRANGE Values: {:?}", result);

// or customize routing and blocking parameters
let _ = cmd!("LRANGE", blocking: false);
Expand All @@ -31,9 +24,9 @@ async fn main() -> Result<(), RedisError> {
// which is shorthand for
let command = CustomCommand::new("LRANGE", ClusterHash::FirstKey, false);

// convert to `FromRedis` types
let _: Vec<i64> = client
.custom_raw(command, args)
// or use `custom_raw` to operate on RESP3 frames
let _result: Vec<i64> = client
.custom_raw(command, vec!["foo", "0", "3"])
.await
.and_then(|frame| frame.try_into())
.and_then(|value: RedisValue| value.convert())?;
Expand Down
6 changes: 3 additions & 3 deletions examples/dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use trust_dns_resolver::{

pub struct TrustDnsResolver(TokioAsyncResolver);

impl TrustDnsResolver {
fn new() -> Self {
impl Default for TrustDnsResolver {
fn default() -> Self {
TrustDnsResolver(TokioAsyncResolver::tokio(
ResolverConfig::default(),
ResolverOpts::default(),
Expand All @@ -39,7 +39,7 @@ impl Resolve for TrustDnsResolver {
#[tokio::main]
async fn main() -> Result<(), RedisError> {
let client = Builder::default_centralized().build()?;
client.set_resolver(Arc::new(TrustDnsResolver::new())).await;
client.set_resolver(Arc::new(TrustDnsResolver::default())).await;
client.init().await?;

// ...
Expand Down
34 changes: 17 additions & 17 deletions examples/keyspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,6 @@ use fred::prelude::*;
use std::time::Duration;
use tokio::time::sleep;

async fn fake_traffic(client: &RedisClient, amount: usize) -> Result<(), RedisError> {
// use a new client since the provided client is subscribed to keyspace events
let client = client.clone_new();
client.init().await?;

for idx in 0 .. amount {
let key: RedisKey = format!("foo-{}", idx).into();

client.set(&key, 1, None, None, false).await?;
client.incr(&key).await?;
client.del(&key).await?;
}

client.quit().await?;
Ok(())
}

/// Examples showing how to set up keyspace notifications with clustered or centralized/sentinel deployments.
///
/// The most complicated part of this process involves safely handling reconnections. Keyspace events rely on the
Expand All @@ -40,6 +23,23 @@ async fn main() -> Result<(), RedisError> {
Ok(())
}

async fn fake_traffic(client: &RedisClient, amount: usize) -> Result<(), RedisError> {
// use a new client since the provided client is subscribed to keyspace events
let client = client.clone_new();
client.init().await?;

for idx in 0 .. amount {
let key: RedisKey = format!("foo-{}", idx).into();

client.set(&key, 1, None, None, false).await?;
client.incr(&key).await?;
client.del(&key).await?;
}

client.quit().await?;
Ok(())
}

async fn centralized_keyspace_events() -> Result<(), RedisError> {
let subscriber = Builder::default_centralized().build()?;

Expand Down
2 changes: 1 addition & 1 deletion examples/lua.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async fn scripts() -> Result<(), RedisError> {
client.init().await?;

let script = Script::from_lua(SCRIPTS[0]);
let _ = script.load(&client).await?;
script.load(&client).await?;
let result = script.evalsha(&client, vec!["foo", "bar"], vec![1, 2]).await?;
println!("First script result: {:?}", result);

Expand Down
7 changes: 4 additions & 3 deletions examples/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ async fn main() -> Result<(), RedisError> {

// all client types, including `RedisPool`, implement the same command interface traits so callers can often use
// them interchangeably. in this example each command below will be sent round-robin to the underlying 5 clients.
pool.get("foo").await?;
assert!(pool.get::<Option<String>, _>("foo").await?.is_none());
pool.set("foo", "bar", None, None, false).await?;
pool.get("foo").await?;
assert_eq!(pool.get::<String, _>("foo").await?, "bar");

pool.del("foo").await?;
// interact with specific clients via next(), last(), or clients()
let pipeline = pool.next().pipeline();
pipeline.incr("foo").await?;
pipeline.incr("foo").await?;
let _: i64 = pipeline.last().await?;
assert_eq!(pipeline.last::<i64>().await?, 2);

for client in pool.clients() {
println!("{} connected to {:?}", client.id(), client.active_connections().await?);
Expand Down
4 changes: 2 additions & 2 deletions examples/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async fn main() -> Result<(), RedisError> {
});

for idx in 0 .. 50 {
let _ = publisher_client.publish("foo", idx).await?;
publisher_client.publish("foo", idx).await?;
sleep(Duration::from_secs(1)).await;
}

Expand Down Expand Up @@ -52,7 +52,7 @@ async fn subscriber_example() -> Result<(), RedisError> {
subscriber.subscribe("foo").await?;
subscriber.psubscribe(vec!["bar*", "baz*"]).await?;
subscriber.ssubscribe("abc{123}").await?;
// upon reconnecting the client will automatically re-subscribe to the above channels and patterns
// after reconnecting the client will automatically re-subscribe to the above channels and patterns
println!("Subscriber channels: {:?}", subscriber.tracked_channels()); // "foo"
println!("Subscriber patterns: {:?}", subscriber.tracked_patterns()); // "bar*", "baz*"
println!("Subscriber shard channels: {:?}", subscriber.tracked_shard_channels()); // "abc{123}"
Expand Down
10 changes: 5 additions & 5 deletions src/clients/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,21 +338,21 @@ impl SubscriberClient {

/// Unsubscribe from all tracked channels and patterns, and remove them from the client cache.
pub async fn unsubscribe_all(&self) -> Result<(), RedisError> {
let channels: Vec<RedisKey> = mem::replace(&mut *self.channels.write(), BTreeSet::new())
let channels: Vec<RedisKey> = mem::take(&mut *self.channels.write())
.into_iter()
.map(|s| s.into())
.collect();
let patterns: Vec<RedisKey> = mem::replace(&mut *self.patterns.write(), BTreeSet::new())
let patterns: Vec<RedisKey> = mem::take(&mut *self.patterns.write())
.into_iter()
.map(|s| s.into())
.collect();
let shard_channels: Vec<RedisKey> = mem::replace(&mut *self.shard_channels.write(), BTreeSet::new())
let shard_channels: Vec<RedisKey> = mem::take(&mut *self.shard_channels.write())
.into_iter()
.map(|s| s.into())
.collect();

let _ = self.unsubscribe(channels).await?;
let _ = self.punsubscribe(patterns).await?;
self.unsubscribe(channels).await?;
self.punsubscribe(patterns).await?;

let shard_channel_groups = group_by_hash_slot(shard_channels)?;
let shard_subscriptions: Vec<_> = shard_channel_groups
Expand Down
2 changes: 1 addition & 1 deletion src/clients/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl Replicas {
pub async fn sync(&self) -> Result<(), RedisError> {
let (tx, rx) = oneshot_channel();
let cmd = RouterCommand::SyncReplicas { tx };
let _ = interfaces::send_to_router(&self.inner, cmd)?;
interfaces::send_to_router(&self.inner, cmd)?;
rx.await?
}
}
8 changes: 4 additions & 4 deletions src/commands/impls/redis_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use crate::{
use bytes_utils::Str;
use serde_json::Value;

const INDENT: &'static str = "INDENT";
const NEWLINE: &'static str = "NEWLINE";
const SPACE: &'static str = "SPACE";
const INDENT: &str = "INDENT";
const NEWLINE: &str = "NEWLINE";
const SPACE: &str = "SPACE";

fn key_path_args(key: RedisKey, path: Option<Str>, extra: usize) -> Vec<RedisValue> {
let mut out = Vec::with_capacity(2 + extra);
Expand Down Expand Up @@ -52,7 +52,7 @@ fn json_to_redis(value: Value) -> Result<RedisValue, RedisError> {
))
}

fn values_to_bulk(values: &Vec<Value>) -> Result<Vec<RedisValue>, RedisError> {
fn values_to_bulk(values: &[Value]) -> Result<Vec<RedisValue>, RedisError> {
values.iter().map(value_to_bulk_str).collect()
}

Expand Down
20 changes: 10 additions & 10 deletions src/commands/impls/tracking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ use crate::{
use redis_protocol::redis_keyslot;
use tokio::sync::oneshot::channel as oneshot_channel;

pub static PREFIX: &'static str = "PREFIX";
pub static REDIRECT: &'static str = "REDIRECT";
pub static BCAST: &'static str = "BCAST";
pub static OPTIN: &'static str = "OPTIN";
pub static OPTOUT: &'static str = "OPTOUT";
pub static NOLOOP: &'static str = "NOLOOP";
pub static YES: &'static str = "YES";
pub static NO: &'static str = "NO";
pub static PREFIX: &str = "PREFIX";
pub static REDIRECT: &str = "REDIRECT";
pub static BCAST: &str = "BCAST";
pub static OPTIN: &str = "OPTIN";
pub static OPTOUT: &str = "OPTOUT";
pub static NOLOOP: &str = "NOLOOP";
pub static YES: &str = "YES";
pub static NO: &str = "NO";

fn tracking_args(
toggle: Toggle,
Expand Down Expand Up @@ -88,7 +88,7 @@ pub async fn start_tracking<C: ClientLike>(
let (tx, rx) = oneshot_channel();
let response = ResponseKind::new_buffer(tx);
let command: RedisCommand = (RedisCommandKind::_ClientTrackingCluster, args, response).into();
let _ = client.send_command(command)?;
client.send_command(command)?;

let frame = utils::apply_timeout(rx, client.inner().internal_command_timeout()).await??;
let _ = protocol_utils::frame_to_results(frame)?;
Expand Down Expand Up @@ -116,7 +116,7 @@ pub async fn stop_tracking<C: ClientLike>(client: &C) -> Result<(), RedisError>
let (tx, rx) = oneshot_channel();
let response = ResponseKind::new_buffer(tx);
let command: RedisCommand = (RedisCommandKind::_ClientTrackingCluster, args, response).into();
let _ = client.send_command(command)?;
client.send_command(command)?;

let frame = utils::apply_timeout(rx, client.inner().internal_command_timeout()).await??;
let _ = protocol_utils::frame_to_results(frame)?;
Expand Down
2 changes: 1 addition & 1 deletion src/commands/interfaces/sentinel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ pub trait SentinelInterface: ClientLike + Sized {
{
into!(name);
try_into!(args);
commands::sentinel::set(self, name, args.into()).await?.convert()
commands::sentinel::set(self, name, args).await?.convert()
}

/// This command simulates different Sentinel crash scenarios.
Expand Down
4 changes: 0 additions & 4 deletions src/commands/interfaces/timeseries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,7 @@ pub trait TimeSeriesInterface: ClientLike {
J: IntoIterator<Item = S> + Send,
{
try_into!(from, to);
let labels = labels.map(|l| l.into());
let filters = filters.into_iter().map(|s| s.into()).collect();
let group_by = group_by.map(|g| g.into());
let filter_by_ts = filter_by_ts.into_iter().collect();

commands::timeseries::ts_mrange(
Expand Down Expand Up @@ -370,9 +368,7 @@ pub trait TimeSeriesInterface: ClientLike {
J: IntoIterator<Item = S> + Send,
{
try_into!(from, to);
let labels = labels.map(|l| l.into());
let filters = filters.into_iter().map(|s| s.into()).collect();
let group_by = group_by.map(|g| g.into());
let filter_by_ts = filter_by_ts.into_iter().collect();

commands::timeseries::ts_mrevrange(
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
#![allow(clippy::type_complexity)]
#![allow(clippy::too_many_arguments)]
#![allow(clippy::new_without_default)]
#![warn(clippy::large_types_passed_by_value)]
#![warn(clippy::large_stack_frames)]
#![warn(clippy::large_futures)]
#![cfg_attr(docsrs, deny(rustdoc::broken_intra_doc_links))]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![cfg_attr(docsrs, allow(unused_attributes))]
Expand Down
Loading

0 comments on commit b070601

Please sign in to comment.