Refactor to support binder agnostic tests

This commit is contained in:
David Turanski
2020-11-01 16:53:32 -05:00
parent 26fe1c50bb
commit 5b5c641591
19 changed files with 884 additions and 648 deletions

View File

@@ -56,7 +56,7 @@ The host machine can connect to Kafka using the random mapped port:
"localhost:" + kafka.getMappedPort(9092);
```
These containers are intended to work with `StreamApplicationIntegrationTestSupport`, described below.
These containers are intended to work with `OutputMatchers`, described below.
### Stream Application Integration Test Support
@@ -110,7 +110,7 @@ public class KafkaTimeSourceTests extends KafkaStreamApplicationIntegrationTestS
}
```
We inherit `KafkaStreamApplicationIntegrationTestSupport` which starts a `kafka` TestContainer in a static initializer.
We inherit `KafkaStreamAppContainerTestConfiguration` which starts a `kafka` TestContainer in a static initializer.
The Time Source emits the time every second. In this case, it's hard to know what the expected output payload is, but it should at least match the date pattern.
This test also uses a `LogMatcher`, which is not strictly necessary, but used here to verify that the app logged the standard start up message - always a good sign.
Then we wait for a message on the output topic that matches the pattern.

View File

@@ -20,6 +20,10 @@
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-test-support</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>

View File

