diff --git a/omniqueue/src/backends/azure_queue_storage.rs b/omniqueue/src/backends/azure_queue_storage.rs index 2674125..40d2a8c 100644 --- a/omniqueue/src/backends/azure_queue_storage.rs +++ b/omniqueue/src/backends/azure_queue_storage.rs @@ -133,11 +133,17 @@ impl AqsProducer { let payload = serde_json::to_string(payload)?; self.send_raw_scheduled(&payload, delay).await } + + pub async fn redrive_dlq(&self) -> Result<()> { + Err(QueueError::Unsupported( + "redrive_dlq is not supported by AqsBackend", + )) + } } impl crate::QueueProducer for AqsProducer { type Payload = String; - omni_delegate!(send_raw, send_serde_json); + omni_delegate!(send_raw, send_serde_json, redrive_dlq); } impl crate::ScheduledQueueProducer for AqsProducer { omni_delegate!(send_raw_scheduled, send_serde_json_scheduled); diff --git a/omniqueue/src/backends/gcp_pubsub.rs b/omniqueue/src/backends/gcp_pubsub.rs index 1211861..4590e1f 100644 --- a/omniqueue/src/backends/gcp_pubsub.rs +++ b/omniqueue/src/backends/gcp_pubsub.rs @@ -166,6 +166,12 @@ impl GcpPubSubProducer { pub async fn send_serde_json(&self, payload: &P) -> Result<()> { self.send_raw(&serde_json::to_vec(&payload)?).await } + + pub async fn redrive_dlq(&self) -> Result<()> { + Err(QueueError::Unsupported( + "redrive_dlq is not supported by GcpPubSubBackend", + )) + } } impl std::fmt::Debug for GcpPubSubProducer { @@ -178,7 +184,7 @@ impl std::fmt::Debug for GcpPubSubProducer { impl crate::QueueProducer for GcpPubSubProducer { type Payload = Payload; - omni_delegate!(send_raw, send_serde_json); + omni_delegate!(send_raw, send_serde_json, redrive_dlq); /// This method is overwritten for the Google Cloud Pub/Sub backend to be /// more efficient than the default of sequentially publishing `payloads`. diff --git a/omniqueue/src/backends/in_memory.rs b/omniqueue/src/backends/in_memory.rs index bddca5f..8dc3131 100644 --- a/omniqueue/src/backends/in_memory.rs +++ b/omniqueue/src/backends/in_memory.rs @@ -83,11 +83,17 @@ impl InMemoryProducer { let payload = serde_json::to_vec(payload)?; self.send_raw_scheduled(&payload, delay).await } + + pub async fn redrive_dlq(&self) -> Result<()> { + Err(QueueError::Unsupported( + "redrive_dlq is not supported by InMemoryBackend", + )) + } } impl crate::QueueProducer for InMemoryProducer { type Payload = Vec; - omni_delegate!(send_raw, send_serde_json); + omni_delegate!(send_raw, send_serde_json, redrive_dlq); } impl crate::ScheduledQueueProducer for InMemoryProducer { omni_delegate!(send_raw_scheduled, send_serde_json_scheduled); diff --git a/omniqueue/src/backends/rabbitmq.rs b/omniqueue/src/backends/rabbitmq.rs index bfdbda0..f7c6d0e 100644 --- a/omniqueue/src/backends/rabbitmq.rs +++ b/omniqueue/src/backends/rabbitmq.rs @@ -202,11 +202,17 @@ impl RabbitMqProducer { let payload = serde_json::to_vec(payload)?; self.send_raw_scheduled(&payload, delay).await } + + pub async fn redrive_dlq(&self) -> Result<()> { + Err(QueueError::Unsupported( + "redrive_dlq is not supported by RabbitMqBackend", + )) + } } impl crate::QueueProducer for RabbitMqProducer { type Payload = Vec; - omni_delegate!(send_raw, send_serde_json); + omni_delegate!(send_raw, send_serde_json, redrive_dlq); } impl crate::ScheduledQueueProducer for RabbitMqProducer { omni_delegate!(send_raw_scheduled, send_serde_json_scheduled); diff --git a/omniqueue/src/backends/redis/fallback.rs b/omniqueue/src/backends/redis/fallback.rs index fca515c..2239ada 100644 --- a/omniqueue/src/backends/redis/fallback.rs +++ b/omniqueue/src/backends/redis/fallback.rs @@ -226,7 
+226,7 @@ async fn send_to_dlq( .get() .await .map_err(QueueError::generic)? - .lpush(dlq, payload) + .rpush(dlq, payload) .await .map_err(QueueError::generic)?; diff --git a/omniqueue/src/backends/redis/mod.rs b/omniqueue/src/backends/redis/mod.rs index e2ef496..d6b9ffe 100644 --- a/omniqueue/src/backends/redis/mod.rs +++ b/omniqueue/src/backends/redis/mod.rs @@ -46,7 +46,7 @@ use serde::Serialize; use svix_ksuid::KsuidLike; use thiserror::Error; use tokio::task::JoinSet; -use tracing::{debug, error, trace, warn}; +use tracing::{debug, error, info, trace, warn}; #[allow(deprecated)] use crate::{ @@ -390,6 +390,7 @@ impl RedisBackendBuilder { payload_key: self.config.payload_key.clone(), use_redis_streams: self.use_redis_streams, _background_tasks: background_tasks.clone(), + dlq_config: self.config.dlq_config.clone(), }, RedisConsumer { redis, @@ -421,6 +422,7 @@ impl RedisBackendBuilder { payload_key: self.config.payload_key, use_redis_streams: self.use_redis_streams, _background_tasks, + dlq_config: self.config.dlq_config, }) } @@ -444,7 +446,7 @@ impl RedisBackendBuilder { payload_key: self.config.payload_key, use_redis_streams: self.use_redis_streams, _background_tasks, - dlq_config: self.config.dlq_config.clone(), + dlq_config: self.config.dlq_config, }) } @@ -644,6 +646,7 @@ pub struct RedisProducer { payload_key: String, use_redis_streams: bool, _background_tasks: Arc>>, + dlq_config: Option, } impl RedisProducer { @@ -698,11 +701,61 @@ impl RedisProducer { let payload = serde_json::to_vec(payload)?; self.send_raw_scheduled(&payload, delay).await } + + pub async fn redrive_dlq(&self) -> Result<()> { + const BATCH_SIZE: isize = 50; + + let DeadLetterQueueConfig { queue_key: dlq, .. } = self + .dlq_config + .as_ref() + .ok_or(QueueError::Unsupported("Missing DeadLetterQueueConfig"))?; + + loop { + let mut conn = self.redis.get().await.map_err(QueueError::generic)?; + let old_payloads: Vec = conn + .lrange(dlq, 0, BATCH_SIZE) + .await + .map_err(QueueError::generic)?; + + if old_payloads.is_empty() { + break; + } + + let new_payloads = old_payloads + .iter() + .map(|x| InternalPayload::new(x)) + .collect::>(); + + if self.use_redis_streams { + streams::add_to_main_queue( + new_payloads, + &self.queue_key, + &self.payload_key, + &mut *conn, + ) + .await?; + } else { + // This may fail if messages in the key are not in their original raw format. + fallback::add_to_main_queue(new_payloads, &self.queue_key, &mut *conn).await?; + } + + let payload_len = old_payloads.len(); + for payload in old_payloads { + let _: () = conn + .lrem(dlq, 1, &payload) + .await + .map_err(QueueError::generic)?; + } + info!("Moved {payload_len} items from deadletter queue to main queue"); + } + + Ok(()) + } } impl crate::QueueProducer for RedisProducer { type Payload = Vec; - omni_delegate!(send_raw, send_serde_json); + omni_delegate!(send_raw, send_serde_json, redrive_dlq); } impl crate::ScheduledQueueProducer for RedisProducer { omni_delegate!(send_raw_scheduled, send_serde_json_scheduled); diff --git a/omniqueue/src/backends/redis/streams.rs b/omniqueue/src/backends/redis/streams.rs index e034eea..d9acdbe 100644 --- a/omniqueue/src/backends/redis/streams.rs +++ b/omniqueue/src/backends/redis/streams.rs @@ -347,7 +347,7 @@ async fn send_to_dlq( .get() .await .map_err(QueueError::generic)? 
- .lpush(dlq, &payload) + .rpush(dlq, &payload) .await .map_err(QueueError::generic)?; diff --git a/omniqueue/src/backends/sqs.rs b/omniqueue/src/backends/sqs.rs index 31510a7..6800724 100644 --- a/omniqueue/src/backends/sqs.rs +++ b/omniqueue/src/backends/sqs.rs @@ -355,11 +355,17 @@ impl SqsProducer { Ok(()) } + + pub async fn redrive_dlq(&self) -> Result<()> { + Err(QueueError::Unsupported( + "redrive_dlq is not supported by SqsBackend", + )) + } } impl crate::QueueProducer for SqsProducer { type Payload = String; - omni_delegate!(send_raw, send_serde_json); + omni_delegate!(send_raw, send_serde_json, redrive_dlq); /// This method is overwritten for the SQS backend to be more efficient /// than the default of sequentially publishing `payloads`. diff --git a/omniqueue/src/macros.rs b/omniqueue/src/macros.rs index 3cdd068..2b2efd0 100644 --- a/omniqueue/src/macros.rs +++ b/omniqueue/src/macros.rs @@ -53,6 +53,15 @@ macro_rules! omni_delegate { Self::send_serde_json_scheduled(self, payload, delay) } }; + ( redrive_dlq ) => { + #[deny(unconditional_recursion)] // method call must defer to an inherent method + fn redrive_dlq( + &self, + ) -> impl std::future::Future> + Send { + Self::redrive_dlq(self) + } + }; + ( $method1:ident, $($rest:ident),* $(,)? ) => { omni_delegate!($method1); omni_delegate!($($rest),*); diff --git a/omniqueue/src/queue/producer.rs b/omniqueue/src/queue/producer.rs index bd0e430..a130c88 100644 --- a/omniqueue/src/queue/producer.rs +++ b/omniqueue/src/queue/producer.rs @@ -9,6 +9,8 @@ pub trait QueueProducer: Send + Sync + Sized { fn send_raw(&self, payload: &Self::Payload) -> impl Future> + Send; + fn redrive_dlq(&self) -> impl Future> + Send; + /// Send a batch of raw messages. /// /// The default implementation of this sends the payloads sequentially using @@ -97,6 +99,8 @@ pub(crate) trait ErasedQueueProducer: Send + Sync { &'a self, payload: &'a [u8], ) -> Pin> + Send + 'a>>; + + fn redrive_dlq<'a>(&'a self) -> Pin> + Send + 'a>>; } struct DynProducerInner

{ @@ -110,6 +114,10 @@ impl ErasedQueueProducer for DynProducerInner

{ ) -> Pin> + Send + 'a>> { Box::pin(async move { self.inner.send_bytes(payload).await }) } + + fn redrive_dlq<'a>(&'a self) -> Pin> + Send + 'a>> { + Box::pin(async move { self.inner.redrive_dlq().await }) + } } impl DynProducer { @@ -121,9 +129,13 @@ impl DynProducer { let payload = serde_json::to_vec(payload)?; self.send_raw(&payload).await } + + pub async fn redrive_dlq(&self) -> Result<()> { + self.0.redrive_dlq().await + } } impl crate::QueueProducer for DynProducer { type Payload = Vec; - omni_delegate!(send_raw, send_serde_json); + omni_delegate!(send_raw, send_serde_json, redrive_dlq); } diff --git a/omniqueue/src/scheduled/mod.rs b/omniqueue/src/scheduled/mod.rs index 054bb56..3d16986 100644 --- a/omniqueue/src/scheduled/mod.rs +++ b/omniqueue/src/scheduled/mod.rs @@ -69,6 +69,9 @@ impl ErasedQueueProducer for DynScheduledProducerInne ) -> Pin> + Send + 'a>> { Box::pin(async move { self.inner.send_bytes(payload).await }) } + fn redrive_dlq<'a>(&'a self) -> Pin> + Send + 'a>> { + Box::pin(async move { self.inner.redrive_dlq().await }) + } } impl ErasedScheduledQueueProducer for DynScheduledProducerInner

{ @@ -103,11 +106,15 @@ impl DynScheduledProducer { let payload = serde_json::to_vec(payload)?; self.0.send_raw_scheduled(&payload, delay).await } + + pub async fn redrive_dlq(&self) -> Result<()> { + self.0.redrive_dlq().await + } } impl crate::QueueProducer for DynScheduledProducer { type Payload = Vec; - omni_delegate!(send_raw, send_serde_json); + omni_delegate!(send_raw, send_serde_json, redrive_dlq); } impl crate::ScheduledQueueProducer for DynScheduledProducer { omni_delegate!(send_raw_scheduled, send_serde_json_scheduled); diff --git a/omniqueue/tests/it/redis.rs b/omniqueue/tests/it/redis.rs index 4102c93..bbad302 100644 --- a/omniqueue/tests/it/redis.rs +++ b/omniqueue/tests/it/redis.rs @@ -1,11 +1,11 @@ -use std::{ - num::NonZeroUsize, - time::{Duration, Instant}, -}; - -use omniqueue::backends::{ - redis::{DeadLetterQueueConfig, RedisBackendBuilder}, - RedisBackend, RedisConfig, +use std::time::{Duration, Instant}; + +use omniqueue::{ + backends::{ + redis::{DeadLetterQueueConfig, RedisBackendBuilder}, + RedisBackend, RedisConfig, + }, + Delivery, }; use redis::{AsyncCommands, Client, Commands}; use serde::{Deserialize, Serialize}; @@ -87,7 +87,7 @@ async fn test_bytes_send_recv() { d.ack().await.unwrap(); } -#[derive(Debug, Deserialize, Serialize, PartialEq)] +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] pub struct ExType { a: u8, } @@ -301,11 +301,16 @@ async fn test_pending() { #[tokio::test] async fn test_deadletter_config() { let payload = ExType { a: 1 }; + let payload_str = serde_json::to_string(&payload).unwrap(); let stream_name: String = std::iter::repeat_with(fastrand::alphanumeric) .take(8) .collect(); + let dlq_key: String = std::iter::repeat_with(fastrand::alphanumeric) + .take(8) + .collect(); + let client = Client::open(ROOT_URL).unwrap(); let mut conn = client.get_multiplexed_async_connection().await.unwrap(); @@ -326,20 +331,36 @@ async fn test_deadletter_config() { consumer_group: "test_cg".to_owned(), consumer_name: "test_cn".to_owned(), payload_key: "payload".to_owned(), - ack_deadline_ms: 1, + ack_deadline_ms: 20, dlq_config: Some(DeadLetterQueueConfig { - queue_key: "dlq-key".to_owned(), + queue_key: dlq_key.to_owned(), max_receives, }), }; - let (builder, _drop) = (RedisBackend::builder(config), RedisStreamDrop(stream_name)); + let check_dlq = |asserted_len: usize| { + let dlq_key = dlq_key.clone(); + async move { + let client = Client::open(ROOT_URL).unwrap(); + let mut conn = client.get_multiplexed_async_connection().await.unwrap(); + let mut res: Vec = conn.lrange(&dlq_key, 0, 0).await.unwrap(); + assert!(res.len() == asserted_len); + res.pop() + } + }; + + let (builder, _drop) = ( + RedisBackend::builder(config), + RedisStreamDrop(stream_name.clone()), + ); let (p, mut c) = builder.build_pair().await.unwrap(); + // Test send to DLQ via `ack_deadline_ms` expiration: p.send_serde_json(&payload).await.unwrap(); for _ in 0..max_receives { + check_dlq(0).await; let delivery = c.receive().await.unwrap(); assert_eq!( Some(&payload), @@ -354,12 +375,61 @@ async fn test_deadletter_config() { .await .unwrap(); assert!(delivery.is_empty()); + + // Expected message should be on DLQ: + let res = check_dlq(1).await; + assert_eq!(payload_str, res.unwrap()); + + // Test send to DLQ via explicit `nack`ing: + let _: () = conn + .xadd(&stream_name, "*", &[("payload", payload_str.as_bytes())]) + .await + .unwrap(); + + let assert_delivery = |delivery: &Delivery| { + assert_eq!( + Some(&payload), + delivery.payload_serde_json().unwrap().as_ref() + ); + }; + 
+ for _ in 0..max_receives { + let delivery = c.receive().await.unwrap(); + assert_delivery(&delivery); + delivery.nack().await.unwrap(); + } + + // Give this some time because the reenqueuing can sleep for up to 500ms + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + let delivery = c + .receive_all(1, std::time::Duration::from_millis(1)) + .await + .unwrap(); + assert!(delivery.is_empty()); + + // Expected message should be on DLQ: + let res = check_dlq(1).await; + assert_eq!(payload_str, res.unwrap()); + + // Redrive DLQ, receive from main queue, ack: + p.redrive_dlq().await.unwrap(); + + let delivery = c.receive().await.unwrap(); + assert_delivery(&delivery); + delivery.ack().await.unwrap(); + + check_dlq(0).await; } -// A message without a `num_receives` field shouldn't -// cause issues: +// Assert that ordering is as expected. I don't know +// that we need to guarantee this in our docs, but it's +// good to at least validate it for now: #[tokio::test] -async fn test_backward_compatible() { +async fn test_deadletter_config_order() { + let payload1 = ExType { a: 1 }; + let payload2 = ExType { a: 2 }; + let payload3 = ExType { a: 3 }; + let stream_name: String = std::iter::repeat_with(fastrand::alphanumeric) .take(8) .collect(); @@ -376,7 +446,7 @@ async fn test_backward_compatible() { .await .unwrap(); - let max_receives = 5; + let max_receives = 1; let config = RedisConfig { dsn: ROOT_URL.to_owned(), @@ -400,7 +470,7 @@ async fn test_backward_compatible() { async move { let client = Client::open(ROOT_URL).unwrap(); let mut conn = client.get_multiplexed_async_connection().await.unwrap(); - let mut res: Vec = conn.lpop(&dlq_key, NonZeroUsize::new(100)).await.unwrap(); + let mut res: Vec = conn.lrange(&dlq_key, 0, -1).await.unwrap(); assert!(res.len() == asserted_len); res.pop() } @@ -411,60 +481,106 @@ async fn test_backward_compatible() { RedisStreamDrop(stream_name.clone()), ); - let (_p, mut c) = builder.build_pair().await.unwrap(); - - let org_payload = ExType { a: 1 }; - let org_payload_str = serde_json::to_string(&org_payload).unwrap(); + let (p, mut c) = builder.build_pair().await.unwrap(); // Test send to DLQ via `ack_deadline_ms` expiration: - let _: () = conn - .xadd( - &stream_name, - "*", - &[("payload", org_payload_str.as_bytes())], - ) - .await - .unwrap(); + p.send_serde_json(&payload1).await.unwrap(); + p.send_serde_json(&payload2).await.unwrap(); + p.send_serde_json(&payload3).await.unwrap(); - for _ in 0..max_receives { - check_dlq(0).await; + for payload in [&payload1, &payload2, &payload3] { let delivery = c.receive().await.unwrap(); assert_eq!( - Some(&org_payload), + Some(payload), delivery.payload_serde_json().unwrap().as_ref() ); + delivery.nack().await.unwrap(); } // Give this some time because the reenqueuing can sleep for up to 500ms tokio::time::sleep(std::time::Duration::from_secs(2)).await; - let delivery = c - .receive_all(1, std::time::Duration::from_millis(1)) + + // Expected messages should be on DLQ: + check_dlq(3).await; + + // Redrive DLQ, receive from main queue, ack: + p.redrive_dlq().await.unwrap(); + + for payload in [&payload1, &payload2, &payload3] { + let delivery = c.receive().await.unwrap(); + assert_eq!( + Some(payload), + delivery.payload_serde_json().unwrap().as_ref() + ); + delivery.ack().await.unwrap(); + } +} + +// A message without a `num_receives` field shouldn't +// cause issues: +#[tokio::test] +async fn test_backward_compatible() { + let stream_name: String = std::iter::repeat_with(fastrand::alphanumeric) + .take(8) 
+ .collect(); + + let dlq_key: String = std::iter::repeat_with(fastrand::alphanumeric) + .take(8) + .collect(); + + let client = Client::open(ROOT_URL).unwrap(); + let mut conn = client.get_multiplexed_async_connection().await.unwrap(); + + let _: () = conn + .xgroup_create_mkstream(&stream_name, "test_cg", 0i8) .await .unwrap(); - assert!(delivery.is_empty()); - // Expected message should be on DLQ: - let res = check_dlq(1).await; - assert_eq!(org_payload_str, res.unwrap()); + let max_receives = 5; + + let config = RedisConfig { + dsn: ROOT_URL.to_owned(), + max_connections: 8, + reinsert_on_nack: false, + queue_key: stream_name.clone(), + delayed_queue_key: format!("{stream_name}::delayed"), + delayed_lock_key: format!("{stream_name}::delayed_lock"), + consumer_group: "test_cg".to_owned(), + consumer_name: "test_cn".to_owned(), + payload_key: "payload".to_owned(), + ack_deadline_ms: 20, + dlq_config: Some(DeadLetterQueueConfig { + queue_key: dlq_key.to_owned(), + max_receives, + }), + }; + + let (builder, _drop) = ( + RedisBackend::builder(config), + RedisStreamDrop(stream_name.clone()), + ); + + let (_p, mut c) = builder.build_pair().await.unwrap(); + + let org_payload = ExType { a: 1 }; + let org_payload_str = serde_json::to_string(&org_payload).unwrap(); - // Test send to DLQ via explicit `nack`ing: let _: () = conn .xadd( &stream_name, "*", + // We don't have the `num_receives` field: &[("payload", org_payload_str.as_bytes())], ) .await .unwrap(); for _ in 0..max_receives { - check_dlq(0).await; let delivery = c.receive().await.unwrap(); assert_eq!( Some(&org_payload), delivery.payload_serde_json().unwrap().as_ref() ); - delivery.nack().await.unwrap(); } // Give this some time because the reenqueuing can sleep for up to 500ms @@ -474,8 +590,4 @@ async fn test_backward_compatible() { .await .unwrap(); assert!(delivery.is_empty()); - - // Expected message should be on DLQ: - let res = check_dlq(1).await; - assert_eq!(org_payload_str, res.unwrap()); } diff --git a/omniqueue/tests/it/redis_cluster.rs b/omniqueue/tests/it/redis_cluster.rs index 37b419c..8cfb94a 100644 --- a/omniqueue/tests/it/redis_cluster.rs +++ b/omniqueue/tests/it/redis_cluster.rs @@ -1,6 +1,11 @@ use std::time::{Duration, Instant}; -use omniqueue::backends::{RedisBackend, RedisClusterBackendBuilder, RedisConfig}; +use omniqueue::{ + backends::{ + redis::DeadLetterQueueConfig, RedisBackend, RedisClusterBackendBuilder, RedisConfig, + }, + Delivery, +}; use redis::{cluster::ClusterClient, AsyncCommands, Commands}; use serde::{Deserialize, Serialize}; @@ -294,3 +299,293 @@ async fn test_pending() { .unwrap() .is_empty()); } + +#[tokio::test] +async fn test_deadletter_config() { + let payload = ExType { a: 1 }; + let payload_str = serde_json::to_string(&payload).unwrap(); + + let stream_name: String = std::iter::repeat_with(fastrand::alphanumeric) + .take(8) + .collect(); + + let dlq_key: String = std::iter::repeat_with(fastrand::alphanumeric) + .take(8) + .collect(); + + let client = ClusterClient::new(vec![ROOT_URL]).unwrap(); + let mut conn = client.get_async_connection().await.unwrap(); + + let _: () = conn + .xgroup_create_mkstream(&stream_name, "test_cg", 0i8) + .await + .unwrap(); + + let max_receives = 5; + + let config = RedisConfig { + dsn: ROOT_URL.to_owned(), + max_connections: 8, + reinsert_on_nack: false, + queue_key: stream_name.clone(), + delayed_queue_key: format!("{stream_name}::delayed"), + delayed_lock_key: format!("{stream_name}::delayed_lock"), + consumer_group: "test_cg".to_owned(), + consumer_name: 
"test_cn".to_owned(), + payload_key: "payload".to_owned(), + ack_deadline_ms: 20, + dlq_config: Some(DeadLetterQueueConfig { + queue_key: dlq_key.to_owned(), + max_receives, + }), + }; + + let check_dlq = |asserted_len: usize| { + let dlq_key = dlq_key.clone(); + let client = client.clone(); + async move { + let mut conn = client.get_async_connection().await.unwrap(); + let mut res: Vec = conn.lrange(&dlq_key, 0, 0).await.unwrap(); + assert!(res.len() == asserted_len); + res.pop() + } + }; + + let (builder, _drop) = ( + RedisBackend::builder(config).cluster(), + RedisStreamDrop(stream_name.clone()), + ); + + let (p, mut c) = builder.build_pair().await.unwrap(); + + // Test send to DLQ via `ack_deadline_ms` expiration: + p.send_serde_json(&payload).await.unwrap(); + + for _ in 0..max_receives { + check_dlq(0).await; + let delivery = c.receive().await.unwrap(); + assert_eq!( + Some(&payload), + delivery.payload_serde_json().unwrap().as_ref() + ); + } + + // Give this some time because the reenqueuing can sleep for up to 500ms + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + let delivery = c + .receive_all(1, std::time::Duration::from_millis(1)) + .await + .unwrap(); + assert!(delivery.is_empty()); + + // Expected message should be on DLQ: + let res = check_dlq(1).await; + assert_eq!(payload_str, res.unwrap()); + + // Test send to DLQ via explicit `nack`ing: + let _: () = conn + .xadd(&stream_name, "*", &[("payload", payload_str.as_bytes())]) + .await + .unwrap(); + + let assert_delivery = |delivery: &Delivery| { + assert_eq!( + Some(&payload), + delivery.payload_serde_json().unwrap().as_ref() + ); + }; + + for _ in 0..max_receives { + let delivery = c.receive().await.unwrap(); + assert_delivery(&delivery); + delivery.nack().await.unwrap(); + } + + // Give this some time because the reenqueuing can sleep for up to 500ms + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + let delivery = c + .receive_all(1, std::time::Duration::from_millis(1)) + .await + .unwrap(); + assert!(delivery.is_empty()); + + // Expected message should be on DLQ: + let res = check_dlq(1).await; + assert_eq!(payload_str, res.unwrap()); + + // Redrive DLQ, receive from main queue, ack: + p.redrive_dlq().await.unwrap(); + + let delivery = c.receive().await.unwrap(); + assert_delivery(&delivery); + delivery.ack().await.unwrap(); + + check_dlq(0).await; +} + +#[tokio::test] +async fn test_deadletter_config_order() { + let payload1 = ExType { a: 1 }; + let payload2 = ExType { a: 2 }; + let payload3 = ExType { a: 3 }; + + let stream_name: String = std::iter::repeat_with(fastrand::alphanumeric) + .take(8) + .collect(); + + let dlq_key: String = std::iter::repeat_with(fastrand::alphanumeric) + .take(8) + .collect(); + + let client = ClusterClient::new(vec![ROOT_URL]).unwrap(); + let mut conn = client.get_async_connection().await.unwrap(); + + let _: () = conn + .xgroup_create_mkstream(&stream_name, "test_cg", 0i8) + .await + .unwrap(); + + let max_receives = 1; + + let config = RedisConfig { + dsn: ROOT_URL.to_owned(), + max_connections: 8, + reinsert_on_nack: false, + queue_key: stream_name.clone(), + delayed_queue_key: format!("{stream_name}::delayed"), + delayed_lock_key: format!("{stream_name}::delayed_lock"), + consumer_group: "test_cg".to_owned(), + consumer_name: "test_cn".to_owned(), + payload_key: "payload".to_owned(), + ack_deadline_ms: 20, + dlq_config: Some(DeadLetterQueueConfig { + queue_key: dlq_key.to_owned(), + max_receives, + }), + }; + + let check_dlq = |asserted_len: usize| { + let 
dlq_key = dlq_key.clone(); + let client = client.clone(); + async move { + let mut conn = client.get_async_connection().await.unwrap(); + let mut res: Vec = conn.lrange(&dlq_key, 0, -1).await.unwrap(); + assert!(res.len() == asserted_len); + res.pop() + } + }; + + let (builder, _drop) = ( + RedisBackend::builder(config).cluster(), + RedisStreamDrop(stream_name.clone()), + ); + + let (p, mut c) = builder.build_pair().await.unwrap(); + + // Test send to DLQ via `ack_deadline_ms` expiration: + p.send_serde_json(&payload1).await.unwrap(); + p.send_serde_json(&payload2).await.unwrap(); + p.send_serde_json(&payload3).await.unwrap(); + + for payload in [&payload1, &payload2, &payload3] { + let delivery = c.receive().await.unwrap(); + assert_eq!( + Some(payload), + delivery.payload_serde_json().unwrap().as_ref() + ); + delivery.nack().await.unwrap(); + } + + // Give this some time because the reenqueuing can sleep for up to 500ms + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + + // Expected messages should be on DLQ: + check_dlq(3).await; + + // Redrive DLQ, receive from main queue, ack: + p.redrive_dlq().await.unwrap(); + + for payload in [&payload1, &payload2, &payload3] { + let delivery = c.receive().await.unwrap(); + assert_eq!( + Some(payload), + delivery.payload_serde_json().unwrap().as_ref() + ); + delivery.ack().await.unwrap(); + } +} +// A message without a `num_receives` field shouldn't +// cause issues: +#[tokio::test] +async fn test_backward_compatible() { + let stream_name: String = std::iter::repeat_with(fastrand::alphanumeric) + .take(8) + .collect(); + + let dlq_key: String = std::iter::repeat_with(fastrand::alphanumeric) + .take(8) + .collect(); + + let client = ClusterClient::new(vec![ROOT_URL]).unwrap(); + let mut conn = client.get_async_connection().await.unwrap(); + + let _: () = conn + .xgroup_create_mkstream(&stream_name, "test_cg", 0i8) + .await + .unwrap(); + + let max_receives = 5; + + let config = RedisConfig { + dsn: ROOT_URL.to_owned(), + max_connections: 8, + reinsert_on_nack: false, + queue_key: stream_name.clone(), + delayed_queue_key: format!("{stream_name}::delayed"), + delayed_lock_key: format!("{stream_name}::delayed_lock"), + consumer_group: "test_cg".to_owned(), + consumer_name: "test_cn".to_owned(), + payload_key: "payload".to_owned(), + ack_deadline_ms: 20, + dlq_config: Some(DeadLetterQueueConfig { + queue_key: dlq_key.to_owned(), + max_receives, + }), + }; + + let (builder, _drop) = ( + RedisBackend::builder(config).cluster(), + RedisStreamDrop(stream_name.clone()), + ); + + let (_p, mut c) = builder.build_pair().await.unwrap(); + + let org_payload = ExType { a: 1 }; + let org_payload_str = serde_json::to_string(&org_payload).unwrap(); + + let _: () = conn + .xadd( + &stream_name, + "*", + // We don't have the `num_receives` field: + &[("payload", org_payload_str.as_bytes())], + ) + .await + .unwrap(); + + for _ in 0..max_receives { + let delivery = c.receive().await.unwrap(); + assert_eq!( + Some(&org_payload), + delivery.payload_serde_json().unwrap().as_ref() + ); + } + + // Give this some time because the reenqueuing can sleep for up to 500ms + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + let delivery = c + .receive_all(1, std::time::Duration::from_millis(1)) + .await + .unwrap(); + assert!(delivery.is_empty()); +} diff --git a/omniqueue/tests/it/redis_fallback.rs b/omniqueue/tests/it/redis_fallback.rs index ba7f5bb..347d189 100644 --- a/omniqueue/tests/it/redis_fallback.rs +++ b/omniqueue/tests/it/redis_fallback.rs @@ 
-1,12 +1,12 @@ use core::str; -use std::{ - num::NonZeroUsize, - time::{Duration, Instant}, -}; - -use omniqueue::backends::{ - redis::{DeadLetterQueueConfig, RedisBackendBuilder}, - RedisBackend, RedisConfig, +use std::time::{Duration, Instant}; + +use omniqueue::{ + backends::{ + redis::{DeadLetterQueueConfig, RedisBackendBuilder}, + RedisBackend, RedisConfig, + }, + Delivery, }; use redis::{AsyncCommands, Client, Commands}; use serde::{Deserialize, Serialize}; @@ -335,7 +335,7 @@ async fn test_deadletter_config() { async move { let client = Client::open(ROOT_URL).unwrap(); let mut conn = client.get_multiplexed_async_connection().await.unwrap(); - let mut res: Vec = conn.lpop(&dlq_key, NonZeroUsize::new(100)).await.unwrap(); + let mut res: Vec = conn.lrange(&dlq_key, 0, 0).await.unwrap(); assert!(res.len() == asserted_len); res.pop() } @@ -351,13 +351,17 @@ async fn test_deadletter_config() { // Test send to DLQ via `ack_deadline_ms` expiration: p.send_serde_json(&payload).await.unwrap(); - for _ in 0..max_receives { - check_dlq(0).await; - let delivery = c.receive().await.unwrap(); + let assert_delivery = |delivery: &Delivery| { assert_eq!( Some(&payload), delivery.payload_serde_json().unwrap().as_ref() ); + }; + + for _ in 0..max_receives { + check_dlq(0).await; + let delivery = c.receive().await.unwrap(); + assert_delivery(&delivery); } // Give this some time because the reenqueuing can sleep for up to 500ms @@ -372,6 +376,15 @@ async fn test_deadletter_config() { let res = check_dlq(1).await; assert_eq!(serde_json::to_string(&payload).unwrap(), res.unwrap()); + // Redrive DLQ, receive from main queue, ack: + p.redrive_dlq().await.unwrap(); + + let delivery = c.receive().await.unwrap(); + assert_delivery(&delivery); + delivery.ack().await.unwrap(); + + check_dlq(0).await; + /* This portion of test is flaky due to https://github.com/svix/omniqueue-rs/issues/102 // Test send to DLQ via explicit `nack`ing: @@ -380,10 +393,7 @@ async fn test_deadletter_config() { for _ in 0..max_receives { check_dlq(0).await; let delivery = c.receive().await.unwrap(); - assert_eq!( - Some(&payload), - delivery.payload_serde_json().unwrap().as_ref() - ); + assert_delivery(&delivery); delivery.nack().await.unwrap(); } @@ -402,6 +412,90 @@ async fn test_deadletter_config() { */ } +#[tokio::test] +async fn test_deadletter_config_order() { + let payload1 = ExType { a: 1 }; + let payload2 = ExType { a: 2 }; + let payload3 = ExType { a: 3 }; + + let queue_key: String = std::iter::repeat_with(fastrand::alphanumeric) + .take(8) + .collect(); + + let dlq_key: String = std::iter::repeat_with(fastrand::alphanumeric) + .take(8) + .collect(); + + let max_receives = 1; + + let config = RedisConfig { + dsn: ROOT_URL.to_owned(), + max_connections: 8, + reinsert_on_nack: false, + queue_key: queue_key.clone(), + delayed_queue_key: format!("{queue_key}::delayed"), + delayed_lock_key: format!("{queue_key}::delayed_lock"), + consumer_group: "test_cg".to_owned(), + consumer_name: "test_cn".to_owned(), + payload_key: "payload".to_owned(), + ack_deadline_ms: 1, + dlq_config: Some(DeadLetterQueueConfig { + queue_key: dlq_key.to_owned(), + max_receives, + }), + }; + + let check_dlq = |asserted_len: usize| { + let dlq_key = dlq_key.clone(); + async move { + let client = Client::open(ROOT_URL).unwrap(); + let mut conn = client.get_multiplexed_async_connection().await.unwrap(); + let mut res: Vec = conn.lrange(&dlq_key, 0, -1).await.unwrap(); + assert!(res.len() == asserted_len); + res.pop() + } + }; + + let (builder, _drop) = ( + 
RedisBackend::builder(config).use_redis_streams(false), + RedisKeyDrop(queue_key), + ); + + let (p, mut c) = builder.build_pair().await.unwrap(); + + // Test send to DLQ via `ack_deadline_ms` expiration: + p.send_serde_json(&payload1).await.unwrap(); + p.send_serde_json(&payload2).await.unwrap(); + p.send_serde_json(&payload3).await.unwrap(); + + for payload in [&payload1, &payload2, &payload3] { + let delivery = c.receive().await.unwrap(); + assert_eq!( + Some(payload), + delivery.payload_serde_json().unwrap().as_ref() + ); + delivery.nack().await.unwrap(); + } + + // Give this some time because the reenqueuing can sleep for up to 500ms + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + + // Expected messages should be on DLQ: + check_dlq(3).await; + + // Redrive DLQ, receive from main queue, ack: + p.redrive_dlq().await.unwrap(); + + for payload in [&payload1, &payload2, &payload3] { + let delivery = c.receive().await.unwrap(); + assert_eq!( + Some(payload), + delivery.payload_serde_json().unwrap().as_ref() + ); + delivery.ack().await.unwrap(); + } +} + // A message without a `num_receives` field shouldn't // cause issues: #[tokio::test] @@ -410,6 +504,10 @@ async fn test_backward_compatible() { .take(8) .collect(); + let dlq_key: String = std::iter::repeat_with(fastrand::alphanumeric) + .take(8) + .collect(); + let max_receives = 5; let config = RedisConfig { @@ -424,7 +522,7 @@ async fn test_backward_compatible() { payload_key: "payload".to_owned(), ack_deadline_ms: 20, dlq_config: Some(DeadLetterQueueConfig { - queue_key: "dlq-key".to_owned(), + queue_key: dlq_key.to_owned(), max_receives, }), };
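
Reviewer note: below is a minimal usage sketch of the producer-level `redrive_dlq` hook introduced by this diff, assuming a Redis backend configured with a `DeadLetterQueueConfig`. It only exercises APIs visible in the diff and its tests (`RedisBackend::builder`, `build_pair`, `send`/`receive`, `redrive_dlq`); the DSN, key names, and numeric values are placeholders, not part of this change.

use omniqueue::backends::{redis::DeadLetterQueueConfig, RedisBackend, RedisConfig};

// Illustrative sketch only; the configuration values below are hypothetical.
#[tokio::main]
async fn main() {
    let config = RedisConfig {
        dsn: "redis://127.0.0.1:6379".to_owned(),
        max_connections: 8,
        reinsert_on_nack: false,
        queue_key: "main-queue".to_owned(),
        delayed_queue_key: "main-queue::delayed".to_owned(),
        delayed_lock_key: "main-queue::delayed_lock".to_owned(),
        consumer_group: "cg".to_owned(),
        consumer_name: "cn".to_owned(),
        payload_key: "payload".to_owned(),
        ack_deadline_ms: 5_000,
        // Messages move to this list after `max_receives` failed deliveries.
        dlq_config: Some(DeadLetterQueueConfig {
            queue_key: "main-queue::dlq".to_owned(),
            max_receives: 3,
        }),
    };

    let (producer, mut consumer) = RedisBackend::builder(config)
        .build_pair()
        .await
        .unwrap();

    // Move everything currently on the dead-letter list back onto the main
    // queue in batches. A producer built without `dlq_config`, or a backend
    // without DLQ support, returns `QueueError::Unsupported` here instead.
    producer.redrive_dlq().await.unwrap();

    // Redriven messages come back through the normal consumer path.
    let delivery = consumer.receive().await.unwrap();
    delivery.ack().await.unwrap();
}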