Skip to content

Commit

Permalink
Fix rabbitmq container deprecations and test code cleanup. (#619)
Browse files Browse the repository at this point in the history
* Refactor mutable static state.

* Use RabbitMQContainer API to replace deprecated use.

* Use latest official rabbitmq Docker container.

* Share rabbit version between tests

* nit: formatting

* Use RabbitContainer and abhor statics

* Don't use Hamcrest

* polling conditions need asserts

* Fix tests for rabbit 3.9+ and improve logging a bit

* Remove deprecated container and use same rabbit version

* Use the same rabbit version for all tests

---------

Co-authored-by: Tim Yates <[email protected]>
  • Loading branch information
wetted and timyates authored Apr 16, 2024
1 parent c072219 commit 3e06a61
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,36 +24,27 @@ abstract class AbstractRabbitMQClusterTest extends Specification {
private static final Logger log = LoggerFactory.getLogger(AbstractRabbitMQClusterTest)

private static final int AMQP_PORT = 5672
private static final DockerImageName RABBIT_IMAGE = DockerImageName.parse("rabbitmq:3.8-management")
private static final String CLUSTER_COOKIE = "test-cluster"
private static final String RABBIT_CONFIG_PATH = ClassLoader.getSystemResource("rabbit/rabbitmq.conf").path
private static final String RABBIT_DEFINITIONS_PATH = ClassLoader.getSystemResource("rabbit/definitions.json").path
private static final String DOCKER_IMAGE_NAME = "rabbitmq:$AbstractRabbitMQTest.RABBIT_CONTAINER_VERSION-management"

protected ApplicationContext applicationContext

static final String EXCHANGE = "test-exchange"
static final String QUEUE = "test-durable-queue"

@Shared
@AutoCleanup
Network mqClusterNet = Network.newNetwork()

@Shared
@AutoCleanup
RabbitMQContainer node1 = new RabbitMQContainer(RABBIT_IMAGE)
RabbitMQContainer node1 = new RabbitMQContainer(DockerImageName.parse(DOCKER_IMAGE_NAME))

@Shared
@AutoCleanup
RabbitMQContainer node2 = new RabbitMQContainer(RABBIT_IMAGE)
RabbitMQContainer node2 = new RabbitMQContainer(DockerImageName.parse(DOCKER_IMAGE_NAME))

@Shared
@AutoCleanup
RabbitMQContainer node3 = new RabbitMQContainer(RABBIT_IMAGE)
RabbitMQContainer node3 = new RabbitMQContainer(DockerImageName.parse(DOCKER_IMAGE_NAME))

def setupSpec() {
log.info("rabbit.conf path: {}", RABBIT_CONFIG_PATH)
log.info("rabbit definitions path: {}", RABBIT_DEFINITIONS_PATH)

configureContainer(node1, "rabbitmq1")
.waitingFor(Wait.forHealthcheck().withStartupTimeout(Duration.ofMinutes(1)))
.start()
Expand Down Expand Up @@ -95,7 +86,8 @@ abstract class AbstractRabbitMQClusterTest extends Specification {
"spec.name" : getClass().simpleName,
"rabbitmq.servers.node1.port": node1Port,
"rabbitmq.servers.node2.port": node2Port,
"rabbitmq.servers.node3.port": node3Port] << additionalConfig
"rabbitmq.servers.node3.port": node3Port
] << additionalConfig

log.info("context properties: {}", properties)
applicationContext = ApplicationContext.run(properties, "test")
Expand All @@ -106,10 +98,16 @@ abstract class AbstractRabbitMQClusterTest extends Specification {
}

private configureContainer(RabbitMQContainer mqContainer, String hostname) {
String rabbitConfigPath = ClassLoader.getSystemResource("rabbit/rabbitmq.conf").path
log.info("rabbit.conf path: {}", rabbitConfigPath)

String rabbitDefinitionsPath = ClassLoader.getSystemResource("rabbit/definitions.json").path
log.info("rabbit definitions.json path: {}", rabbitDefinitionsPath)

mqContainer
.withEnv("RABBITMQ_ERLANG_COOKIE", CLUSTER_COOKIE)
.withCopyFileToContainer(forHostPath(RABBIT_CONFIG_PATH), "/etc/rabbitmq/rabbitmq.conf")
.withCopyFileToContainer(forHostPath(RABBIT_DEFINITIONS_PATH), "/etc/rabbitmq/definitions.json")
.withEnv("RABBITMQ_ERLANG_COOKIE", "test-cluster")
.withRabbitMQConfigSysctl(forHostPath(rabbitConfigPath))
.withCopyFileToContainer(forHostPath(rabbitDefinitionsPath), "/etc/rabbitmq/definitions.json")
.withNetwork(mqClusterNet)
.withLogConsumer(new Slf4jLogConsumer(log).withPrefix(hostname))
.withCreateContainerCmdModifier(cmd -> cmd
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
package io.micronaut.rabbitmq

import io.micronaut.context.ApplicationContext
import org.testcontainers.containers.GenericContainer
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy
import org.testcontainers.containers.RabbitMQContainer
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification
import spock.util.concurrent.PollingConditions

abstract class AbstractRabbitMQTest extends Specification {

static GenericContainer rabbitContainer =
new GenericContainer("library/rabbitmq:3.7")
.withExposedPorts(5672)
.waitingFor(new LogMessageWaitStrategy().withRegEx("(?s).*Server startup complete.*"))
static String RABBIT_CONTAINER_VERSION = "3.13.1"

static {
@Shared
@AutoCleanup
RabbitMQContainer rabbitContainer = new RabbitMQContainer("rabbitmq:" + RABBIT_CONTAINER_VERSION)

def setupSpec() {
rabbitContainer.start()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package io.micronaut.rabbitmq.connect
import com.rabbitmq.client.Connection
import io.micronaut.context.ApplicationContext
import io.micronaut.inject.qualifiers.Qualifiers
import org.testcontainers.containers.GenericContainer
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy
import io.micronaut.rabbitmq.AbstractRabbitMQTest
import org.testcontainers.containers.RabbitMQContainer
import spock.lang.Specification

import java.time.Duration
Expand All @@ -13,24 +13,21 @@ class MultipleServersSpec extends Specification {

void "test multiple server configuration"() {
given:
GenericContainer rabbit1 = new GenericContainer("library/rabbitmq:3.7")
.withExposedPorts(5672)
.waitingFor(new LogMessageWaitStrategy().withRegEx("(?s).*Server startup complete.*"))
GenericContainer rabbit2 = new GenericContainer("library/rabbitmq:3.7")
.withExposedPorts(5672)
.waitingFor(new LogMessageWaitStrategy().withRegEx("(?s).*Server startup complete.*"))
RabbitMQContainer rabbit1 = new RabbitMQContainer("rabbitmq:" + AbstractRabbitMQTest.RABBIT_CONTAINER_VERSION)
RabbitMQContainer rabbit2 = new RabbitMQContainer("rabbitmq:" + AbstractRabbitMQTest.RABBIT_CONTAINER_VERSION)

when:
rabbit1.start()
rabbit2.start()
ApplicationContext context = ApplicationContext.run(
["spec.name": getClass().simpleName,
"rabbitmq.servers.one.uri": "amqp://localhost:${rabbit1.getMappedPort(5672)}",
"rabbitmq.servers.one.channel-pool.max-idle-channels": "10",
"rabbitmq.servers.one.rpc.timeout": "10s",
"rabbitmq.servers.two.uri": "amqp://localhost:${rabbit2.getMappedPort(5672)}",
"rabbitmq.servers.two.channel-pool.max-idle-channels": "20",
"rabbitmq.servers.two.rpc.timeout": "20s"])
ApplicationContext context = ApplicationContext.run([
"spec.name" : getClass().simpleName,
"rabbitmq.servers.one.uri" : "amqp://localhost:${rabbit1.getMappedPort(5672)}",
"rabbitmq.servers.one.channel-pool.max-idle-channels": "10",
"rabbitmq.servers.one.rpc.timeout" : "10s",
"rabbitmq.servers.two.uri" : "amqp://localhost:${rabbit2.getMappedPort(5672)}",
"rabbitmq.servers.two.channel-pool.max-idle-channels": "20",
"rabbitmq.servers.two.rpc.timeout" : "20s"
])

then:
context.getBean(RabbitConnectionFactoryConfig, Qualifiers.byName("one")).port == rabbit1.getMappedPort(5672)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,31 @@ import com.rabbitmq.client.ShutdownListener
import io.micronaut.context.ApplicationContext
import io.micronaut.context.annotation.Requires
import io.micronaut.core.io.socket.SocketUtils
import io.micronaut.rabbitmq.AbstractRabbitMQTest
import io.micronaut.rabbitmq.annotation.Binding
import io.micronaut.rabbitmq.annotation.Queue
import io.micronaut.rabbitmq.annotation.RabbitClient
import io.micronaut.rabbitmq.annotation.RabbitListener
import io.micronaut.rabbitmq.connect.recovery.TemporarilyDownException
import io.micronaut.rabbitmq.connect.recovery.TemporarilyDownIOException
import io.micronaut.rabbitmq.connect.recovery.TemporarilyDownRuntimeException
import io.micronaut.rabbitmq.exception.RabbitClientException
import io.micronaut.rabbitmq.exception.RabbitListenerException
import io.micronaut.rabbitmq.exception.RabbitListenerExceptionHandler
import org.testcontainers.containers.FixedHostPortGenericContainer
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy
import org.testcontainers.containers.RabbitMQContainer
import spock.lang.Specification
import spock.util.concurrent.PollingConditions

class TemporarilyDownConsumersSpec extends Specification {

static PollingConditions conditions = new PollingConditions(timeout: 10)
PollingConditions conditions = new PollingConditions(timeout: 10)

void "test temporarily down consumer"() {
given:
int port = SocketUtils.findAvailableTcpPort()
FixedHostPortGenericContainer rabbitmq = new FixedHostPortGenericContainer("library/rabbitmq:3.7")
.withFixedExposedPort(port,5672)
.waitingFor(new LogMessageWaitStrategy().withRegEx("(?s).*Server startup complete.*"))
RabbitMQContainer rabbitmq = new RabbitMQContainer("rabbitmq:" + AbstractRabbitMQTest.RABBIT_CONTAINER_VERSION).tap {
portBindings = ["$port:5672".toString()]
}

ApplicationContext applicationContext = ApplicationContext.run(["rabbitmq.port": port, "spec.name": "TemporarilyDownConsumersSpec"], "test")

when: "consumer is instantiated when the server is down"
Expand Down Expand Up @@ -56,23 +57,24 @@ class TemporarilyDownConsumersSpec extends Specification {
}

void "test temporarily down producer"() {
given:
int port = SocketUtils.findAvailableTcpPort()
FixedHostPortGenericContainer rabbitmq = new FixedHostPortGenericContainer("library/rabbitmq:3.7")
.withFixedExposedPort(port,5672)
.waitingFor(new LogMessageWaitStrategy().withRegEx("(?s).*Server startup complete.*"))
RabbitMQContainer rabbitmq = new RabbitMQContainer("rabbitmq:" + AbstractRabbitMQTest.RABBIT_CONTAINER_VERSION).tap {
portBindings = ["$port:5672".toString()]
}
ApplicationContext applicationContext = ApplicationContext.run(["rabbitmq.port": port, "spec.name": "TemporarilyDownConsumersSpec"], "test")

when: "producer publishes a message when the server is down"
applicationContext.getBean(MyProducer).send("hello")

then: "a temporarily down exception is thrown"
var exception = thrown(RabbitClientException)
def exception = thrown(RabbitClientException)
exception.message == 'Failed to publish a message with exchange: [] routing key [eventually-up] and mandatory flag [false]'
exception.cause.message == 'Failed to retrieve a channel from the pool'
exception.cause.cause.message == 'Connection is not ready yet'

when: "the temporarily down connection is retrieved from the exception"
var connection = ((TemporarilyDownException) exception.cause.cause).connection
def connection = exception.cause.cause.connection
then: "the connection is still down"
connection.stillDown

Expand Down Expand Up @@ -240,7 +242,7 @@ class TemporarilyDownConsumersSpec extends Specification {
then: "details can be retrieved from connection"
connection.check()
wasNotified
connection.stillDown == false
!connection.stillDown
connection.address.hostName == 'localhost'
connection.port == port
connection.channelMax == 2047
Expand Down Expand Up @@ -278,7 +280,7 @@ class TemporarilyDownConsumersSpec extends Specification {
when: "close connection"
connection.close(-1, 'testing')
then: "we can retrieve details"
connection.open == false
!connection.open
connection.closeReason.reason.replyText == 'testing'

when: "abort connection"
Expand All @@ -297,6 +299,7 @@ class TemporarilyDownConsumersSpec extends Specification {
@Requires(property = "spec.name", value = "TemporarilyDownConsumersSpec")
@RabbitClient
static interface MyProducer {

@Binding("eventually-up")
void send(String message)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class RabbitHealthIndicatorSpec extends AbstractRabbitMQTest {

then:
result.status == UP
((Map<String, Object>) result.details).version.toString().startsWith("3.7")
((Map<String, Object>) result.details).version.toString() == RABBIT_CONTAINER_VERSION
}

void "test rabbitmq health indicator with 2 connections"() {
Expand All @@ -37,8 +37,8 @@ class RabbitHealthIndicatorSpec extends AbstractRabbitMQTest {
then:
result.status == UP
Map<String, List> details = result.details
details.get("connections")[0].get("version").toString().startsWith("3.7")
details.get("connections")[1].get("version").toString().startsWith("3.7")
details.get("connections")[0].get("version").toString() == RABBIT_CONTAINER_VERSION
details.get("connections")[1].get("version").toString() == RABBIT_CONTAINER_VERSION
}

void "test rabbitmq health indicator shows down"() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeoutException

import static java.util.concurrent.TimeUnit.MILLISECONDS
import static org.hamcrest.MatcherAssert.assertThat
import static org.hamcrest.Matchers.equalTo

class ConsumerRecoverySpec extends AbstractRabbitMQClusterTest {

private static final Logger log = LoggerFactory.getLogger(ConsumerRecoverySpec)

static final String EXCHANGE = "test-exchange"
static final String QUEUE = "test-durable-queue"

@Shared
private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor()

Expand All @@ -42,7 +43,7 @@ class ConsumerRecoverySpec extends AbstractRabbitMQClusterTest {

def setupSpec() {
/*
* The current Micronaut publisher implementation has a flaw in detecting unroutable drop/return messages
* The current Micronaut publisher implementation has a flaw in detecting non-routable drop/return messages
* in a Rabbit cluster setup. It considers the messages as published even if the broker did not enqueue it.
* So for this test a simple custom publisher is used that detects unpublished messages.
*/
Expand Down Expand Up @@ -74,12 +75,17 @@ class ConsumerRecoverySpec extends AbstractRabbitMQClusterTest {
ch.basicPublish(EXCHANGE, "", true,
new AMQP.BasicProperties.Builder().deliveryMode(2).build(),
msg.bytes)
// Since rabbit 3.9.x this does not seem to throw an exception in these tests
if (ch.waitForConfirms(1000)) {
log.info("publish ack")
log.info("publish ack {}", msg)
} else {
// If the message is not confirmed, it is considered not published
publishedMessages.remove(msg)
log.warn("publish nack {}", msg)
}
} catch (IOException | RuntimeException | InterruptedException | TimeoutException e) {
publishedMessages.remove(msg)
log.error("failed to publish: {}", e.message)
log.error("failed to publish {}: {}", msg, e.message)
}
}, 500, 500, MILLISECONDS))
}
Expand Down Expand Up @@ -208,8 +214,7 @@ class ConsumerRecoverySpec extends AbstractRabbitMQClusterTest {
enablePublisher = false

new PollingConditions(timeout: 60).eventually {
assertThat "all published messages must be consumed",
publishedMessages, equalTo(consumer.consumedMessages)
assert publishedMessages == consumer.consumedMessages

Check failure on line 217 in rabbitmq/src/test/groovy/io/micronaut/rabbitmq/listener/ConsumerRecoverySpec.groovy

View workflow job for this annotation

GitHub Actions / Java CI / Test Report (21)

ConsumerRecoverySpec.test restart of Node2 with consumer connected to Node2

Condition not satisfied after 60.00 seconds and 607 attempts
Raw output
Condition not satisfied after 60.00 seconds and 607 attempts
	at app//spock.util.concurrent.PollingConditions.within(PollingConditions.java:185)
	at app//spock.util.concurrent.PollingConditions.eventually(PollingConditions.java:140)
	at app//io.micronaut.rabbitmq.listener.ConsumerRecoverySpec.stopPublishAndAssertAllConsumed(ConsumerRecoverySpec.groovy:216)
	at io.micronaut.rabbitmq.listener.ConsumerRecoverySpec.test restart of Node2 with consumer connected to Node2(ConsumerRecoverySpec.groovy:155)
Caused by: Condition not satisfied:

publishedMessages == consumer.consumedMessages
|                 |  |        |
|                 |  |        [f71f870e-1d07-4256-bfd4-9819b272ceed, 4de5bd42-198e-4260-b05a-5c1d03c25286, 4d5bc207-35f1-47dc-bafc-7a2ecbef43d3, 7177602a-175e-4084-9e85-f0245fb0d347, 0420b1ef-6c9a-483f-9bd5-fd2411e29f80, 3f0a24a9-71a7-4f3c-bf61-33deaa0dd984, dd8426ff-17e4-49c5-8eb1-85d94af757d9, 380374cf-de67-455a-b3f9-e020de92499b, c60830ce-1087-4916-886b-93569afadbbc, f60e8a27-45b5-4e23-bbbd-b8cc87d91d56, 96712ace-a63e-47f8-840b-50b490a51061, 40896bf5-3286-40ae-8955-ab504ae73aea, 5ec7a429-55ce-413c-a946-28c5ce04454d, 1e391dfa-264b-46ff-8f16-f049aa233e4b, 38accac2-44ea-4b5a-bcb9-f06a3c3e75dc, 0a51aa08-07a0-48cf-aa0d-4ee7850415d6, b2e9a48f-f63b-41b7-b0f4-2d8ab98248f3, e926d823-7e4e-4e94-9762-4d522fcac865, b2a36e1c-2038-4854-8d1e-478a28b57a7d, 2ab56db0-23d1-48f6-8f98-b13c14ce4735, fd0c4e64-3914-415d-8f79-7552dad84522, 0c57ebe0-71f2-46f8-b296-ac5cc20a7702, bde0650f-89b1-4592-84da-a6c6e8e873ee, b7f96afe-77d4-4874-9d15-b7370d1c2944, bda8ed5a-c0e5-4b40-9a0b-9626a3531f07, 618f9c34-8936-4ebc-a3b1-e5c3afa8584b, 94ab636d-374f-4376-b5f7-b65f89952ca6, 09ab6dad-7f52-41be-ad04-1eac574e03a2, 824ea33b-b669-4bd2-b297-9d4c15df31e8, 87d882fe-bcf2-4ddb-a048-f0c6d7896ecf, 7c923683-07d7-4bea-9999-7ce4b32b5606, 63a0824a-1b21-47e4-a9b5-6880945b11b3, b527be92-31b1-49bf-a5d5-d8c07dd649af, 4df686bc-d25c-422e-979f-2cbffcd84646, 286ff6d4-24af-4d61-8f3b-3fa610a6c769]
|                 |  <io.micronaut.rabbitmq.listener.ConsumerRecoverySpec$Node2Consumer@2be0ea4a consumedMessages=[f71f870e-1d07-4256-bfd4-9819b272ceed, 4de5bd42-198e-4260-b05a-5c1d03c25286, 4d5bc207-35f1-47dc-bafc-7a2ecbef43d3, 7177602a-175e-4084-9e85-f0245fb0d347, 0420b1ef-6c9a-483f-9bd5-fd2411e29f80, 3f0a24a9-71a7-4f3c-bf61-33deaa0dd984, dd8426ff-17e4-49c5-8eb1-85d94af757d9, 380374cf-de67-455a-b3f9-e020de92499b, c60830ce-1087-4916-886b-93569afadbbc, f60e8a27-45b5-4e23-bbbd-b8cc87d91d56, 96712ace-a63e-47f8-840b-50b490a51061, 40896bf5-3286-40ae-8955-ab504ae73aea, 5ec7a429-55ce-413c-a946-28c5ce04454d, 1e391dfa-264b-46ff-8f16-f049aa233e4b, 38accac2-44ea-4b5a-bcb9-f06a3c3e75dc, 0a51aa08-07a0-48cf-aa0d-4ee7850415d6, b2e9a48f-f63b-41b7-b0f4-2d8ab98248f3, e926d823-7e4e-4e94-9762-4d522fcac865, b2a36e1c-2038-4854-8d1e-478a28b57a7d, 2ab56db0-23d1-48f6-8f98-b13c14ce4735, fd0c4e64-3914-415d-8f79-7552dad84522, 0c57ebe0-71f2-46f8-b296-ac5cc20a7702, bde0650f-89b1-4592-84da-a6c6e8e873ee, b7f96afe-77d4-4874-9d15-b7370d1c2944, bda8ed5a-c0e5-4b40-9a0b-9626a3531f07, 618f9c34-8936-4ebc-a3b1-e5c3afa8584b, 94ab636d-374f-4376-b5f7-b65f89952ca6, 09ab6dad-7f52-41be-ad04-1eac574e03a2, 824ea33b-b669-4bd2-b297-9d4c15df31e8, 87d882fe-bcf2-4ddb-a048-f0c6d7896ecf, 7c923683-07d7-4bea-9999-7ce4b32b5606, 63a0824a-1b21-47e4-a9b5-6880945b11b3, b527be92-31b1-49bf-a5d5-d8c07dd649af, 4df686bc-d25c-422e-979f-2cbffcd84646, 286ff6d4-24af-4d61-8f3b-3fa610a6c769] lastException=null>
|                 false
|                 3 differences (91% similarity, 3 missing, 0 extra)
|                 missing: [b2e9a48f-f63b-41b7-b0f4-2d8ab98248f3, e926d823-7e4e-4e94-9762-4d522fcac865, 87d882fe-bcf2-4ddb-a048-f0c6d7896ecf]
|                 extra: []
[f71f870e-1d07-4256-bfd4-9819b272ceed, 4de5bd42-198e-4260-b05a-5c1d03c25286, 4d5bc207-35f1-47dc-bafc-7a2ecbef43d3, 7177602a-175e-4084-9e85-f0245fb0d347, 0420b1ef-6c9a-483f-9bd5-fd2411e29f80, 3f0a24a9-71a7-4f3c-bf61-33deaa0dd984, dd8426ff-17e4-49c5-8eb1-85d94af757d9, 380374cf-de67-455a-b3f9-e020de92499b, c60830ce-1087-4916-886b-93569afadbbc, f60e8a27-45b5-4e23-bbbd-b8cc87d91d56, 96712ace-a63e-47f8-840b-50b490a51061, 40896bf5-3286-40ae-8955-ab504ae73aea, 5ec7a429-55ce-413c-a946-28c5ce04454d, 1e391dfa-264b-46ff-8f16-f049aa233e4b, 38accac2-44ea-4b5a-bcb9-f06a3c3e75dc, 0a51aa08-07a0-48cf-aa0d-4ee7850415d6, b2a36e1c-2038-4854-8d1e-478a28b57a7d, 2ab56db0-23d1-48f6-8f98-b13c14ce4735, fd0c4e64-3914-415d-8f79-7552dad84522, 0c57ebe0-71f2-46f8-b296-ac5cc20a7702, bde0650f-89b1-4592-84da-a6c6e8e873ee, b7f96afe-77d4-4874-9d15-b7370d1c2944, bda8ed5a-c0e5-4b40-9a0b-9626a3531f07, 618f9c34-8936-4ebc-a3b1-e5c3afa8584b, 94ab636d-374f-4376-b5f7-b65f89952ca6, 09ab6dad-7f52-41be-ad04-1eac574e03a2, 824ea33b-b669-4bd2-b297-9d4c15df31e8, 7c923683-07d7-4bea-9999-7ce4b32b5606, 63a0824a-1b21-47e4-a9b5-6880945b11b3, b527be92-31b1-49bf-a5d5-d8c07dd649af, 4df686bc-d25c-422e-979f-2cbffcd84646, 286ff6d4-24af-4d61-8f3b-3fa610a6c769]

	at io.micronaut.rabbitmq.listener.ConsumerRecoverySpec.stopPublishAndAssertAllConsumed_closure4(ConsumerRecoverySpec.groovy:217)
	at io.micronaut.rabbitmq.listener.ConsumerRecoverySpec.stopPublishAndAssertAllConsumed_closure4(ConsumerRecoverySpec.groovy)
	at app//groovy.lang.Closure.call(Closure.java:433)
	at app//spock.util.concurrent.PollingConditions.within(PollingConditions.java:168)
	... 3 more
}
}

Expand All @@ -219,7 +224,7 @@ class ConsumerRecoverySpec extends AbstractRabbitMQClusterTest {
container.execInContainer("rabbitmqctl", "start_app")

new PollingConditions(timeout: 60).eventually {
container.isHealthy()
assert container.isHealthy()
}
log.info("started container: {}", container.containerId)
}
Expand All @@ -232,7 +237,7 @@ class ConsumerRecoverySpec extends AbstractRabbitMQClusterTest {

RabbitListenerException lastException

@Queue(AbstractRabbitMQClusterTest.QUEUE)
@Queue(QUEUE)
void handleMessage(@MessageBody String body) {
consumedMessages << body
log.info("{} received: {}", consumedMessages.size(), body)
Expand Down

0 comments on commit 3e06a61

Please sign in to comment.