
Add missing tests to redis_cluster module
jaymell committed Sep 6, 2024
1 parent ec956f6 commit eb904d1
Showing 1 changed file with 194 additions and 3 deletions.
197 changes: 194 additions & 3 deletions omniqueue/tests/it/redis_cluster.rs
@@ -1,6 +1,11 @@
-use std::time::{Duration, Instant};
-
-use omniqueue::backends::{RedisBackend, RedisClusterBackendBuilder, RedisConfig};
+use std::{
+    num::NonZeroUsize,
+    time::{Duration, Instant},
+};
+
+use omniqueue::backends::{
+    redis::DeadLetterQueueConfig, RedisBackend, RedisClusterBackendBuilder, RedisConfig,
+};
 use redis::{cluster::ClusterClient, AsyncCommands, Commands};
 use serde::{Deserialize, Serialize};

@@ -294,3 +299,189 @@ async fn test_pending() {
        .unwrap()
        .is_empty());
}

#[tokio::test]
async fn test_deadletter_config() {
    let payload = ExType { a: 1 };

    let stream_name: String = std::iter::repeat_with(fastrand::alphanumeric)
        .take(8)
        .collect();

    let client = ClusterClient::new(vec![ROOT_URL]).unwrap();
    let mut conn = client.get_async_connection().await.unwrap();

    let _: () = conn
        .xgroup_create_mkstream(&stream_name, "test_cg", 0i8)
        .await
        .unwrap();

    let max_receives = 5;

    let config = RedisConfig {
        dsn: ROOT_URL.to_owned(),
        max_connections: 8,
        reinsert_on_nack: false,
        queue_key: stream_name.clone(),
        delayed_queue_key: format!("{stream_name}::delayed"),
        delayed_lock_key: format!("{stream_name}::delayed_lock"),
        consumer_group: "test_cg".to_owned(),
        consumer_name: "test_cn".to_owned(),
        payload_key: "payload".to_owned(),
        ack_deadline_ms: 1,
        dlq_config: Some(DeadLetterQueueConfig {
            queue_key: "dlq-key".to_owned(),
            max_receives,
        }),
    };

    let (builder, _drop) = (
        RedisBackend::builder(config).cluster(),
        RedisStreamDrop(stream_name),
    );

    let (p, mut c) = builder.build_pair().await.unwrap();

    p.send_serde_json(&payload).await.unwrap();

    for _ in 0..max_receives {
        let delivery = c.receive().await.unwrap();
        assert_eq!(
            Some(&payload),
            delivery.payload_serde_json().unwrap().as_ref()
        );
    }

    // Give this some time because the reenqueuing can sleep for up to 500ms
    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
    let delivery = c
        .receive_all(1, std::time::Duration::from_millis(1))
        .await
        .unwrap();
    assert!(delivery.is_empty());
}
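
test_deadletter_config only asserts that redelivery stops once max_receives is reached; test_backward_compatible below additionally checks what lands on the DLQ key. That test drains the key with LPOP, which implies the dead-lettered payloads sit on a plain Redis list of raw JSON strings. As a side note (not part of the commit), here is a minimal sketch of inspecting that list non-destructively, assuming the ROOT_URL constant this file already uses and the literal "dlq-key" configured above; dlq_contents is an illustrative helper name:

    // Peek at the dead-letter list without consuming it. The LPOP-based check
    // in the test below suggests the DLQ is a plain Redis list of raw JSON
    // payload strings, so LRANGE returns them in order.
    async fn dlq_contents() -> Vec<String> {
        let client = ClusterClient::new(vec![ROOT_URL]).unwrap();
        let mut conn = client.get_async_connection().await.unwrap();
        conn.lrange("dlq-key", 0, -1).await.unwrap()
    }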

// A message without a `num_receives` field shouldn't
// cause issues:
#[tokio::test]
async fn test_backward_compatible() {
    let stream_name: String = std::iter::repeat_with(fastrand::alphanumeric)
        .take(8)
        .collect();

    let dlq_key: String = std::iter::repeat_with(fastrand::alphanumeric)
        .take(8)
        .collect();

    let client = ClusterClient::new(vec![ROOT_URL]).unwrap();
    let mut conn = client.get_async_connection().await.unwrap();

    let _: () = conn
        .xgroup_create_mkstream(&stream_name, "test_cg", 0i8)
        .await
        .unwrap();

    let max_receives = 5;

    let config = RedisConfig {
        dsn: ROOT_URL.to_owned(),
        max_connections: 8,
        reinsert_on_nack: false,
        queue_key: stream_name.clone(),
        delayed_queue_key: format!("{stream_name}::delayed"),
        delayed_lock_key: format!("{stream_name}::delayed_lock"),
        consumer_group: "test_cg".to_owned(),
        consumer_name: "test_cn".to_owned(),
        payload_key: "payload".to_owned(),
        ack_deadline_ms: 20,
        dlq_config: Some(DeadLetterQueueConfig {
            queue_key: dlq_key.to_owned(),
            max_receives,
        }),
    };

    let check_dlq = |asserted_len: usize| {
        let dlq_key = dlq_key.clone();
        async move {
            let client = ClusterClient::new(vec![ROOT_URL]).unwrap();
            let mut conn = client.get_async_connection().await.unwrap();

            let mut res: Vec<String> = conn.lpop(&dlq_key, NonZeroUsize::new(100)).await.unwrap();
            assert!(res.len() == asserted_len);
            res.pop()
        }
    };

    let (builder, _drop) = (
        RedisBackend::builder(config).cluster(),
        RedisStreamDrop(stream_name.clone()),
    );

    let (_p, mut c) = builder.build_pair().await.unwrap();

    let org_payload = ExType { a: 1 };
    let org_payload_str = serde_json::to_string(&org_payload).unwrap();

    // Test send to DLQ via `ack_deadline_ms` expiration:
    let _: () = conn
        .xadd(
            &stream_name,
            "*",
            &[("payload", org_payload_str.as_bytes())],
        )
        .await
        .unwrap();

    for _ in 0..max_receives {
        check_dlq(0).await;
        let delivery = c.receive().await.unwrap();
        assert_eq!(
            Some(&org_payload),
            delivery.payload_serde_json().unwrap().as_ref()
        );
    }

    // Give this some time because the reenqueuing can sleep for up to 500ms
    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
    let delivery = c
        .receive_all(1, std::time::Duration::from_millis(1))
        .await
        .unwrap();
    assert!(delivery.is_empty());

    // Expected message should be on DLQ:
    let res = check_dlq(1).await;
    assert_eq!(org_payload_str, res.unwrap());

    // Test send to DLQ via explicit `nack`ing:
    let _: () = conn
        .xadd(
            &stream_name,
            "*",
            &[("payload", org_payload_str.as_bytes())],
        )
        .await
        .unwrap();

    for _ in 0..max_receives {
        check_dlq(0).await;
        let delivery = c.receive().await.unwrap();
        assert_eq!(
            Some(&org_payload),
            delivery.payload_serde_json().unwrap().as_ref()
        );
        delivery.nack().await.unwrap();
    }

    // Give this some time because the reenqueuing can sleep for up to 500ms
    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
    let delivery = c
        .receive_all(1, std::time::Duration::from_millis(1))
        .await
        .unwrap();
    assert!(delivery.is_empty());

    // Expected message should be on DLQ:
    let res = check_dlq(1).await;
    assert_eq!(org_payload_str, res.unwrap());
}
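
For context (not part of the commit), a minimal sketch of the application-side flow these tests exercise: with dlq_config set, messages that are acked are simply removed, while a payload that is never acked within max_receives deliveries (through ack-deadline expiry or explicit nacks, as in the two cases above) ends up on the DLQ list instead of being redelivered. The sketch reuses the builder and delivery calls shown in the tests plus omniqueue's ack counterpart to the nack call above; process_message is a hypothetical callback and error handling is reduced to unwrap, mirroring the test style:

    // Hypothetical application callback: return true if the message was handled.
    async fn process_message(_msg: &ExType) -> bool {
        true
    }

    // Sketch: consume from a DLQ-enabled cluster queue, acking on success and
    // nacking on failure. `config` is a RedisConfig with `dlq_config` set,
    // built exactly like the ones in the tests above.
    async fn consume_with_dlq(config: RedisConfig) {
        let (_producer, mut consumer) = RedisBackend::builder(config)
            .cluster()
            .build_pair()
            .await
            .unwrap();

        loop {
            let delivery = consumer.receive().await.unwrap();
            let handled = match delivery.payload_serde_json::<ExType>() {
                Ok(Some(msg)) => process_message(&msg).await,
                _ => false,
            };
            if handled {
                // Acking removes the message, so it is never redelivered and
                // never dead-lettered.
                delivery.ack().await.unwrap();
            } else {
                // Every unacked delivery (deadline expiry or explicit nack)
                // counts toward `max_receives`; once the limit is hit, the
                // payload is moved to `dlq_config.queue_key`.
                delivery.nack().await.unwrap();
            }
        }
    }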