@@ -16,10 +16,6 @@
package org.springframework.cloud.stream.app.test.integration;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -27,13 +23,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.util.SocketUtils;
import static org.springframework.cloud.stream.app.test.integration.TestTopicListener.STREAM_APPLICATIONS_TEST_TOPIC;
@@ -41,53 +31,38 @@ import static org.springframework.cloud.stream.app.test.integration.TestTopicLis
* Support utility for stream application integration testing .
* @author David Turanski
*/
@Testcontainers
@Component
public abstract class StreamApplicationIntegrationTestSupport {
public class OutputMatcher {
protected static final String DOCKER_ORG = "springcloudstream";
private final TestTopicListener testListener;
@Autowired
private AbstractTestTopicListener testListener;
protected static String prePackagedStreamAppImageName(String appName, String binderName, String version) {
return DOCKER_ORG + "/" + appName + "-" + binderName + ":" + version;
public OutputMatcher(TestTopicListener testListener) {
this.testListener = testListener;
}
public static String localHostAddress() {
try {
return InetAddress.getLocalHost().getHostAddress();
}
catch (UnknownHostException e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
protected static File resourceAsFile(String path) {
try {
return new ClassPathResource(path).getFile();
}
catch (IOException e) {
throw new IllegalStateException("Unable to access resource " + path);
}
}
protected static final int findAvailablePort() {
return SocketUtils.findAvailableTcpPort(10000, 20000);
}
protected Callable<Boolean> messagesMatch() {
public Callable<Boolean> messagesMatch() {
return () -> testListener.allMatch().get();
}
protected <P> Callable<Boolean> payloadMatches(Predicate<P>... payloadMatchers) {
public <P> Callable<Boolean> payloadMatches(Predicate<P>... payloadMatchers) {
return payloadMatches(STREAM_APPLICATIONS_TEST_TOPIC, payloadMatchers);
}
protected Callable<Boolean> messageMatches(Predicate<Message<?>>... messageMatchers) {
public Callable<Boolean> messageMatches(Predicate<Message<?>>... messageMatchers) {
return messageMatches(STREAM_APPLICATIONS_TEST_TOPIC, messageMatchers);
}
public void addMessageMatcher(MessageMatcher messageMatcher) {
this.testListener.addMessageMatcher(STREAM_APPLICATIONS_TEST_TOPIC, messageMatcher);
}
public void clearMessageMatchers() {
testListener.clearMessageMatchers();
}
public void resetMessageMatchers() {
testListener.resetMessageMatchers();
}
// TODO: Implement support for multiple topics.
private <P> Callable<Boolean> payloadMatches(String topic, Predicate<P>... payloadMatchers) {
for (Predicate<P> payloadMatcher : payloadMatchers) {

View File

@@ -0,0 +1,63 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.app.test.integration;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.springframework.core.io.ClassPathResource;
import org.springframework.util.SocketUtils;
/**
* Support utility for stream application integration testing .
* @author David Turanski
*/
public abstract class StreamAppContainerTestUtils {
/**
* Default docker org.
*/
public static String DOCKER_ORG = "springcloudstream";
public static final String prePackagedStreamAppImageName(String appName, String binderName, String version) {
return DOCKER_ORG + "/" + appName + "-" + binderName + ":" + version;
}
public static final String localHostAddress() {
try {
return InetAddress.getLocalHost().getHostAddress();
}
catch (UnknownHostException e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
public static final File resourceAsFile(String path) {
try {
return new ClassPathResource(path).getFile();
}
catch (IOException e) {
throw new IllegalStateException("Unable to access resource " + path);
}
}
public static final int findAvailablePort() {
return SocketUtils.findAvailableTcpPort(10000, 20000);
}
}

View File

@@ -0,0 +1,25 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.app.test.integration;
import org.springframework.messaging.Message;
public interface TestTopicSender {
<P> void send(String topic, P payload);
void send(String topic, Message<?> message);
}

View File

@@ -0,0 +1,49 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.app.test.integration.kafka;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainerTestUtils;
public abstract class KafkaConfig {
final static String BINDER = "kafka";
final static Network network = Network.SHARED;
public static StreamAppContainer prepackagedContainerFor(String appName, String version) {
return new KafkaStreamAppContainer(
StreamAppContainerTestUtils.prePackagedStreamAppImageName(appName, BINDER, version),
kafka);
}
/**
* The KafkaContainer.
*/
public final static KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:5.5.1"))
.withExposedPorts(9092, 9093)
.withNetwork(network);
static {
kafka.start();
}
}

View File

@@ -0,0 +1,209 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.app.test.integration.kafka;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.cloud.stream.app.test.integration.AbstractTestTopicListener;
import org.springframework.cloud.stream.app.test.integration.MessageMatcher;
import org.springframework.cloud.stream.app.test.integration.OutputMatcher;
import org.springframework.cloud.stream.app.test.integration.TestTopicListener;
import org.springframework.cloud.stream.app.test.integration.TestTopicSender;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.AbstractTestTopicListener.STREAM_APPLICATIONS_TEST_TOPIC;
import static org.springframework.cloud.stream.app.test.integration.kafka.KafkaConfig.kafka;
@Configuration
@EnableKafka
public class KafkaStreamAppContainerTestConfiguration {
private static final String SUFFIX = UUID.randomUUID().toString().substring(0, 8);
private static final String STREAM_APPLICATION_TESTS_GROUP = "stream-application-tests_" + SUFFIX;
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory producerFactory) {
return new KafkaTemplate(producerFactory);
}
@Bean
public OutputMatcher outputMatcher(TestTopicListener testTopicListener) {
return new OutputMatcher(testTopicListener);
}
@Bean
public TestTopicSender testTopicSender(KafkaTemplate kafkaTemplate) {
return new KafkaTemplateTopicSender(kafkaTemplate);
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.GROUP_ID_CONFIG, STREAM_APPLICATION_TESTS_GROUP);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
cf.setBootstrapServersSupplier(() -> kafka.getBootstrapServers());
return cf;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
pf.setBootstrapServersSupplier(() -> kafka.getBootstrapServers());
return pf;
}
@Bean
public AdminClient admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
return KafkaAdminClient.create(configs);
}
@Bean
public KafkaTestListener testListener(AdminClient admin, KafkaListenerEndpointRegistry endpointRegistry) {
return new KafkaTestListener(admin, endpointRegistry);
}
@KafkaListener(autoStartup = "true", topicPattern = STREAM_APPLICATIONS_TEST_TOPIC)
static class KafkaTestListener extends AbstractTestTopicListener {
private final AdminClient admin;
private final KafkaListenerEndpointRegistry endpointRegistry;
private final Object lock = new Object();
KafkaTestListener(AdminClient admin, KafkaListenerEndpointRegistry endpointRegistry) {
super();
this.admin = admin;
this.endpointRegistry = endpointRegistry;
this.admin.createTopics(
Collections.singletonList(
new NewTopic(STREAM_APPLICATIONS_TEST_TOPIC, Optional.empty(), Optional.empty())));
await().atMost(Duration.ofSeconds(30))
.until(() -> {
Set<String> topics = admin.listTopics().names().get();
return topics.contains(STREAM_APPLICATIONS_TEST_TOPIC);
});
}
@Override
public boolean addMessageMatcher(String topic, MessageMatcher messageMatcher) {
boolean added = super.addMessageMatcher(topic, messageMatcher);
if (added) {
synchronized (lock) {
stop();
// rewind to consume messages that may have arrived before a verifier is registered.
admin.alterConsumerGroupOffsets(STREAM_APPLICATION_TESTS_GROUP,
Collections.singletonMap(new TopicPartition(topic, 0), new OffsetAndMetadata(0)));
start();
}
}
return added;
}
private void stop() {
this.endpointRegistry.getAllListenerContainers().forEach(container -> container.stop());
}
private void start() {
this.endpointRegistry.getAllListenerContainers().forEach(container -> container.start());
}
@Override
protected Function<Message<?>, String> topicForMessage() {
return message -> (String) message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC);
}
@KafkaHandler(isDefault = true)
public void listen(Message<?> message) {
super.listen(message);
}
}
static class KafkaTemplateTopicSender implements TestTopicSender {
private final KafkaTemplate kafkaTemplate;
KafkaTemplateTopicSender(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Override
public <P> void send(String topic, P payload) {
doSend(topic, payload);
}
@Override
public void send(String topic, Message<?> message) {
doSend(topic, message);
}
private void doSend(String topic, Object payload) {
kafkaTemplate.send(topic, payload);
}
}
}

View File

@@ -0,0 +1,36 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.app.test.integration.kafka;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Testcontainers
@ExtendWith(SpringExtension.class)
@ContextConfiguration(classes = KafkaStreamAppContainerTestConfiguration.class)
public @interface KafkaStreamAppTest {
}

View File

@@ -1,205 +0,0 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.app.test.integration.kafka;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;
import org.springframework.cloud.stream.app.test.integration.AbstractTestTopicListener;
import org.springframework.cloud.stream.app.test.integration.MessageMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.StreamApplicationIntegrationTestSupport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.AbstractTestTopicListener.STREAM_APPLICATIONS_TEST_TOPIC;
/**
* Base class for stream application integration testing with Test Containers and Kafka
* binder.
*/
@ExtendWith(SpringExtension.class)
@ContextConfiguration(classes = KafkaStreamApplicationIntegrationTestSupport.KafkaTestConfiguration.class)
public abstract class KafkaStreamApplicationIntegrationTestSupport extends StreamApplicationIntegrationTestSupport {
final static String BINDER = "kafka";
final static Network network = Network.SHARED;
protected final static KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:5.5.1"))
.withExposedPorts(9092, 9093)
.withNetwork(network);
static {
kafka.start();
}
protected static StreamAppContainer prepackagedKafkaContainerFor(String appName, String version) {
return new KafkaStreamAppContainer(prePackagedStreamAppImageName(appName, BINDER, version),
kafka);
}
@Configuration
@EnableKafka
static class KafkaTestConfiguration {
private static final String SUFFIX = UUID.randomUUID().toString().substring(0, 8);
private static final String STREAM_APPLICATION_TESTS_GROUP = "stream-application-tests_" + SUFFIX;
@Bean
KafkaTemplate<String, String> kafkaTemplate(ProducerFactory producerFactory) {
return new KafkaTemplate(producerFactory);
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.GROUP_ID_CONFIG, STREAM_APPLICATION_TESTS_GROUP);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
cf.setBootstrapServersSupplier(() -> kafka.getBootstrapServers());
return cf;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
pf.setBootstrapServersSupplier(() -> kafka.getBootstrapServers());
return pf;
}
@Bean
public AdminClient admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
return KafkaAdminClient.create(configs);
}
@Bean
KafkaTestListener testListener(AdminClient admin, KafkaListenerEndpointRegistry endpointRegistry) {
return new KafkaTestListener(admin, endpointRegistry);
}
@KafkaListener(autoStartup = "true", topicPattern = STREAM_APPLICATIONS_TEST_TOPIC)
static class KafkaTestListener extends AbstractTestTopicListener {
private final AdminClient admin;
private final KafkaListenerEndpointRegistry endpointRegistry;
private final Object lock = new Object();
KafkaTestListener(AdminClient admin, KafkaListenerEndpointRegistry endpointRegistry) {
super();
this.admin = admin;
this.endpointRegistry = endpointRegistry;
this.admin.createTopics(
Collections.singletonList(
new NewTopic(STREAM_APPLICATIONS_TEST_TOPIC, Optional.empty(), Optional.empty())));
await().atMost(Duration.ofSeconds(30))
.until(() -> {
Set<String> topics = admin.listTopics().names().get();
return topics.contains(STREAM_APPLICATIONS_TEST_TOPIC);
});
}
@Override
public boolean addMessageMatcher(String topic, MessageMatcher messageMatcher) {
boolean added = super.addMessageMatcher(topic, messageMatcher);
if (added) {
synchronized (lock) {
stop();
// rewind to consume messages that may have arrived before a verifier is registered.
admin.alterConsumerGroupOffsets(STREAM_APPLICATION_TESTS_GROUP,
Collections.singletonMap(new TopicPartition(topic, 0), new OffsetAndMetadata(0)));
start();
}
}
return added;
}
private void stop() {
this.endpointRegistry.getAllListenerContainers().forEach(container -> container.stop());
}
private void start() {
this.endpointRegistry.getAllListenerContainers().forEach(container -> container.start());
}
@Override
protected Function<Message<?>, String> topicForMessage() {
return message -> (String) message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC);
}
@KafkaHandler(isDefault = true)
public void listen(Message<?> message) {
super.listen(message);
}
}
}
}

View File

@@ -0,0 +1,49 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.app.test.integration.rabbitmq;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.RabbitMQContainer;
import org.testcontainers.utility.DockerImageName;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainerTestUtils;
public abstract class RabbitMQConfig {
/**
* The RabbitMQContainer.
*/
public static RabbitMQContainer rabbitmq;
final static String BINDER = "rabbit";
final static Network network = Network.SHARED;
static {
rabbitmq = new RabbitMQContainer(DockerImageName.parse("rabbitmq:3"))
.withNetwork(network)
.withExposedPorts(5672, 15672);
rabbitmq.start();
}
public static StreamAppContainer prepackagedContainerFor(String appName, String version) {
return new RabbitMQStreamAppContainer(
StreamAppContainerTestUtils.prePackagedStreamAppImageName(appName, BINDER, version),
rabbitmq);
}
}

View File

@@ -0,0 +1,276 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.app.test.integration.rabbitmq;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.cloud.stream.app.test.integration.AbstractTestTopicListener;
import org.springframework.cloud.stream.app.test.integration.MessageMatcher;
import org.springframework.cloud.stream.app.test.integration.OutputMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainerTestUtils;
import org.springframework.cloud.stream.app.test.integration.TestTopicListener;
import org.springframework.cloud.stream.app.test.integration.TestTopicSender;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.NonNull;
import org.springframework.messaging.Message;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.CollectionUtils;
@Configuration
@EnableRabbit
public abstract class RabbitMQStreamAppContainerTestConfiguration {
private static final String STREAM_APPLICATION_TESTS_GROUP = "stream-application-tests";
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
RetryTemplate retryTemplate = new RetryTemplate();
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
fixedBackOffPolicy.setBackOffPeriod(2000);
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(5);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
retryTemplate.setRetryPolicy(retryPolicy);
rabbitTemplate.setRetryTemplate(retryTemplate);
// Need to be synchronous in case the app input exchange isn't ready to receive messages
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
public OutputMatcher outputMatcher(TestTopicListener testTopicListener) {
return new OutputMatcher(testTopicListener);
}
@Bean
public TestTopicSender testTopicSender(RabbitTemplate rabbitTemplate) {
return new RabbitTemplateTopicSender(rabbitTemplate);
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new MessageConverter() {
@Override
public org.springframework.amqp.core.Message toMessage(Object o, MessageProperties messageProperties)
throws MessageConversionException {
throw new UnsupportedOperationException("toMessage not implemented.");
}
@Override
public Object fromMessage(org.springframework.amqp.core.Message message)
throws MessageConversionException {
return new String(message.getBody());
}
});
return factory;
}
@Bean
public ConnectionFactory connectionFactory() {
return new CachingConnectionFactory(StreamAppContainerTestUtils.localHostAddress(),
RabbitMQConfig.rabbitmq.getMappedPort(5672));
}
@Bean
public RabbitMQTestListener rabbitMQTestListener(RabbitAdmin admin) {
return new RabbitMQTestListener(admin);
}
static class RabbitMQTestListener extends AbstractTestTopicListener {
public static final int CACHE_TTL_SEC = 120;
private final Cache<String, Set<Message<?>>> cache = Caffeine.newBuilder()
.expireAfterWrite(CACHE_TTL_SEC, TimeUnit.SECONDS)
.build();
static final String STREAM_APPLICATIONS_TEST_QUEUE = "stream-applications-test-queue";
private final RabbitAdmin admin;
private final Queue queue;
private final TopicExchange exchange = new TopicExchange(STREAM_APPLICATIONS_TEST_TOPIC);
RabbitMQTestListener(RabbitAdmin admin) {
super();
this.admin = admin;
this.queue = new Queue(STREAM_APPLICATIONS_TEST_QUEUE);
admin.declareQueue(queue);
admin.declareExchange(exchange);
admin.declareBinding(
BindingBuilder.bind(queue).to(exchange).with("#"));
}
@Override
public AtomicBoolean allMatch(String topic) {
AtomicBoolean all = super.allMatch(topic);
if (!all.get()) {
if (messageMatchers.get(topic) == null || cache.getIfPresent(topic) == null) {
return all;
}
List<MessageMatcher> matchers = messageMatchers.get(topic);
all.set(true);
cache.getIfPresent(topic).forEach(message -> matchers.stream().filter(mm -> !mm.isSatisfied())
.forEach(mm -> all.compareAndSet(true, mm.test(message))));
}
return all;
}
@Override
public AtomicBoolean matches(String topic, Predicate<?>... predicates) {
AtomicBoolean matches = super.matches(topic, predicates);
if (matches.get() || CollectionUtils.isEmpty(cache.getIfPresent(topic))) {
return matches;
}
for (Predicate<?> predicate : predicates) {
MessageMatcher matcher = messageMatcher(topic, predicate)
.orElse(MessageMatcher.payloadMatcher(o -> false));
if (messageMatcher(topic, predicate).isPresent()) {
Set<Message<?>> messages = cache.getIfPresent(topic);
matches.set(true);
messages.forEach(message -> {
if (matches.compareAndSet(true, matcher.test(message))) {
logger.debug("Matched cached message {} for topic {}", message, topic);
return;
}
});
}
}
return matches;
}
private void updateCache(String topic, Set<Message<?>> messages) {
if (CollectionUtils.isEmpty(messages)) {
cache.invalidate(topic);
}
else {
cache.put(topic, messages);
}
}
private void cacheMessage(String topic, Message<?> message) {
if (cache.getIfPresent(topic) == null) {
cache.put(topic, new HashSet<>());
}
Set<Message<?>> messages = cache.getIfPresent(topic);
if (messages.add(message)) {
logger.debug("Caching message: {} for topic {}", message, topic);
}
}
@Override
protected Function<Message<?>, String> topicForMessage() {
return message -> (String) message.getHeaders().get(AmqpHeaders.RECEIVED_EXCHANGE);
}
@RabbitListener(autoStartup = "true", group = STREAM_APPLICATION_TESTS_GROUP, queues = {
STREAM_APPLICATIONS_TEST_QUEUE
})
@Override
public void listen(Message<?> message) {
String topic = topicForMessage().apply(message);
logger.debug("Received message: {} on topic {}", message, topic);
if (!messageMatchers.containsKey(topic)) {
cacheMessage(topic, message);
return;
}
logger.debug("Verifying message: {} on topic {}", message, topic);
AtomicBoolean any = new AtomicBoolean(false);
messageMatchers.get(topic).forEach(v -> {
any.compareAndSet(false, v.test(message));
v.setSatisfied(any.get());
});
if (!any.get()) {
cacheMessage(topic, message);
}
else {
logger.debug("Verified message: {} on topic {}", message, topic);
}
if (!allMatch(topic).get()) {
cacheMessage(topic, message);
}
}
}
static class RabbitTemplateTopicSender implements TestTopicSender {
private final RabbitTemplate rabbitTemplate;
private String routingKey = "#";
RabbitTemplateTopicSender(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@Override
public <P> void send(String topic, P payload) {
rabbitTemplate.convertAndSend(topic, routingKey, payload);
}
@Override
public void send(String topic, Message<?> message) {
}
public void setRoutingKey(@NonNull String routingKey) {
this.routingKey = routingKey;
}
}
}

View File

@@ -0,0 +1,36 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.app.test.integration.rabbitmq;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Testcontainers
@ExtendWith(SpringExtension.class)
@ContextConfiguration(classes = RabbitMQStreamAppContainerTestConfiguration.class)
public @interface RabbitMQStreamAppTest {
}

View File

@@ -1,251 +0,0 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.app.test.integration.rabbitmq;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.RabbitMQContainer;
import org.testcontainers.utility.DockerImageName;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.cloud.stream.app.test.integration.AbstractTestTopicListener;
import org.springframework.cloud.stream.app.test.integration.MessageMatcher;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.StreamApplicationIntegrationTestSupport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.util.CollectionUtils;
@ExtendWith(SpringExtension.class)
@ContextConfiguration(classes = RabbitMQStreamApplicationIntegrationTestSupport.RabbitMQTestConfiguration.class)
public abstract class RabbitMQStreamApplicationIntegrationTestSupport extends StreamApplicationIntegrationTestSupport {
protected static RabbitMQContainer rabbitmq;
final static String BINDER = "rabbit";
final static Network network = Network.SHARED;
static {
rabbitmq = new RabbitMQContainer(DockerImageName.parse("rabbitmq:3-management"))
.withNetwork(network)
.withExposedPorts(5672, 15672);
rabbitmq.start();
}
protected static StreamAppContainer prepackagedRabbitMQContainerFor(String appName, String version) {
return new RabbitMQStreamAppContainer(prePackagedStreamAppImageName(appName, BINDER, version),
rabbitmq);
}
@Configuration
@EnableRabbit
static class RabbitMQTestConfiguration {
public static final String STREAM_APPLICATION_TESTS_GROUP = "stream-application-tests";
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
@Bean
RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new MessageConverter() {
@Override
public org.springframework.amqp.core.Message toMessage(Object o, MessageProperties messageProperties)
throws MessageConversionException {
throw new UnsupportedOperationException("toMessage not implemented.");
}
@Override
public Object fromMessage(org.springframework.amqp.core.Message message)
throws MessageConversionException {
return new String(message.getBody());
}
});
return factory;
}
@Bean
public ConnectionFactory connectionFactory() {
return new CachingConnectionFactory(localHostAddress(), rabbitmq.getMappedPort(5672));
}
@Bean
RabbitMQTestListener rabbitMQTestListener(RabbitAdmin admin) {
return new RabbitMQTestListener(admin);
}
static class RabbitMQTestListener extends AbstractTestTopicListener {
public static final int CACHE_TTL_SEC = 120;
private final Cache<String, Set<Message<?>>> cache = Caffeine.newBuilder()
.expireAfterWrite(CACHE_TTL_SEC, TimeUnit.SECONDS)
.build();
static final String STREAM_APPLICATIONS_TEST_QUEUE = "stream-applications-test-queue";
private final RabbitAdmin admin;
private final Queue queue;
private final TopicExchange exchange = new TopicExchange(STREAM_APPLICATIONS_TEST_TOPIC);
RabbitMQTestListener(RabbitAdmin admin) {
super();
this.admin = admin;
this.queue = new Queue(STREAM_APPLICATIONS_TEST_QUEUE);
admin.declareQueue(queue);
admin.declareExchange(exchange);
admin.declareBinding(
BindingBuilder.bind(queue).to(exchange).with("#"));
}
@Override
public AtomicBoolean allMatch(String topic) {
AtomicBoolean all = super.allMatch(topic);
if (!all.get()) {
if (messageMatchers.get(topic) == null || cache.getIfPresent(topic) == null) {
return all;
}
List<MessageMatcher> matchers = messageMatchers.get(topic);
all.set(true);
cache.getIfPresent(topic).forEach(message -> matchers.stream().filter(mm -> !mm.isSatisfied())
.forEach(mm -> all.compareAndSet(true, mm.test(message))));
}
return all;
}
@Override
public AtomicBoolean matches(String topic, Predicate<?>... predicates) {
AtomicBoolean matches = super.matches(topic, predicates);
if (matches.get() || CollectionUtils.isEmpty(cache.getIfPresent(topic))) {
return matches;
}
for (Predicate<?> predicate : predicates) {
MessageMatcher matcher = messageMatcher(topic, predicate)
.orElse(MessageMatcher.payloadMatcher(o -> false));
if (messageMatcher(topic, predicate).isPresent()) {
Set<Message<?>> messages = cache.getIfPresent(topic);
matches.set(true);
messages.forEach(message -> {
if (matches.compareAndSet(true, matcher.test(message))) {
logger.debug("Matched cached message {} for topic {}", message, topic);
messages.remove(message);
return;
}
});
updateCache(topic, messages);
}
}
return matches;
}
private void updateCache(String topic, Set<Message<?>> messages) {
if (CollectionUtils.isEmpty(messages)) {
cache.invalidate(topic);
}
else {
cache.put(topic, messages);
}
}
private void cacheMessage(String topic, Message<?> message) {
if (cache.getIfPresent(topic) == null) {
cache.put(topic, new HashSet<>());
}
Set<Message<?>> messages = cache.getIfPresent(topic);
if (messages.add(message)) {
logger.debug("Caching message: {} for topic {}", message, topic);
}
}
@Override
protected Function<Message<?>, String> topicForMessage() {
return message -> (String) message.getHeaders().get(AmqpHeaders.RECEIVED_EXCHANGE);
}
//@formatter:off
@RabbitListener(autoStartup = "true", group = STREAM_APPLICATION_TESTS_GROUP,
queues = {STREAM_APPLICATIONS_TEST_QUEUE})
//@formatter:on
@Override
public void listen(Message<?> message) {
String topic = topicForMessage().apply(message);
logger.debug("Received message: {} on topic {}", message, topic);
if (!messageMatchers.containsKey(topic)) {
cacheMessage(topic, message);
return;
}
logger.debug("Verifying message: {} on topic {}", message, topic);
AtomicBoolean any = new AtomicBoolean(false);
messageMatchers.get(topic).forEach(v -> {
any.compareAndSet(false, v.test(message));
v.setSatisfied(any.get());
});
if (!any.get()) {
cacheMessage(topic, message);
}
else {
logger.debug("Verified message: {} on topic {}", message, topic);
}
if (!allMatch(topic).get()) {
cacheMessage(topic, message);
}
}
}
}
}

View File

@@ -0,0 +1,69 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.app.test.integration;
import java.time.Duration;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.TestTopicListener.STREAM_APPLICATIONS_TEST_TOPIC;
public abstract class StreamAppContainerTests {
@Autowired
OutputMatcher outputMatcher;
@Autowired
TestTopicSender testTopicSender;
@AfterEach
void reset() {
outputMatcher.clearMessageMatchers();
}
@Test
void payloadVerifiers() {
outputMatcher.addMessageMatcher(MessageMatcher.payloadMatcher((s -> s.equals("hello test1"))));
outputMatcher.addMessageMatcher(MessageMatcher.payloadMatcher(s -> s.equals("hello test2")));
testTopicSender.send(STREAM_APPLICATIONS_TEST_TOPIC, "hello test1");
testTopicSender.send(STREAM_APPLICATIONS_TEST_TOPIC, "hello test2");
await().atMost(Duration.ofSeconds(10))
.until(outputMatcher.messagesMatch());
}
@Test
void verifierOnTheFly() {
testTopicSender.send(STREAM_APPLICATIONS_TEST_TOPIC, "hello test3");
testTopicSender.send(STREAM_APPLICATIONS_TEST_TOPIC, "hello test4");
await().atMost(Duration.ofSeconds(30))
.until(outputMatcher.payloadMatches(s -> s.equals("hello test3"), s -> s.equals("hello test4")));
}
@Test
void verifierOnTheFlyOutOfOrder() {
testTopicSender.send(STREAM_APPLICATIONS_TEST_TOPIC, "hello test5");
testTopicSender.send(STREAM_APPLICATIONS_TEST_TOPIC, "hello test6");
await().atMost(Duration.ofSeconds(30))
.until(outputMatcher.payloadMatches(s -> s.equals("hello test6"), s -> s.equals("hello test5")));
}
}

View File

@@ -0,0 +1,23 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.app.test.integration.kafka;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainerTests;
@KafkaStreamAppTest
public class KafkaStreamAppContainerTests extends StreamAppContainerTests {
}

View File

@@ -1,71 +0,0 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.app.test.integration.kafka;
import java.time.Duration;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.app.test.integration.MessageMatcher;
import org.springframework.cloud.stream.app.test.integration.TestTopicListener;
import org.springframework.kafka.core.KafkaTemplate;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.AbstractTestTopicListener.STREAM_APPLICATIONS_TEST_TOPIC;
public class KafkaStreamApplicationIntegrationTestSupportTests extends KafkaStreamApplicationIntegrationTestSupport {
@Autowired
private KafkaTemplate kafkaTemplate;
@Autowired
private TestTopicListener testTopicListener;
@AfterEach
void reset() {
testTopicListener.clearMessageMatchers();
}
@Test
void payloadVerifiers() {
testTopicListener.addMessageMatcher(MessageMatcher.payloadMatcher((s -> s.equals("hello test1"))));
testTopicListener.addMessageMatcher(MessageMatcher.payloadMatcher(s -> s.equals("hello test2")));
kafkaTemplate.send(STREAM_APPLICATIONS_TEST_TOPIC, "hello test1");
kafkaTemplate.send(STREAM_APPLICATIONS_TEST_TOPIC, "hello test2");
await().atMost(Duration.ofSeconds(10))
.until(messagesMatch());
}
@Test
void verifierOnTheFly() {
kafkaTemplate.send(STREAM_APPLICATIONS_TEST_TOPIC, "hello test3");
kafkaTemplate.send(STREAM_APPLICATIONS_TEST_TOPIC, "hello test4");
await().atMost(Duration.ofSeconds(30))
.until(payloadMatches(s -> s.equals("hello test3"), s -> s.equals("hello test4")));
}
@Test
void verifierOnTheFlyOutOfOrder() {
kafkaTemplate.send(STREAM_APPLICATIONS_TEST_TOPIC, "hello test5");
kafkaTemplate.send(STREAM_APPLICATIONS_TEST_TOPIC, "hello test6");
await().atMost(Duration.ofSeconds(30))
.until(payloadMatches(s -> s.equals("hello test6"), s -> s.equals("hello test5")));
}
}

View File

@@ -0,0 +1,23 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.app.test.integration.rabbitmq;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainerTests;
@RabbitMQStreamAppTest
public class RabbitMQStreamAppContainerTests extends StreamAppContainerTests {
}

View File

@@ -1,75 +0,0 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.app.test.integration.rabbitmq;
import java.time.Duration;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.app.test.integration.MessageMatcher;
import org.springframework.cloud.stream.app.test.integration.TestTopicListener;
import static org.awaitility.Awaitility.await;
import static org.springframework.cloud.stream.app.test.integration.AbstractTestTopicListener.STREAM_APPLICATIONS_TEST_TOPIC;
public class RabbitMQStreamApplicationIntegrationTestSupportTests
extends RabbitMQStreamApplicationIntegrationTestSupport {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private TestTopicListener testTopicListener;
@Autowired
RabbitAdmin rabbitAdmin;
@AfterEach
void reset() {
testTopicListener.clearMessageMatchers();
}
@Test
void multipleVerifiers() {
testTopicListener.addMessageMatcher(MessageMatcher.payloadMatcher(s -> s.equals("hello test1")));
testTopicListener.addMessageMatcher(MessageMatcher.payloadMatcher(s -> s.equals("hello test2")));
rabbitTemplate.convertAndSend(STREAM_APPLICATIONS_TEST_TOPIC, "#", "hello test1");
rabbitTemplate.convertAndSend(STREAM_APPLICATIONS_TEST_TOPIC, "#", "hello test2");
await().atMost(Duration.ofSeconds(30))
.until(messagesMatch());
}
@Test
void verifierOnTheFly() {
rabbitTemplate.convertAndSend(STREAM_APPLICATIONS_TEST_TOPIC, "#", "hello test3");
rabbitTemplate.convertAndSend(STREAM_APPLICATIONS_TEST_TOPIC, "#", "hello test4");
await().atMost(Duration.ofSeconds(30))
.until(payloadMatches(s -> s.equals("hello test3"), s -> s.equals("hello test4")));
}
@Test
void verifierOnTheFlyOutOfOrder() {
rabbitTemplate.convertAndSend(STREAM_APPLICATIONS_TEST_TOPIC, "#", "hello test5");
rabbitTemplate.convertAndSend(STREAM_APPLICATIONS_TEST_TOPIC, "#", "hello test6");
await().atMost(Duration.ofSeconds(30))
.until(payloadMatches(s -> s.equals("hello test6"), s -> s.equals("hello test5")));
}
}

View File

@@ -11,6 +11,7 @@
<logger name="org.testcontainers" level="INFO"/>
<logger name="com.github.dockerjava" level="WARN"/>
<logger name="org.junit" level="DEBUG"/>
<!-- <logger name="org.springframework.amqp.rabbit.core" level="DEBUG"/>-->
<!-- <logger name="org.springframework.kafka" level="INFO"/>-->
<logger name="org.springframework.cloud.stream.app.test.integration" level="DEBUG"/>