Skip to content

Commit

Permalink
Add test of redis DLQ ordering
Browse files Browse the repository at this point in the history
I'm not sure if we need to generically guarantee this in our docs,
but it seems reasonable to validate it in our tests for now.
  • Loading branch information
jaymell authored and svix-james committed Sep 9, 2024
1 parent 85dbfeb commit a2817f2
Show file tree
Hide file tree
Showing 3 changed files with 271 additions and 1 deletion.
97 changes: 96 additions & 1 deletion omniqueue/tests/it/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ async fn test_bytes_send_recv() {
d.ack().await.unwrap();
}

#[derive(Debug, Deserialize, Serialize, PartialEq)]
/// Example payload type round-tripped through the queue in these tests.
/// `Clone` lets tests reuse one payload value across send/receive loops;
/// `PartialEq` enables `assert_eq!` on redelivered payloads.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub struct ExType {
    a: u8,
}
Expand Down Expand Up @@ -421,6 +421,101 @@ async fn test_deadletter_config() {
check_dlq(0).await;
}

// Assert that messages land on the DLQ — and are redriven from it — in the
// same order they were originally sent. We don't currently guarantee this
// in our docs, but it's good to at least validate it for now.
#[tokio::test]
async fn test_deadletter_config_order() {
    let payload1 = ExType { a: 1 };
    let payload2 = ExType { a: 2 };
    let payload3 = ExType { a: 3 };

    // Random key names so concurrent test runs don't collide in redis.
    let stream_name: String = std::iter::repeat_with(fastrand::alphanumeric)
        .take(8)
        .collect();

    let dlq_key: String = std::iter::repeat_with(fastrand::alphanumeric)
        .take(8)
        .collect();

    let client = Client::open(ROOT_URL).unwrap();
    let mut conn = client.get_multiplexed_async_connection().await.unwrap();

    // The consumer group must exist before the backend starts consuming.
    let _: () = conn
        .xgroup_create_mkstream(&stream_name, "test_cg", 0i8)
        .await
        .unwrap();

    let max_receives = 1;

    let config = RedisConfig {
        dsn: ROOT_URL.to_owned(),
        max_connections: 8,
        reinsert_on_nack: false,
        queue_key: stream_name.clone(),
        delayed_queue_key: format!("{stream_name}::delayed"),
        delayed_lock_key: format!("{stream_name}::delayed_lock"),
        consumer_group: "test_cg".to_owned(),
        consumer_name: "test_cn".to_owned(),
        payload_key: "payload".to_owned(),
        // Short ack deadline so nacked deliveries expire (and dead-letter)
        // quickly.
        ack_deadline_ms: 20,
        dlq_config: Some(DeadLetterQueueConfig {
            queue_key: dlq_key.clone(),
            max_receives,
        }),
    };

    // Asserts the DLQ currently holds exactly `asserted_len` entries and
    // returns the last one (if any) for optional inspection.
    let check_dlq = |asserted_len: usize| {
        let dlq_key = dlq_key.clone();
        async move {
            let client = Client::open(ROOT_URL).unwrap();
            let mut conn = client.get_multiplexed_async_connection().await.unwrap();
            let mut res: Vec<String> = conn.lrange(&dlq_key, 0, -1).await.unwrap();
            // `assert_eq!` (rather than `assert!(==)`) so a failure prints
            // both the expected and the actual length.
            assert_eq!(res.len(), asserted_len);
            res.pop()
        }
    };

    let (builder, _drop) = (
        RedisBackend::builder(config),
        RedisStreamDrop(stream_name.clone()),
    );

    let (p, mut c) = builder.build_pair().await.unwrap();

    // Test send to DLQ via `ack_deadline_ms` expiration:
    p.send_serde_json(&payload1).await.unwrap();
    p.send_serde_json(&payload2).await.unwrap();
    p.send_serde_json(&payload3).await.unwrap();

    // Nack each delivery once; with `max_receives = 1` all three end up on
    // the DLQ. Deliveries must arrive in send order.
    for payload in [&payload1, &payload2, &payload3] {
        let delivery = c.receive().await.unwrap();
        assert_eq!(
            Some(payload),
            delivery.payload_serde_json().unwrap().as_ref()
        );
        delivery.nack().await.unwrap();
    }

    // Give this some time because the reenqueuing can sleep for up to 500ms
    tokio::time::sleep(std::time::Duration::from_secs(2)).await;

    // Expected messages should be on DLQ:
    check_dlq(3).await;

    // Redrive DLQ, receive from main queue, ack. Redriven messages must
    // again come back in the original send order:
    p.redrive_dlq().await.unwrap();

    for payload in [&payload1, &payload2, &payload3] {
        let delivery = c.receive().await.unwrap();
        assert_eq!(
            Some(payload),
            delivery.payload_serde_json().unwrap().as_ref()
        );
        delivery.ack().await.unwrap();
    }
}

