diff --git a/orca-queue-redis/src/main/kotlin/com/netflix/spinnaker/orca/q/redis/RedisQueue.kt b/orca-queue-redis/src/main/kotlin/com/netflix/spinnaker/orca/q/redis/RedisQueue.kt index 1993d06033..b65289ac62 100644 --- a/orca-queue-redis/src/main/kotlin/com/netflix/spinnaker/orca/q/redis/RedisQueue.kt +++ b/orca-queue-redis/src/main/kotlin/com/netflix/spinnaker/orca/q/redis/RedisQueue.kt @@ -100,7 +100,8 @@ class RedisQueue( pool.resource.use { redis -> val fingerprint = message.hash() if (redis.zismember(queueKey, fingerprint)) { - log.warn("Ignoring message as an identical one is already on the queue: $fingerprint, message: $message") + log.warn("Re-prioritizing message as an identical one is already on the queue: $fingerprint, message: $message") + redis.zadd(queueKey, score(delay), fingerprint) fire(message) } else { redis.queueMessage(message, delay) @@ -135,10 +136,11 @@ class RedisQueue( redis .multi { zrem(unackedKey, fingerprint) + zadd(queueKey, score(), fingerprint) // we only need to read the message for metrics purposes hget(messagesKey, fingerprint) } - .let { (_, json) -> + .let { (_, _, json) -> mapper .readValue(json as String) .let { message -> diff --git a/orca-queue-redis/src/test/kotlin/com/netflix/spinnaker/orca/q/redis/RedisQueueTest.kt b/orca-queue-redis/src/test/kotlin/com/netflix/spinnaker/orca/q/redis/RedisQueueTest.kt index f01dd8ecc5..17c60fcc6e 100644 --- a/orca-queue-redis/src/test/kotlin/com/netflix/spinnaker/orca/q/redis/RedisQueueTest.kt +++ b/orca-queue-redis/src/test/kotlin/com/netflix/spinnaker/orca/q/redis/RedisQueueTest.kt @@ -29,7 +29,6 @@ object RedisQueueTest : QueueTest(createQueue(p3 = null), ::shutdown object RedisMonitorableQueueTest : MonitorableQueueTest( createQueue, - RedisQueue::retry, ::shutdownCallback ) diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/memory/InMemoryQueue.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/memory/InMemoryQueue.kt index 1dba8e6607..0d88040d9c 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/memory/InMemoryQueue.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/memory/InMemoryQueue.kt @@ -62,11 +62,12 @@ class InMemoryQueue( } override fun push(message: Message, delay: TemporalAmount) { - if (queue.none { it.payload == message }) { - queue.put(Envelope(message, clock.instant().plus(delay), clock)) - fire(message) - } else { + val existed = queue.removeIf { it.payload == message } + queue.put(Envelope(message, clock.instant().plus(delay), clock)) + if (existed) { fire(message) + } else { + fire(message) } } @@ -79,12 +80,13 @@ class InMemoryQueue( deadMessageHandler.invoke(this, message.payload) fire() } else { - if (queue.none { it.payload == message.payload }) { - log.warn("redelivering unacked message ${message.payload}") - queue.put(message.copy(scheduledTime = now, count = message.count + 1)) - fire() - } else { + val existed = queue.removeIf { it.payload == message.payload } + log.warn("redelivering unacked message ${message.payload}") + queue.put(message.copy(scheduledTime = now, count = message.count + 1)) + if (existed) { fire(message.payload) + } else { + fire() } } } diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/QueueTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/QueueTest.kt index b1b4ca1991..e701fedd6a 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/QueueTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/QueueTest.kt @@ -26,6 +26,7 @@ import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on +import org.threeten.extra.Hours import java.io.Closeable import java.time.Clock import java.time.Duration @@ -105,7 +106,7 @@ abstract class QueueTest( afterGroup(::resetMocks) on("polling the queue twice") { - queue!!.apply { + with(queue!!) { poll(callback) poll(callback) } @@ -169,7 +170,7 @@ abstract class QueueTest( beforeGroup { queue = createQueue(clock, deadLetterCallback) - queue!!.apply { + with(queue!!) { push(message) poll { _, ack -> ack() @@ -181,7 +182,7 @@ abstract class QueueTest( afterGroup(::resetMocks) on("polling the queue after the message acknowledgment has timed out") { - queue!!.apply { + with(queue!!) { clock.incrementBy(ackTimeout) retry() poll(callback) @@ -198,7 +199,7 @@ abstract class QueueTest( beforeGroup { queue = createQueue(clock, deadLetterCallback) - queue!!.apply { + with(queue!!) { push(message) poll { _, _ -> } } @@ -208,7 +209,7 @@ abstract class QueueTest( afterGroup(::resetMocks) on("polling the queue after the message acknowledgment has timed out") { - queue!!.apply { + with(queue!!) { clock.incrementBy(ackTimeout) retry() poll(callback) @@ -225,7 +226,7 @@ abstract class QueueTest( beforeGroup { queue = createQueue(clock, deadLetterCallback) - queue!!.apply { + with(queue!!) { push(message) repeat(2) { poll { _, _ -> } @@ -239,7 +240,7 @@ abstract class QueueTest( afterGroup(::resetMocks) on("polling the queue again") { - queue!!.apply { + with(queue!!) { poll(callback) } } @@ -254,7 +255,7 @@ abstract class QueueTest( beforeGroup { queue = createQueue(clock, deadLetterCallback) - queue!!.apply { + with(queue!!) { push(message) repeat(maxRetries) { poll { _, _ -> } @@ -268,7 +269,7 @@ abstract class QueueTest( afterGroup(::resetMocks) on("polling the queue again") { - queue!!.apply { + with(queue!!) { poll(callback) } } @@ -283,7 +284,7 @@ abstract class QueueTest( and("the message has been dead-lettered") { on("the next time retry checks happen") { - queue!!.apply { + with(queue!!) { retry() poll(callback) } @@ -304,6 +305,34 @@ abstract class QueueTest( given("a message was pushed") { val message = StartExecution(Pipeline::class.java, "1", "foo") + and("a duplicate is pushed with a newer delivery time") { + val delay = Hours.of(1) + + beforeGroup { + queue = createQueue(clock, deadLetterCallback).apply { + push(message, delay) + push(message.copy()) + } + } + + afterGroup(::stopQueue) + afterGroup(::resetMocks) + + on("polling the queue") { + queue!!.poll(callback) + } + + it("delivers the message immediately and only once") { + verify(callback).invoke(eq(message), any()) + } + + it("does not hold on to the first message") { + clock.incrementBy(delay) + queue!!.poll(callback) + verifyNoMoreInteractions(callback) + } + } + and("a different message is pushed before acknowledging the first") { val newMessage = message.copy(executionId = "2") @@ -374,6 +403,40 @@ abstract class QueueTest( } } + and("another identical message is pushed with a delay and the first is never acknowledged") { + val delay = Hours.of(1) + + beforeGroup { + queue = createQueue(clock, deadLetterCallback).apply { + push(message) + poll { _, ack -> + push(message.copy(), delay) + } + } + } + + afterGroup(::stopQueue) + afterGroup(::resetMocks) + + on("polling the queue again after the first message times out") { + with(queue!!) { + clock.incrementBy(ackTimeout) + retry() + poll(callback) + } + } + + it("re-queued the message for immediate delivery") { + verify(callback).invoke(eq(message), any()) + } + + it("discarded the delayed message") { + clock.incrementBy(delay) + queue!!.poll(callback) + verifyNoMoreInteractions(callback) + } + } + and("another identical message is pushed after acknowledging the first") { beforeGroup { queue = createQueue(clock, deadLetterCallback).apply { diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/memory/InMemoryQueueTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/memory/InMemoryQueueTest.kt index 11296ba75f..01c2c5bebb 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/memory/InMemoryQueueTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/memory/InMemoryQueueTest.kt @@ -27,8 +27,7 @@ import java.time.Clock object InMemoryQueueTest : QueueTest(createQueue(p3 = null)) object InMemoryMonitorableQueueTest : MonitorableQueueTest( - createQueue, - InMemoryQueue::retry + createQueue ) private val createQueue = { diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/metrics/MonitorableQueueTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/metrics/MonitorableQueueTest.kt index 8682fcaec2..d482d3c4d0 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/metrics/MonitorableQueueTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/metrics/MonitorableQueueTest.kt @@ -35,7 +35,6 @@ import java.time.Duration abstract class MonitorableQueueTest( createQueue: (Clock, DeadMessageCallback, ApplicationEventPublisher?) -> Q, - triggerRedeliveryCheck: Q.() -> Unit, shutdownCallback: (() -> Unit)? = null ) : Spek({ @@ -216,8 +215,10 @@ abstract class MonitorableQueueTest( } on("checking for unacknowledged messages") { - clock.incrementBy(queue!!.ackTimeout) - triggerRedeliveryCheck.invoke(queue!!) + with(queue!!) { + clock.incrementBy(ackTimeout) + retry() + } } it("fires an event") { @@ -236,8 +237,10 @@ abstract class MonitorableQueueTest( } on("checking for unacknowledged messages") { - clock.incrementBy(queue!!.ackTimeout) - triggerRedeliveryCheck.invoke(queue!!) + with(queue!!) { + clock.incrementBy(ackTimeout) + retry() + } } it("fires an event indicating the message is being retried") { @@ -269,8 +272,10 @@ abstract class MonitorableQueueTest( } on("checking for unacknowledged messages") { - clock.incrementBy(queue!!.ackTimeout) - triggerRedeliveryCheck.invoke(queue!!) + with(queue!!) { + clock.incrementBy(ackTimeout) + retry() + } } it("fires an event indicating the message is a duplicate") { @@ -297,9 +302,11 @@ abstract class MonitorableQueueTest( on("failing to acknowledge the message ${Queue.maxRetries} times") { (1..Queue.maxRetries).forEach { - queue!!.poll { _, _ -> } - clock.incrementBy(queue!!.ackTimeout) - triggerRedeliveryCheck.invoke(queue!!) + with(queue!!) { + poll { _, _ -> } + clock.incrementBy(ackTimeout) + retry() + } } }