diff --git a/omniqueue/tests/it/redis_cluster.rs b/omniqueue/tests/it/redis_cluster.rs
index 37b419c..79f82ad 100644
--- a/omniqueue/tests/it/redis_cluster.rs
+++ b/omniqueue/tests/it/redis_cluster.rs
@@ -1,6 +1,11 @@
-use std::time::{Duration, Instant};
-
-use omniqueue::backends::{RedisBackend, RedisClusterBackendBuilder, RedisConfig};
+use std::{
+    num::NonZeroUsize,
+    time::{Duration, Instant},
+};
+
+use omniqueue::backends::{
+    redis::DeadLetterQueueConfig, RedisBackend, RedisClusterBackendBuilder, RedisConfig,
+};
 use redis::{cluster::ClusterClient, AsyncCommands, Commands};
 use serde::{Deserialize, Serialize};
 
@@ -294,3 +299,189 @@ async fn test_pending() {
         .unwrap()
         .is_empty());
 }
+
+#[tokio::test]
+async fn test_deadletter_config() {
+    let payload = ExType { a: 1 };
+
+    let stream_name: String = std::iter::repeat_with(fastrand::alphanumeric)
+        .take(8)
+        .collect();
+
+    let client = ClusterClient::new(vec![ROOT_URL]).unwrap();
+    let mut conn = client.get_async_connection().await.unwrap();
+
+    let _: () = conn
+        .xgroup_create_mkstream(&stream_name, "test_cg", 0i8)
+        .await
+        .unwrap();
+
+    let max_receives = 5;
+
+    let config = RedisConfig {
+        dsn: ROOT_URL.to_owned(),
+        max_connections: 8,
+        reinsert_on_nack: false,
+        queue_key: stream_name.clone(),
+        delayed_queue_key: format!("{stream_name}::delayed"),
+        delayed_lock_key: format!("{stream_name}::delayed_lock"),
+        consumer_group: "test_cg".to_owned(),
+        consumer_name: "test_cn".to_owned(),
+        payload_key: "payload".to_owned(),
+        ack_deadline_ms: 1,
+        dlq_config: Some(DeadLetterQueueConfig {
+            queue_key: "dlq-key".to_owned(),
+            max_receives,
+        }),
+    };
+
+    let (builder, _drop) = (
+        RedisBackend::builder(config).cluster(),
+        RedisStreamDrop(stream_name),
+    );
+
+    let (p, mut c) = builder.build_pair().await.unwrap();
+
+    p.send_serde_json(&payload).await.unwrap();
+
+    for _ in 0..max_receives {
+        let delivery = c.receive().await.unwrap();
+        assert_eq!(
+            Some(&payload),
+            delivery.payload_serde_json().unwrap().as_ref()
+        );
+    }
+
+    // Give this some time because the reenqueuing can sleep for up to 500ms
+    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
+    let delivery = c
+        .receive_all(1, std::time::Duration::from_millis(1))
+        .await
+        .unwrap();
+    assert!(delivery.is_empty());
+}
+
+// A message without a `num_receives` field shouldn't
+// cause issues:
+#[tokio::test]
+async fn test_backward_compatible() {
+    let stream_name: String = std::iter::repeat_with(fastrand::alphanumeric)
+        .take(8)
+        .collect();
+
+    let dlq_key: String = std::iter::repeat_with(fastrand::alphanumeric)
+        .take(8)
+        .collect();
+
+    let client = ClusterClient::new(vec![ROOT_URL]).unwrap();
+    let mut conn = client.get_async_connection().await.unwrap();
+
+    let _: () = conn
+        .xgroup_create_mkstream(&stream_name, "test_cg", 0i8)
+        .await
+        .unwrap();
+
+    let max_receives = 5;
+
+    let config = RedisConfig {
+        dsn: ROOT_URL.to_owned(),
+        max_connections: 8,
+        reinsert_on_nack: false,
+        queue_key: stream_name.clone(),
+        delayed_queue_key: format!("{stream_name}::delayed"),
+        delayed_lock_key: format!("{stream_name}::delayed_lock"),
+        consumer_group: "test_cg".to_owned(),
+        consumer_name: "test_cn".to_owned(),
+        payload_key: "payload".to_owned(),
+        ack_deadline_ms: 20,
+        dlq_config: Some(DeadLetterQueueConfig {
+            queue_key: dlq_key.to_owned(),
+            max_receives,
+        }),
+    };
+
+    let check_dlq = |asserted_len: usize| {
+        let dlq_key = dlq_key.clone();
+        async move {
+            let client = ClusterClient::new(vec![ROOT_URL]).unwrap();
+            let mut conn = client.get_async_connection().await.unwrap();
+
+            let mut res: Vec<String> = conn.lpop(&dlq_key, NonZeroUsize::new(100)).await.unwrap();
+            assert!(res.len() == asserted_len);
+            res.pop()
+        }
+    };
+
+    let (builder, _drop) = (
+        RedisBackend::builder(config).cluster(),
+        RedisStreamDrop(stream_name.clone()),
+    );
+
+    let (_p, mut c) = builder.build_pair().await.unwrap();
+
+    let org_payload = ExType { a: 1 };
+    let org_payload_str = serde_json::to_string(&org_payload).unwrap();
+
+    // Test send to DLQ via `ack_deadline_ms` expiration:
+    let _: () = conn
+        .xadd(
+            &stream_name,
+            "*",
+            &[("payload", org_payload_str.as_bytes())],
+        )
+        .await
+        .unwrap();
+
+    for _ in 0..max_receives {
+        check_dlq(0).await;
+        let delivery = c.receive().await.unwrap();
+        assert_eq!(
+            Some(&org_payload),
+            delivery.payload_serde_json().unwrap().as_ref()
+        );
+    }
+
+    // Give this some time because the reenqueuing can sleep for up to 500ms
+    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
+    let delivery = c
+        .receive_all(1, std::time::Duration::from_millis(1))
+        .await
+        .unwrap();
+    assert!(delivery.is_empty());
+
+    // Expected message should be on DLQ:
+    let res = check_dlq(1).await;
+    assert_eq!(org_payload_str, res.unwrap());
+
+    // Test send to DLQ via explicit `nack`ing:
+    let _: () = conn
+        .xadd(
+            &stream_name,
+            "*",
+            &[("payload", org_payload_str.as_bytes())],
+        )
+        .await
+        .unwrap();
+
+    for _ in 0..max_receives {
+        check_dlq(0).await;
+        let delivery = c.receive().await.unwrap();
+        assert_eq!(
+            Some(&org_payload),
+            delivery.payload_serde_json().unwrap().as_ref()
+        );
+        delivery.nack().await.unwrap();
+    }
+
+    // Give this some time because the reenqueuing can sleep for up to 500ms
+    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
+    let delivery = c
+        .receive_all(1, std::time::Duration::from_millis(1))
+        .await
+        .unwrap();
+    assert!(delivery.is_empty());
+
+    // Expected message should be on DLQ:
+    let res = check_dlq(1).await;
+    assert_eq!(org_payload_str, res.unwrap());
+}