// A message without a `num_receives` field shouldn't
// cause issues:
#[tokio::test]
Expand Down
91 changes: 91 additions & 0 deletions omniqueue/tests/it/redis_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,97 @@ async fn test_deadletter_config() {
check_dlq(0).await;
}

// Cluster-mode twin of the redis.rs ordering test: assert that messages
// land on the DLQ — and are redriven from it — in the same order they were
// originally sent.
#[tokio::test]
async fn test_deadletter_config_order() {
    let payload1 = ExType { a: 1 };
    let payload2 = ExType { a: 2 };
    let payload3 = ExType { a: 3 };

    // Random key names so concurrent test runs don't collide in redis.
    let stream_name: String = std::iter::repeat_with(fastrand::alphanumeric)
        .take(8)
        .collect();

    let dlq_key: String = std::iter::repeat_with(fastrand::alphanumeric)
        .take(8)
        .collect();

    let client = ClusterClient::new(vec![ROOT_URL]).unwrap();
    let mut conn = client.get_async_connection().await.unwrap();

    // The consumer group must exist before the backend starts consuming.
    let _: () = conn
        .xgroup_create_mkstream(&stream_name, "test_cg", 0i8)
        .await
        .unwrap();

    let max_receives = 1;

    let config = RedisConfig {
        dsn: ROOT_URL.to_owned(),
        max_connections: 8,
        reinsert_on_nack: false,
        queue_key: stream_name.clone(),
        delayed_queue_key: format!("{stream_name}::delayed"),
        delayed_lock_key: format!("{stream_name}::delayed_lock"),
        consumer_group: "test_cg".to_owned(),
        consumer_name: "test_cn".to_owned(),
        payload_key: "payload".to_owned(),
        // Short ack deadline so nacked deliveries expire (and dead-letter)
        // quickly.
        ack_deadline_ms: 20,
        dlq_config: Some(DeadLetterQueueConfig {
            queue_key: dlq_key.clone(),
            max_receives,
        }),
    };

    // Asserts the DLQ currently holds exactly `asserted_len` entries and
    // returns the last one (if any) for optional inspection.
    let check_dlq = |asserted_len: usize| {
        let dlq_key = dlq_key.clone();
        let client = client.clone();
        async move {
            let mut conn = client.get_async_connection().await.unwrap();
            let mut res: Vec<String> = conn.lrange(&dlq_key, 0, -1).await.unwrap();
            // `assert_eq!` (rather than `assert!(==)`) so a failure prints
            // both the expected and the actual length.
            assert_eq!(res.len(), asserted_len);
            res.pop()
        }
    };

    let (builder, _drop) = (
        RedisBackend::builder(config).cluster(),
        RedisStreamDrop(stream_name.clone()),
    );

    let (p, mut c) = builder.build_pair().await.unwrap();

    // Test send to DLQ via `ack_deadline_ms` expiration:
    p.send_serde_json(&payload1).await.unwrap();
    p.send_serde_json(&payload2).await.unwrap();
    p.send_serde_json(&payload3).await.unwrap();

    // Nack each delivery once; with `max_receives = 1` all three end up on
    // the DLQ. Deliveries must arrive in send order.
    for payload in [&payload1, &payload2, &payload3] {
        let delivery = c.receive().await.unwrap();
        assert_eq!(
            Some(payload),
            delivery.payload_serde_json().unwrap().as_ref()
        );
        delivery.nack().await.unwrap();
    }

    // Give this some time because the reenqueuing can sleep for up to 500ms
    tokio::time::sleep(std::time::Duration::from_secs(2)).await;

    // Expected messages should be on DLQ:
    check_dlq(3).await;

    // Redrive DLQ, receive from main queue, ack. Redriven messages must
    // again come back in the original send order:
    p.redrive_dlq().await.unwrap();

    for payload in [&payload1, &payload2, &payload3] {
        let delivery = c.receive().await.unwrap();
        assert_eq!(
            Some(payload),
            delivery.payload_serde_json().unwrap().as_ref()
        );
        delivery.ack().await.unwrap();
    }
}
// A message without a `num_receives` field shouldn't
// cause issues:
#[tokio::test]
Expand Down
84 changes: 84 additions & 0 deletions omniqueue/tests/it/redis_fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,90 @@ async fn test_deadletter_config() {
*/
}

