Fix RabbitMQ modules for removed own-connection

* Clean up Testcontainers dependencies in the RabbitMQ modules
* Use `RabbitMQContainer` in the `RabbitSourceListenerTests`
This commit is contained in:
Artem Bilan
2024-03-11 09:52:18 -04:00
parent e9ad87a1fd
commit 2880f3c00a
5 changed files with 23 additions and 146 deletions

View File

@@ -28,23 +28,15 @@
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>spring-rabbit-consumer</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${testcontainers.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>rabbitmq</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

View File

@@ -1,54 +0,0 @@
/*
* Copyright 2016-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.app.sink.rabbit;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.test.context.TestPropertySource;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Soby Chacko
* @author Chris Bono
*/
@TestPropertySource(properties = {"rabbit.consumer.routingKey=scsapp-testOwn",
"rabbit.consumer.own-connection=true"})
public class OwnConnectionTest extends RabbitSinkIntegrationTests {
@Test
public void test() {
this.rabbitAdmin.declareQueue(
new Queue("scsapp-testOwn", false, false, true));
// Destroy the boot connection factory - should not matter to outbound adapter as it SHOULD be using its own connection factory
this.bootFactory.destroy();
assertThat(this.bootFactory.getCacheProperties().getProperty("localPort")).isEqualTo("0");
// Send to the channel - should still be consumed by outbound adapter as its using its own connection factory
this.channels.send(MessageBuilder.withPayload("foo".getBytes()).build());
// RabbitTemplate also using its own connection factory - should still be able to get the message
assertThat(this.rabbitTemplate.getConnectionFactory()).isNotSameAs(bootFactory);
this.rabbitTemplate.setReceiveTimeout(10000);
Message received = this.rabbitTemplate.receive("scsapp-testOwn");
assertThat(new String(received.getBody())).isEqualTo("foo");
}
}

View File

@@ -19,21 +19,22 @@ package org.springframework.cloud.stream.app.sink.rabbit;
import java.time.Duration;
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.RabbitMQContainer;
import org.testcontainers.junit.jupiter.Testcontainers;
/**
* Provides a static RabbitMQ Container that can be shared across test classes.
*
* @author Chris Bono
* @author Artem Bilan
*/
@Testcontainers(disabledWithoutDocker = true)
public interface RabbitMqTestContainerSupport {
GenericContainer<?> RABBIT_MQ_CONTAINER = new GenericContainer<>("rabbitmq:3.10")
.withExposedPorts(5672)
.withStartupTimeout(Duration.ofSeconds(120))
.withStartupAttempts(3);
RabbitMQContainer RABBIT_MQ_CONTAINER = new RabbitMQContainer("rabbitmq:management")
.withExposedPorts(5672)
.withStartupTimeout(Duration.ofSeconds(120))
.withStartupAttempts(3);
@BeforeAll
static void startContainer() {
@@ -43,4 +44,5 @@ public interface RabbitMqTestContainerSupport {
static Integer getPort() {
return RABBIT_MQ_CONTAINER.getFirstMappedPort();
}
}

View File

@@ -24,16 +24,9 @@
<artifactId>spring-boot-starter-json</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>rabbitmq</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>

View File

@@ -16,15 +16,12 @@
package org.springframework.cloud.stream.app.source.rabbit;
import java.util.HashMap;
import org.aopalliance.aop.Advice;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.RabbitMQContainer;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.AmqpHeaders;
@@ -48,20 +45,26 @@ import static org.assertj.core.api.Assertions.assertThat;
* @author Gary Russell
* @author Chris Schaefer
* @author Corneil du Plessis
* @author Artem Bilan
*/
@Tag("integration")
public class RabbitSourceListenerTests {
static {
RabbitMQContainer rabbitmq = new RabbitMQContainer("rabbitmq:3.7-management-alpine")
.withQueue("scsapp-testq", false, false, new HashMap<>())
.withQueue("scsapp-testq2", false, false, new HashMap<>())
.withQueue("scsapp-testOwnSource", false, false, new HashMap<>())
.withExchange("scsapp-testex", "fanout")
.withBinding("scsapp-testex", "scsapp-testq");
RabbitMQContainer rabbitmq = new RabbitMQContainer("rabbitmq:management");
rabbitmq.start();
System.setProperty("spring.rabbitmq.test.port", rabbitmq.getAmqpPort().toString());
try {
rabbitmq.execInContainer("rabbitmqadmin", "declare", "queue", "name=scsapp-testq", "auto_delete=false", "durable=false");
rabbitmq.execInContainer("rabbitmqadmin", "declare", "queue", "name=scsapp-testq2", "auto_delete=false", "durable=false");
rabbitmq.execInContainer("rabbitmqadmin", "declare", "exchange", "name=scsapp-testex", "type=fanout");
rabbitmq.execInContainer("rabbitmqadmin", "declare", "binding", "source=scsapp-testex", "destination=scsapp-testq");
}
catch (Exception ex) {
throw new IllegalStateException(ex);
}
System.setProperty("spring.rabbitmq.port", rabbitmq.getAmqpPort().toString());
}
@Test
@@ -78,9 +81,7 @@ public class RabbitSourceListenerTests {
"--spring.rabbitmq.listener.simple.acknowledgeMode=AUTO",
"--spring.rabbitmq.listener.simple.prefetch=10",
"--spring.rabbitmq.listener.simple.transactionSize=5",
"--spring.cloud.stream.function.bindings.rabbitSupplier-out-0=output",
"--spring.rabbitmq.port=" +
"${spring.rabbitmq.test.port}"
"--spring.cloud.stream.function.bindings.rabbitSupplier-out-0=output"
)) {
final RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
@@ -94,62 +95,6 @@ public class RabbitSourceListenerTests {
}
}
@Test
public void testOwnConnection() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration
.getCompleteConfiguration(RabbitSourceTestApplication.class))
.web(WebApplicationType.NONE)
.run("--spring.cloud.function.definition=rabbitSupplier",
"--rabbit.supplier.queues=scsapp-testOwnSource",
"--rabbit.supplier.enableRetry=false",
"--rabbit.supplier.own-connection=true",
"--spring.rabbitmq.port=" +
"${spring.rabbitmq.test.port}"
)) {
// Reset the boot connection factory -should not matter to container as it SHOULD be using its own connection factory
final CachingConnectionFactory bootFactory = context.getBean(CachingConnectionFactory.class);
bootFactory.resetConnection();
final OutputDestination target = context.getBean(OutputDestination.class);
// Send a message on a separate connection - the container should still receive it.
sendMessageOnSeparateConnection("scsapp-testOwnSource", "foo", bootFactory);
Message<byte[]> sourceMessage = target.receive(600000, "rabbitSupplier-out-0");
final String actual = new String(sourceMessage.getPayload());
assertThat(actual).isEqualTo("foo");
assertThat(bootFactory.getCacheProperties().getProperty("localPort")).isEqualTo("0");
}
}
/**
* Sends a message on a separate connection.
*
* @param routingKey message routing key
* @param payload message content
* @param bootFactory the auto-configured connection factory used to get connection coordinates from
*/
private void sendMessageOnSeparateConnection(String routingKey, Object payload, CachingConnectionFactory bootFactory) {
CachingConnectionFactory copiedConnectionFactory = null;
try {
copiedConnectionFactory = new CachingConnectionFactory(bootFactory.getHost(), bootFactory.getPort());
copiedConnectionFactory.setUsername(bootFactory.getUsername());
copiedConnectionFactory.setPassword(bootFactory.getRabbitConnectionFactory().getPassword());
if (bootFactory.getVirtualHost() != null) {
copiedConnectionFactory.setVirtualHost(bootFactory.getVirtualHost());
}
RabbitTemplate rabbitTemplate = new RabbitTemplate(copiedConnectionFactory);
rabbitTemplate.convertAndSend(routingKey, payload);
}
finally {
if (copiedConnectionFactory != null) {
copiedConnectionFactory.resetConnection();
}
}
}
@Test
public void testPropertiesPopulated() {
@@ -170,9 +115,7 @@ public class RabbitSourceListenerTests {
"--spring.rabbitmq.listener.simple.maxConcurrency = 3 ",
"--spring.rabbitmq.listener.simple.acknowledgeMode = NONE",
"--spring.rabbitmq.listener.simple.prefetch = 10",
"--spring.rabbitmq.listener.simple.batchSize = 5",
"--spring.rabbitmq.port=" +
"${spring.rabbitmq.test.port}"
"--spring.rabbitmq.listener.simple.batchSize = 5"
)) {
final RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
@@ -213,4 +156,5 @@ public class RabbitSourceListenerTests {
@Import(RabbitSupplierConfiguration.class)
public static class RabbitSourceTestApplication {
}
}