Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support DLQ redrive in redis #104

Merged
merged 4 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion omniqueue/src/backends/azure_queue_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,17 @@ impl AqsProducer {
let payload = serde_json::to_string(payload)?;
self.send_raw_scheduled(&payload, delay).await
}

pub async fn redrive_dlq(&self) -> Result<()> {
Err(QueueError::Unsupported(
"redrive_dlq is not supported by AqsBackend",
))
}
}

impl crate::QueueProducer for AqsProducer {
type Payload = String;
omni_delegate!(send_raw, send_serde_json);
omni_delegate!(send_raw, send_serde_json, redrive_dlq);
}
impl crate::ScheduledQueueProducer for AqsProducer {
omni_delegate!(send_raw_scheduled, send_serde_json_scheduled);
Expand Down
8 changes: 7 additions & 1 deletion omniqueue/src/backends/gcp_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ impl GcpPubSubProducer {
pub async fn send_serde_json<P: Serialize + Sync>(&self, payload: &P) -> Result<()> {
self.send_raw(&serde_json::to_vec(&payload)?).await
}

pub async fn redrive_dlq(&self) -> Result<()> {
Err(QueueError::Unsupported(
"redrive_dlq is not supported by GcpPubSubBackend",
))
}
}

impl std::fmt::Debug for GcpPubSubProducer {
Expand All @@ -178,7 +184,7 @@ impl std::fmt::Debug for GcpPubSubProducer {

impl crate::QueueProducer for GcpPubSubProducer {
type Payload = Payload;
omni_delegate!(send_raw, send_serde_json);
omni_delegate!(send_raw, send_serde_json, redrive_dlq);

/// This method is overwritten for the Google Cloud Pub/Sub backend to be
/// more efficient than the default of sequentially publishing `payloads`.
Expand Down
8 changes: 7 additions & 1 deletion omniqueue/src/backends/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,17 @@ impl InMemoryProducer {
let payload = serde_json::to_vec(payload)?;
self.send_raw_scheduled(&payload, delay).await
}

pub async fn redrive_dlq(&self) -> Result<()> {
Err(QueueError::Unsupported(
"redrive_dlq is not supported by InMemoryBackend",
))
}
}

impl crate::QueueProducer for InMemoryProducer {
type Payload = Vec<u8>;
omni_delegate!(send_raw, send_serde_json);
omni_delegate!(send_raw, send_serde_json, redrive_dlq);
}
impl crate::ScheduledQueueProducer for InMemoryProducer {
omni_delegate!(send_raw_scheduled, send_serde_json_scheduled);
Expand Down
8 changes: 7 additions & 1 deletion omniqueue/src/backends/rabbitmq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,17 @@ impl RabbitMqProducer {
let payload = serde_json::to_vec(payload)?;
self.send_raw_scheduled(&payload, delay).await
}

pub async fn redrive_dlq(&self) -> Result<()> {
Err(QueueError::Unsupported(
"redrive_dlq is not supported by RabbitMqBackend",
))
}
}

impl crate::QueueProducer for RabbitMqProducer {
type Payload = Vec<u8>;
omni_delegate!(send_raw, send_serde_json);
omni_delegate!(send_raw, send_serde_json, redrive_dlq);
}
impl crate::ScheduledQueueProducer for RabbitMqProducer {
omni_delegate!(send_raw_scheduled, send_serde_json_scheduled);
Expand Down
2 changes: 1 addition & 1 deletion omniqueue/src/backends/redis/fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ async fn send_to_dlq<R: RedisConnection>(
.get()
.await
.map_err(QueueError::generic)?
.lpush(dlq, payload)
.rpush(dlq, payload)
jaymell marked this conversation as resolved.
Show resolved Hide resolved
.await
.map_err(QueueError::generic)?;

Expand Down
59 changes: 56 additions & 3 deletions omniqueue/src/backends/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use serde::Serialize;
use svix_ksuid::KsuidLike;
use thiserror::Error;
use tokio::task::JoinSet;
use tracing::{debug, error, trace, warn};
use tracing::{debug, error, info, trace, warn};

#[allow(deprecated)]
use crate::{
Expand Down Expand Up @@ -390,6 +390,7 @@ impl<R: RedisConnection> RedisBackendBuilder<R> {
payload_key: self.config.payload_key.clone(),
use_redis_streams: self.use_redis_streams,
_background_tasks: background_tasks.clone(),
dlq_config: self.config.dlq_config.clone(),
},
RedisConsumer {
redis,
Expand Down Expand Up @@ -421,6 +422,7 @@ impl<R: RedisConnection> RedisBackendBuilder<R> {
payload_key: self.config.payload_key,
use_redis_streams: self.use_redis_streams,
_background_tasks,
dlq_config: self.config.dlq_config,
})
}

Expand All @@ -444,7 +446,7 @@ impl<R: RedisConnection> RedisBackendBuilder<R> {
payload_key: self.config.payload_key,
use_redis_streams: self.use_redis_streams,
_background_tasks,
dlq_config: self.config.dlq_config.clone(),
dlq_config: self.config.dlq_config,
})
}

Expand Down Expand Up @@ -644,6 +646,7 @@ pub struct RedisProducer<M: ManageConnection> {
payload_key: String,
use_redis_streams: bool,
_background_tasks: Arc<JoinSet<Result<()>>>,
dlq_config: Option<DeadLetterQueueConfig>,
}

impl<R: RedisConnection> RedisProducer<R> {
Expand Down Expand Up @@ -698,11 +701,61 @@ impl<R: RedisConnection> RedisProducer<R> {
let payload = serde_json::to_vec(payload)?;
self.send_raw_scheduled(&payload, delay).await
}

pub async fn redrive_dlq(&self) -> Result<()> {
const BATCH_SIZE: isize = 50;

let DeadLetterQueueConfig { queue_key: dlq, .. } = self
.dlq_config
.as_ref()
.ok_or(QueueError::Unsupported("Missing DeadLetterQueueConfig"))?;

loop {
let mut conn = self.redis.get().await.map_err(QueueError::generic)?;
let old_payloads: Vec<RawPayload> = conn
.lrange(dlq, 0, BATCH_SIZE)
.await
.map_err(QueueError::generic)?;

if old_payloads.is_empty() {
break;
}

let new_payloads = old_payloads
.iter()
.map(|x| InternalPayload::new(x))
.collect::<Vec<_>>();

if self.use_redis_streams {
streams::add_to_main_queue(
new_payloads,
&self.queue_key,
&self.payload_key,
&mut *conn,
)
.await?;
} else {
// This may fail if messages in the key are not in their original raw format.
fallback::add_to_main_queue(new_payloads, &self.queue_key, &mut *conn).await?;
}

let payload_len = old_payloads.len();
for payload in old_payloads {
let _: () = conn
.lrem(dlq, 1, &payload)
.await
.map_err(QueueError::generic)?;
}
info!("Moved {payload_len} items from deadletter queue to main queue");
}

Ok(())
}
}

impl<R: RedisConnection> crate::QueueProducer for RedisProducer<R> {
type Payload = Vec<u8>;
omni_delegate!(send_raw, send_serde_json);
omni_delegate!(send_raw, send_serde_json, redrive_dlq);
}
impl<R: RedisConnection> crate::ScheduledQueueProducer for RedisProducer<R> {
omni_delegate!(send_raw_scheduled, send_serde_json_scheduled);
Expand Down
2 changes: 1 addition & 1 deletion omniqueue/src/backends/redis/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ async fn send_to_dlq<R: RedisConnection>(
.get()
.await
.map_err(QueueError::generic)?
.lpush(dlq, &payload)
.rpush(dlq, &payload)
.await
.map_err(QueueError::generic)?;

Expand Down
8 changes: 7 additions & 1 deletion omniqueue/src/backends/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,11 +355,17 @@ impl SqsProducer {

Ok(())
}

pub async fn redrive_dlq(&self) -> Result<()> {
Err(QueueError::Unsupported(
"redrive_dlq is not supported by SqsBackend",
))
}
}

impl crate::QueueProducer for SqsProducer {
type Payload = String;
omni_delegate!(send_raw, send_serde_json);
omni_delegate!(send_raw, send_serde_json, redrive_dlq);

/// This method is overwritten for the SQS backend to be more efficient
/// than the default of sequentially publishing `payloads`.
Expand Down
9 changes: 9 additions & 0 deletions omniqueue/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ macro_rules! omni_delegate {
Self::send_serde_json_scheduled(self, payload, delay)
}
};
( redrive_dlq ) => {
#[deny(unconditional_recursion)] // method call must defer to an inherent method
svix-onelson marked this conversation as resolved.
Show resolved Hide resolved
fn redrive_dlq(
&self,
) -> impl std::future::Future<Output = Result<()>> + Send {
Self::redrive_dlq(self)
}
};

( $method1:ident, $($rest:ident),* $(,)? ) => {
omni_delegate!($method1);
omni_delegate!($($rest),*);
Expand Down
14 changes: 13 additions & 1 deletion omniqueue/src/queue/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ pub trait QueueProducer: Send + Sync + Sized {

fn send_raw(&self, payload: &Self::Payload) -> impl Future<Output = Result<()>> + Send;

fn redrive_dlq(&self) -> impl Future<Output = Result<()>> + Send;

/// Send a batch of raw messages.
///
/// The default implementation of this sends the payloads sequentially using
Expand Down Expand Up @@ -97,6 +99,8 @@ pub(crate) trait ErasedQueueProducer: Send + Sync {
&'a self,
payload: &'a [u8],
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;

fn redrive_dlq<'a>(&'a self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
}

struct DynProducerInner<P> {
Expand All @@ -110,6 +114,10 @@ impl<P: QueueProducer> ErasedQueueProducer for DynProducerInner<P> {
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
Box::pin(async move { self.inner.send_bytes(payload).await })
}

fn redrive_dlq<'a>(&'a self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
Box::pin(async move { self.inner.redrive_dlq().await })
}
}

impl DynProducer {
Expand All @@ -121,9 +129,13 @@ impl DynProducer {
let payload = serde_json::to_vec(payload)?;
self.send_raw(&payload).await
}

pub async fn redrive_dlq(&self) -> Result<()> {
self.0.redrive_dlq().await
}
}

impl crate::QueueProducer for DynProducer {
type Payload = Vec<u8>;
omni_delegate!(send_raw, send_serde_json);
omni_delegate!(send_raw, send_serde_json, redrive_dlq);
}
9 changes: 8 additions & 1 deletion omniqueue/src/scheduled/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ impl<P: ScheduledQueueProducer> ErasedQueueProducer for DynScheduledProducerInne
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
Box::pin(async move { self.inner.send_bytes(payload).await })
}
fn redrive_dlq<'a>(&'a self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
Box::pin(async move { self.inner.redrive_dlq().await })
}
}

impl<P: ScheduledQueueProducer> ErasedScheduledQueueProducer for DynScheduledProducerInner<P> {
Expand Down Expand Up @@ -103,11 +106,15 @@ impl DynScheduledProducer {
let payload = serde_json::to_vec(payload)?;
self.0.send_raw_scheduled(&payload, delay).await
}

pub async fn redrive_dlq(&self) -> Result<()> {
self.0.redrive_dlq().await
}
}

impl crate::QueueProducer for DynScheduledProducer {
type Payload = Vec<u8>;
omni_delegate!(send_raw, send_serde_json);
omni_delegate!(send_raw, send_serde_json, redrive_dlq);
}
impl crate::ScheduledQueueProducer for DynScheduledProducer {
omni_delegate!(send_raw_scheduled, send_serde_json_scheduled);
Expand Down
Loading
Loading