// Fallback (non-stream, list-based) twin of the redis.rs ordering test:
// assert that messages land on the DLQ — and are redriven from it — in the
// same order they were originally sent.
#[tokio::test]
async fn test_deadletter_config_order() {
    let payload1 = ExType { a: 1 };
    let payload2 = ExType { a: 2 };
    let payload3 = ExType { a: 3 };

    // Random key names so concurrent test runs don't collide in redis.
    let queue_key: String = std::iter::repeat_with(fastrand::alphanumeric)
        .take(8)
        .collect();

    let dlq_key: String = std::iter::repeat_with(fastrand::alphanumeric)
        .take(8)
        .collect();

    let max_receives = 1;

    let config = RedisConfig {
        dsn: ROOT_URL.to_owned(),
        max_connections: 8,
        reinsert_on_nack: false,
        queue_key: queue_key.clone(),
        delayed_queue_key: format!("{queue_key}::delayed"),
        delayed_lock_key: format!("{queue_key}::delayed_lock"),
        consumer_group: "test_cg".to_owned(),
        consumer_name: "test_cn".to_owned(),
        payload_key: "payload".to_owned(),
        // Short ack deadline so nacked deliveries expire (and dead-letter)
        // quickly.
        ack_deadline_ms: 1,
        dlq_config: Some(DeadLetterQueueConfig {
            queue_key: dlq_key.clone(),
            max_receives,
        }),
    };

    // Asserts the DLQ currently holds exactly `asserted_len` entries and
    // returns the last one (if any) for optional inspection.
    let check_dlq = |asserted_len: usize| {
        let dlq_key = dlq_key.clone();
        async move {
            let client = Client::open(ROOT_URL).unwrap();
            let mut conn = client.get_multiplexed_async_connection().await.unwrap();
            let mut res: Vec<String> = conn.lrange(&dlq_key, 0, -1).await.unwrap();
            // `assert_eq!` (rather than `assert!(==)`) so a failure prints
            // both the expected and the actual length.
            assert_eq!(res.len(), asserted_len);
            res.pop()
        }
    };

    let (builder, _drop) = (
        RedisBackend::builder(config).use_redis_streams(false),
        RedisKeyDrop(queue_key),
    );

    let (p, mut c) = builder.build_pair().await.unwrap();

    // Test send to DLQ via `ack_deadline_ms` expiration:
    p.send_serde_json(&payload1).await.unwrap();
    p.send_serde_json(&payload2).await.unwrap();
    p.send_serde_json(&payload3).await.unwrap();

    // Nack each delivery once; with `max_receives = 1` all three end up on
    // the DLQ. Deliveries must arrive in send order.
    for payload in [&payload1, &payload2, &payload3] {
        let delivery = c.receive().await.unwrap();
        assert_eq!(
            Some(payload),
            delivery.payload_serde_json().unwrap().as_ref()
        );
        delivery.nack().await.unwrap();
    }

    // Give this some time because the reenqueuing can sleep for up to 500ms
    tokio::time::sleep(std::time::Duration::from_secs(2)).await;

    // Expected messages should be on DLQ:
    check_dlq(3).await;

    // Redrive DLQ, receive from main queue, ack. Redriven messages must
    // again come back in the original send order:
    p.redrive_dlq().await.unwrap();

    for payload in [&payload1, &payload2, &payload3] {
        let delivery = c.receive().await.unwrap();
        assert_eq!(
            Some(payload),
            delivery.payload_serde_json().unwrap().as_ref()
        );
        delivery.ack().await.unwrap();
    }
}

// A message without a `num_receives` field shouldn't
// cause issues:
#[tokio::test]
Expand Down

0 comments on commit a2817f2

Please sign in to comment.